andygrove commented on code in PR #1485:
URL: https://github.com/apache/datafusion-comet/pull/1485#discussion_r1993726889


##########
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java:
##########
@@ -19,91 +19,38 @@
 
 package org.apache.spark.shuffle.comet;
 
-import java.io.IOException;
-
 import org.apache.spark.SparkConf;
-import org.apache.spark.memory.MemoryConsumer;
-import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.memory.TaskMemoryManager;
-import org.apache.spark.unsafe.memory.MemoryBlock;
-import org.apache.spark.util.Utils;
 
-import org.apache.comet.CometConf$;
+import org.apache.comet.CometSparkSessionExtensions;
 
 /**
- * A simple memory allocator used by `CometShuffleExternalSorter` to allocate memory blocks which
- * store serialized rows. This class is simply an implementation of `MemoryConsumer` that delegates
- * memory allocation to the `TaskMemoryManager`. This requires that the `TaskMemoryManager` is
- * configured with `MemoryMode.OFF_HEAP`, i.e. it is using off-heap memory.
+ * An interface to instantiate either CometBoundedShuffleMemoryAllocator (on-heap mode) or
+ * CometUnifiedShuffleMemoryAllocator (off-heap mode).
  */
-public final class CometShuffleMemoryAllocator extends CometShuffleMemoryAllocatorTrait {
+public final class CometShuffleMemoryAllocator {
   private static CometShuffleMemoryAllocatorTrait INSTANCE;
 
   /**
    * Returns the singleton instance of `CometShuffleMemoryAllocator`. This method should be used
    * instead of the constructor to ensure that only one instance of `CometShuffleMemoryAllocator` is
-   * created. For Spark tests, this returns `CometTestShuffleMemoryAllocator` which is a test-only
-   * allocator that should not be used in production.
+   * created. For on-heap mode (Spark tests), this returns `CometBoundedShuffleMemoryAllocator`.
    */
   public static CometShuffleMemoryAllocatorTrait getInstance(
       SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
-    boolean isSparkTesting = Utils.isTesting();
-    boolean useUnifiedMemAllocator =
-        (boolean)
-            CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();
-
-    if (!useUnifiedMemAllocator) {
-      synchronized (CometShuffleMemoryAllocator.class) {
-        if (INSTANCE == null) {
-          // CometTestShuffleMemoryAllocator handles pages by itself so it can be a singleton.
-          INSTANCE = new CometTestShuffleMemoryAllocator(conf, taskMemoryManager, pageSize);
-        }
-      }
-      return INSTANCE;
-    } else {
-      if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
-        throw new IllegalArgumentException(
-            "CometShuffleMemoryAllocator should be used with off-heap "
-                + "memory mode, but got "
-                + taskMemoryManager.getTungstenMemoryMode());
-      }
 
+    if (CometSparkSessionExtensions.cometUnifiedMemoryManagerEnabled(conf)) {

Review Comment:
   We can check the memory mode directly on the provided `TaskMemoryManager` rather than checking the configs here.
   
   ```suggestion
       if (taskMemoryManager.getTungstenMemoryMode() == MemoryMode.OFF_HEAP) {
   ```
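
To illustrate the suggestion, here is a minimal, self-contained sketch of choosing the allocator from the task memory manager's mode instead of from configuration. The `MemoryMode` enum and `TaskMemoryManager` interface below are hypothetical stand-ins for the Spark types so that the example compiles without Spark on the classpath, and `chooseAllocator` is an illustrative helper, not a method from the PR. The on-heap/off-heap mapping follows the class Javadoc in the hunk above. Note also that the hunk removes the `MemoryMode` import, so applying the suggestion would presumably require keeping it.

```java
// Sketch only: stand-in types, not the real Spark or Comet classes.
final class AllocatorSelectionSketch {

  // Hypothetical stand-in for org.apache.spark.memory.MemoryMode.
  enum MemoryMode {
    ON_HEAP,
    OFF_HEAP
  }

  // Hypothetical stand-in for TaskMemoryManager, reduced to the one accessor
  // that the suggestion relies on.
  interface TaskMemoryManager {
    MemoryMode getTungstenMemoryMode();
  }

  // Illustrative helper: returns the name of the allocator class that would be
  // picked for the given task memory manager, per the Javadoc in the diff.
  static String chooseAllocator(TaskMemoryManager taskMemoryManager) {
    if (taskMemoryManager.getTungstenMemoryMode() == MemoryMode.OFF_HEAP) {
      return "CometUnifiedShuffleMemoryAllocator";
    }
    return "CometBoundedShuffleMemoryAllocator";
  }

  public static void main(String[] args) {
    System.out.println(chooseAllocator(() -> MemoryMode.OFF_HEAP));
    System.out.println(chooseAllocator(() -> MemoryMode.ON_HEAP));
  }
}
```

Reading the mode from the manager that will actually serve the allocations means the choice cannot drift from the runtime state, which appears to be the motivation for the suggestion.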



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

