andygrove commented on code in PR #1485:
URL: https://github.com/apache/datafusion-comet/pull/1485#discussion_r1988082049
##########
spark/src/main/java/org/apache/spark/shuffle/comet/CometShuffleMemoryAllocator.java:
##########
@@ -48,30 +48,30 @@ public final class CometShuffleMemoryAllocator extends CometShuffleMemoryAllocat
public static CometShuffleMemoryAllocatorTrait getInstance(
SparkConf conf, TaskMemoryManager taskMemoryManager, long pageSize) {
boolean isSparkTesting = Utils.isTesting();
- boolean useUnifiedMemAllocator =
+ boolean useTestAllocator =
(boolean)
CometConf$.MODULE$.COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST().get();
- if (!useUnifiedMemAllocator) {
+ if (isSparkTesting || useTestAllocator) {
synchronized (CometShuffleMemoryAllocator.class) {
if (INSTANCE == null) {
// CometTestShuffleMemoryAllocator handles pages by itself so it can be a singleton.
INSTANCE = new CometTestShuffleMemoryAllocator(conf, taskMemoryManager, pageSize);
}
}
return INSTANCE;
- } else {
- if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
- throw new IllegalArgumentException(
- "CometShuffleMemoryAllocator should be used with off-heap "
- + "memory mode, but got "
- + taskMemoryManager.getTungstenMemoryMode());
- }
+ }
- // CometShuffleMemoryAllocator stores pages in TaskMemoryManager which is not singleton,
- // but one instance per task. So we need to create a new instance for each task.
- return new CometShuffleMemoryAllocator(taskMemoryManager, pageSize);
+ if (taskMemoryManager.getTungstenMemoryMode() != MemoryMode.OFF_HEAP) {
Review Comment:
In Comet 0.3.0 we had a different implementation of
CometShuffleMemoryAllocator that managed its own memory separately from Spark:
```
/**
 * A simple memory allocator used by `CometShuffleExternalSorter` to allocate memory blocks which
 * store serialized rows. We don't rely on Spark memory allocator because we need to allocate
 * off-heap memory no matter memory mode is on-heap or off-heap. This allocator is configured with
 * fixed size of memory, and it will throw `SparkOutOfMemoryError` if the memory is not enough.
 *
 * <p>Some methods are copied from `org.apache.spark.unsafe.memory.TaskMemoryManager` with
 * modifications. Most modifications are to remove the dependency on the configured memory mode.
 */
public final class CometShuffleMemoryAllocator extends MemoryConsumer {
```
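To make the 0.3.0 design concrete, here is a minimal standalone sketch of an allocator that enforces its own fixed byte budget and always hands out off-heap memory, whatever Spark's memory mode is. It is an illustration under assumptions, not the removed Comet code: `FixedBudgetOffHeapAllocator` is a hypothetical name, it uses `ByteBuffer.allocateDirect` instead of the `MemoryConsumer`/`TaskMemoryManager` machinery the original built on, and it throws a plain `OutOfMemoryError` as a stand-in for `SparkOutOfMemoryError` so it compiles without Spark on the classpath.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the 0.3.0 idea: a fixed budget tracked independently of
// Spark, with every block allocated off-heap (direct ByteBuffer) regardless of
// the configured memory mode. Not the actual Comet implementation.
final class FixedBudgetOffHeapAllocator {
  private final long capacityBytes;
  private long usedBytes = 0;

  FixedBudgetOffHeapAllocator(long capacityBytes) {
    if (capacityBytes <= 0) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    this.capacityBytes = capacityBytes;
  }

  /** Allocates a direct (off-heap) block, failing if the fixed budget would be exceeded. */
  synchronized ByteBuffer allocate(int size) {
    if (size <= 0) {
      throw new IllegalArgumentException("size must be positive");
    }
    if (usedBytes + size > capacityBytes) {
      // Stand-in for SparkOutOfMemoryError in the original allocator.
      throw new OutOfMemoryError(
          "Unable to acquire " + size + " bytes; used " + usedBytes + " of " + capacityBytes);
    }
    usedBytes += size;
    return ByteBuffer.allocateDirect(size);
  }

  /** Returns a block's bytes to the budget; the direct buffer itself is reclaimed by GC. */
  synchronized void free(ByteBuffer block) {
    if (block.capacity() > usedBytes) {
      throw new IllegalStateException("freeing more bytes than are allocated");
    }
    usedBytes -= block.capacity();
  }

  synchronized long used() {
    return usedBytes;
  }
}
```

Because the budget in this sketch lives outside Spark's accounting, Spark's memory manager has no visibility into these bytes; that independence is what lets it work in on-heap mode, and also what separates it from the `TaskMemoryManager`-backed allocator in the diff.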
In Comet 0.4.0 we forced the use of off-heap memory, and in Comet 0.5.0 we reverted that decision but inadvertently started using the test version of the allocator (`CometTestShuffleMemoryAllocator`).
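The regression can be read directly off the branch conditions in the diff. The sketch below models them as pure functions. `AllocatorKind`, `AllocatorSelection`, and the boolean parameters are hypothetical stand-ins for `Utils.isTesting()`, the `COMET_COLUMNAR_SHUFFLE_UNIFIED_MEMORY_ALLOCATOR_IN_TEST` config (which the PR keeps reading but assigns to a local renamed `useTestAllocator`), and the Tungsten memory-mode check. It assumes the lines after the truncated hunk keep the removed off-heap `IllegalArgumentException` and the per-task construction.

```java
// Hypothetical model of the allocator-selection branches shown in the diff.
enum AllocatorKind { TEST_SINGLETON, UNIFIED_PER_TASK }

final class AllocatorSelection {
  // Pre-PR logic: the test allocator is chosen whenever the config flag is false,
  // regardless of whether Spark is actually running tests.
  static AllocatorKind before(boolean unifiedAllocatorFlag) {
    return unifiedAllocatorFlag ? AllocatorKind.UNIFIED_PER_TASK : AllocatorKind.TEST_SINGLETON;
  }

  // PR logic: the test allocator is chosen only under Spark testing or when the flag
  // asks for it; otherwise the per-task allocator is used, which requires off-heap mode
  // (assumed to still throw IllegalArgumentException, as the removed lines did).
  static AllocatorKind after(boolean isSparkTesting, boolean testAllocatorFlag, boolean offHeap) {
    if (isSparkTesting || testAllocatorFlag) {
      return AllocatorKind.TEST_SINGLETON;
    }
    if (!offHeap) {
      throw new IllegalArgumentException("per-task allocator requires off-heap memory mode");
    }
    return AllocatorKind.UNIFIED_PER_TASK;
  }
}
```

Under the pre-PR branch, any run with the flag set to false gets the singleton test allocator. Under the PR's branch, a non-test run with on-heap memory fails fast, which is exactly the on-heap case in question.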
Perhaps we need to reinstate this earlier version to support the on-heap use case, but I am not sure. I wonder if @viirya can make a recommendation here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.