andygrove commented on code in PR #1485:
URL: https://github.com/apache/datafusion-comet/pull/1485#discussion_r1988060641


##########
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java:
##########
@@ -48,30 +48,30 @@ public final class CometShuffleMemoryAllocator extends 
CometShuffleMemoryAllocat
   public static CometShuffleMemoryAllocatorTrait getInstance(
       SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
     boolean isSparkTesting = Utils.isTesting();
-    boolean useUnifiedMemAllocator =
+    boolean useTestAllocator =
         (boolean)
             
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();
 
-    if (!useUnifiedMemAllocator) {
+    if (isSparkTesting || useTestAllocator) {
       synchronized (CometShuffleMemoryAllocator.class) {
         if (INSTANCE == null) {
           // CometTestShuffleMemoryAllocator handles pages by itself so it can 
be a singleton.
           INSTANCE = new CometTestShuffleMemoryAllocator(conf, 
taskMemoryManager, pageSize);
         }
       }
       return INSTANCE;
-    } else {
-      if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
-        throw new IllegalArgumentException(
-            "CometShuffleMemoryAllocator should be used with off-heap "
-                + "memory mode, but got "
-                + taskMemoryManager.getTungstenMemoryMode());
-      }
+    }
 
-      // CometShuffleMemoryAllocator stores pages in TaskMemoryManager which 
is not singleton,
-      // but one instance per task. So we need to create a new instance for 
each task.
-      return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize);
+    if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {

Review Comment:
   I tested with TPC-H without using off-heap and ran into this:
   
   ```
   Caused by: java.lang.IllegalArgumentException: CometShuffleMemoryAllocator 
should be used with off-heap memory mode, but got ON_HEAP
        at 
org.apache.spark.shuffle.comet.CometShuffleMemoryAllocator.getInstance(CometShuffleMemoryAllocator.java:74)
        at 
org.apache.spark.sql.comet.execution.shuffle.CometDiskBlockWriter.<init>(CometDiskBlockWriter.java:142)
        at 
org.apache.spark.sql.comet.execution.shuffle.CometBypassMergeSortShuffleWriter.write(CometBypassMergeSortShuffleWriter.java:181)
        at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to