andygrove commented on code in PR #1485:
URL: https://github.com/apache/datafusion-comet/pull/1485#discussion_r1988082049


##########
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java:
##########
@@ -48,30 +48,30 @@ public final class CometShuffleMemoryAllocator extends 
CometShuffleMemoryAllocat
   public static CometShuffleMemoryAllocatorTrait getInstance(
       SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
     boolean isSparkTesting = Utils.isTesting();
-    boolean useUnifiedMemAllocator =
+    boolean useTestAllocator =
         (boolean)
             
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();
 
-    if (!useUnifiedMemAllocator) {
+    if (isSparkTesting || useTestAllocator) {
       synchronized (CometShuffleMemoryAllocator.class) {
         if (INSTANCE == null) {
           // CometTestShuffleMemoryAllocator handles pages by itself so it can 
be a singleton.
           INSTANCE = new CometTestShuffleMemoryAllocator(conf, 
taskMemoryManager, pageSize);
         }
       }
       return INSTANCE;
-    } else {
-      if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
-        throw new IllegalArgumentException(
-            "CometShuffleMemoryAllocator should be used with off-heap "
-                + "memory mode, but got "
-                + taskMemoryManager.getTungstenMemoryMode());
-      }
+    }
 
-      // CometShuffleMemoryAllocator stores pages in TaskMemoryManager which 
is not singleton,
-      // but one instance per task. So we need to create a new instance for 
each task.
-      return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize);
+    if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {

Review Comment:
   In Comet 0.3.0 we had a different implementation of 
CometShuffleMemoryAllocator that managed its own memory separately from Spark:
   
   ```
   /**
    * A simple memory allocator used by `CometShuffleExternalSorter` to 
allocate memory blocks which
    * store serialized rows. We don't rely on Spark memory allocator because we 
need to allocate
    * off-heap memory no matter memory mode is on-heap or off-heap. This 
allocator is configured with
    * fixed size of memory, and it will throw `SparkOutOfMemoryError` if the 
memory is not enough.
    *
    * <p>Some methods are copied from 
`org.apache.spark.unsafe.memory.TaskMemoryManager` with
    * modifications. Most modifications are to remove the dependency on the 
configured memory mode.
    */
   public final class CometShuffleMemoryAllocator extends MemoryConsumer {
   ```
   
   In Comet 0.4.0 we forced use of off-heap memory and in Comet 0.5.0 we 
reverted that decision but inadvertently started using the test version of the 
allocator.
   
   Perhaps we must re-instate this earlier version to support the on-heap use 
case. I am not sure, though. I wonder if @viirya can make any recommendation on 
this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to