Kontinuation commented on PR #1004:
URL: 
https://github.com/apache/datafusion-comet/pull/1004#issuecomment-2401198781

   This is better than using a greedy memory pool. It makes spillable operators 
work correctly under memory pressure, especially when running sort-merge-join 
where multiple sort operators compete for resources.
   
   Some issues remain unresolved: each task may create multiple native plans, 
and they still do not share the same memory pool. I'd like to share the 
experiments I've done so we can sync on this topic.
   
   I did some experiments [on my 
branch](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406)
 to try out various ways of using memory pools. I added a configuration, 
`spark.comet.exec.memoryPool`, that lets me run queries with different memory 
pools. All configurations were tested with the query mentioned in 
https://github.com/apache/datafusion-comet/issues/1003.
   
   [**spark.comet.exec.memoryPool = 
greedy**](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406R282-R284)
   
   This is the current mode when using native memory management. It could only 
run with `spark.comet.memoryOverhead = 8000m`; with anything less, 
sort-merge-join fails with a memory reservation error:
   
   ```
   24/10/09 10:59:14 WARN TaskSetManager: Lost task 3.0 in stage 13.0 (TID 43) 
(bogon executor driver): org.apache.comet.CometNativeException: Additional 
allocation failed with top memory consumers (across reservations) as: 
ExternalSorterMerge[0] consumed 1164398840 bytes, GroupedHashAggregateStream[0] 
consumed 117699433 bytes, SMJStream[0] consumed 459160 bytes, HashJoinInput[0] 
consumed 1392 bytes. Error: Failed to allocate additional 993312 bytes for 
ExternalSorter[0] with 1321280 bytes already allocated for this reservation - 
625495 bytes remain available for the total pool
   ```
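   The failure mode above can be illustrated with a toy model (a simplified 
sketch, not DataFusion's actual `GreedyMemoryPool` implementation): all 
consumers draw from one shared budget first-come-first-served, so a large 
reservation by one operator (here, `ExternalSorterMerge`) starves later 
reservations instead of forcing anyone to spill.

   ```rust
   // Toy model of greedy pool semantics: one shared budget, no fairness.
   struct GreedyPool {
       capacity: usize,
       used: usize,
   }

   impl GreedyPool {
       fn new(capacity: usize) -> Self {
           Self { capacity, used: 0 }
       }

       /// Try to reserve `bytes`; fails once the shared budget is exhausted,
       /// regardless of which consumer is asking.
       fn try_grow(&mut self, bytes: usize) -> Result<(), String> {
           if self.used + bytes > self.capacity {
               Err(format!(
                   "Failed to allocate additional {} bytes - {} bytes remain available",
                   bytes,
                   self.capacity - self.used
               ))
           } else {
               self.used += bytes;
               Ok(())
           }
       }
   }

   fn main() {
       let mut pool = GreedyPool::new(1000);
       // The first sorter greedily takes almost the whole pool...
       assert!(pool.try_grow(900).is_ok());
       // ...so the second sorter's reservation fails and the query errors out.
       assert!(pool.try_grow(200).is_err());
   }
   ```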
   
   [**spark.comet.exec.memoryPool = 
fair_spill**](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406R277-R281)
   
   This is the same approach as this PR: simply use a `FairSpillPool` as the 
per-plan memory pool. It could run with `spark.comet.memoryOverhead = 3200m`; 
both sort operators spill to stay within the memory bound:
   
   ```
   24/10/09 11:03:11 INFO core/src/execution/jni_api.rs: Comet native query 
plan with metrics (stage: 13 task: 41):
   AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1, 
col_4@4 as col_2, col_3@3 as col_3, col_8@8 as col_4, col_2@2 as col_5, col_5@5 
as col_6], aggr=[sum], metrics=[output_rows=2791473, 
elapsed_compute=3.695235425s]
     ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as 
col_2, col_4@4 as col_3, col_5@5 as col_4, col_6@6 as col_5, col_7@7 as col_6, 
col_8@8 as col_7, col_1@10 as col_8], metrics=[output_rows=15002382, 
elapsed_compute=2.23923ms]
       ProjectionExec: expr=[col_0@2 as col_0, col_1@3 as col_1, col_2@4 as 
col_2, col_3@5 as col_3, col_4@6 as col_4, col_5@7 as col_5, col_6@8 as col_6, 
col_7@9 as col_7, col_8@10 as col_8, col_0@0 as col_0, col_1@1 as col_1], 
metrics=[output_rows=15002382, elapsed_compute=2.445133ms]
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, 
col_3@3)], metrics=[output_rows=15002382, input_batches=1832, 
input_rows=15002382, build_input_batches=1, output_batches=1832, 
build_input_rows=25, build_mem_used=1392, join_time=853.609662ms, 
build_time=42.125µs]
           CopyExec [UnpackOrDeepCopy], metrics=[output_rows=25, 
elapsed_compute=3.292µs]
             ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: 
Int64, col_1: Utf8], metrics=[output_rows=25, elapsed_compute=709ns, 
cast_time=1ns]
           CopyExec [UnpackOrClone], metrics=[output_rows=15002382, 
elapsed_compute=1.754617ms]
             ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 
as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5, col_6@6 as 
col_6, col_1@9 as col_7, col_2@10 as col_8], metrics=[output_rows=15002382, 
elapsed_compute=2.032586ms]
               SortMergeJoin: join_type=Inner, on=[(col_7@7, col_0@0)], 
metrics=[output_rows=15002382, spill_count=0, spilled_bytes=0, spilled_rows=0, 
input_batches=2290, input_rows=18752902, output_batches=1832, 
peak_mem_used=918320, join_time=4.976020762s]
                 SortExec: expr=[col_7@7 ASC], preserve_partitioning=[false], 
metrics=[output_rows=3750520, elapsed_compute=1.678235876s, spill_count=3, 
spilled_bytes=572203168, spilled_rows=3066232]
                   CopyExec [UnpackOrDeepCopy], metrics=[output_rows=3750520, 
elapsed_compute=65.265593ms]
                     ScanExec: source=[Exchange (unknown)], schema=[col_0: 
Int64, col_1: Utf8, col_2: Utf8, col_3: Int64, col_4: Utf8, col_5: 
Decimal128(12, 2), col_6: Utf8, col_7: Int64], metrics=[output_rows=3750520, 
elapsed_compute=450.456µs, cast_time=1ns]
                 SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false], 
metrics=[output_rows=15002382, elapsed_compute=2.424249133s, spill_count=4, 
spilled_bytes=547164360, spilled_rows=13667085]
                   CopyExec [UnpackOrDeepCopy], metrics=[output_rows=15002382, 
elapsed_compute=40.672672ms]
                     ScanExec: source=[Exchange (unknown)], schema=[col_0: 
Int64, col_1: Decimal128(12, 2), col_2: Decimal128(12, 2)], 
metrics=[output_rows=15002382, elapsed_compute=531.627µs, cast_time=1ns]
   ```
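   The reason both sorts survive here can be sketched with a toy model of 
fair-spill semantics (a simplification of DataFusion's `FairSpillPool`, not 
its actual implementation): each spillable consumer is capped at roughly 
`capacity / num_spillable`, so when a sort hits its share it spills to disk 
instead of failing the whole query.

   ```rust
   // Toy model of fair-spill semantics: spillable consumers get equal shares.
   struct FairPool {
       capacity: usize,
       num_spillable: usize,
   }

   enum Reservation {
       Granted,
       Spill, // the consumer must spill to disk before asking again
   }

   impl FairPool {
       fn per_consumer_limit(&self) -> usize {
           self.capacity / self.num_spillable
       }

       fn try_grow(&self, already_reserved: usize, bytes: usize) -> Reservation {
           if already_reserved + bytes <= self.per_consumer_limit() {
               Reservation::Granted
           } else {
               Reservation::Spill
           }
       }
   }

   fn main() {
       // Two sort operators competing, as in the SMJ plan above.
       let pool = FairPool { capacity: 1000, num_spillable: 2 };
       assert_eq!(pool.per_consumer_limit(), 500);
       assert!(matches!(pool.try_grow(400, 50), Reservation::Granted));
       // Exceeding the fair share triggers a spill rather than an error.
       assert!(matches!(pool.try_grow(400, 200), Reservation::Spill));
   }
   ```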
   
   But please note that each task may create 2 native plans, each with its own 
memory pool. Here is an example of a task creating 2 native plans that run 
concurrently.
   
   Plan 1:
   
   ```
   24/10/08 13:34:46 INFO core/src/execution/jni_api.rs: Comet native query 
plan (stage: 13 task: 40):
    AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1, 
col_4@4 as col_2, col_3@3 as col_3, col_8@8 as col_4, col_2@2 as col_5, col_5@5 
as col_6], aggr=[sum]
     ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 as 
col_2, col_4@4 as col_3, col_5@5 as col_4, col_6@6 as col_5, col_7@7 as col_6, 
col_8@8 as col_7, col_1@10 as col_8]
       ProjectionExec: expr=[col_0@2 as col_0, col_1@3 as col_1, col_2@4 as 
col_2, col_3@5 as col_3, col_4@6 as col_4, col_5@7 as col_5, col_6@8 as col_6, 
col_7@9 as col_7, col_8@10 as col_8, col_0@0 as col_0, col_1@1 as col_1]
         HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, 
col_3@3)]
           CopyExec [UnpackOrDeepCopy]
             ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: 
Int64, col_1: Utf8]
           CopyExec [UnpackOrClone]
             ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1, col_2@2 
as col_2, col_3@3 as col_3, col_4@4 as col_4, col_5@5 as col_5, col_6@6 as 
col_6, col_1@9 as col_7, col_2@10 as col_8]
               SortMergeJoin: join_type=Inner, on=[(col_7@7, col_0@0)]
                 SortExec: expr=[col_7@7 ASC], preserve_partitioning=[false]
                   CopyExec [UnpackOrDeepCopy]
                     ScanExec: source=[Exchange (unknown)], schema=[col_0: 
Int64, col_1: Utf8, col_2: Utf8, col_3: Int64, col_4: Utf8, col_5: 
Decimal128(12, 2), col_6: Utf8, col_7: Int64]
                 SortExec: expr=[col_0@0 ASC], preserve_partitioning=[false]
                   CopyExec [UnpackOrDeepCopy]
                     ScanExec: source=[Exchange (unknown)], schema=[col_0: 
Int64, col_1: Decimal128(12, 2), col_2: Decimal128(12, 2)]
   ```
   
   Plan 2:
   
   ```
   24/10/08 13:34:52 INFO core/src/execution/jni_api.rs: Comet native query 
plan (stage: 13 task: 40):
    ShuffleWriterExec: partitioning=Hash([Column { name: "col_0", index: 0 }, 
Column { name: "col_1", index: 1 }, Column { name: "col_2", index: 2 }, Column 
{ name: "col_3", index: 3 }, Column { name: "col_4", index: 4 }, Column { name: 
"col_5", index: 5 }, Column { name: "col_6", index: 6 }], 4)
     ScanExec: source=[], schema=[col_0: Int64, col_1: Utf8, col_2: 
Decimal128(12, 2), col_3: Utf8, col_4: Utf8, col_5: Utf8, col_6: Utf8, col_7: 
Decimal128(36, 4), col_8: Boolean]
   ```
   
   [**spark.comet.exec.memoryPool = 
fair_spill_shared**](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406R235-R242)
   
   This approach allocates one `FairSpillPool` shared by all plans in the same 
task. In the example above, the sort-merge-join plan and the shuffle-write plan 
of a task share the same memory pool. This strictly follows the conceptual 
model that Comet never exceeds `spark.comet.memoryOverhead`. It could run with 
`spark.comet.memoryOverhead = 4800m`.
   
   I've added 2 additional JNI interfaces for creating a memory pool at the 
beginning of each task and releasing it at the end of the task. This is not 
strictly necessary: we could create and track per-task memory pools entirely 
in native code; all it needs is the task attempt id at native plan creation 
time.
   
   [**spark.comet.exec.memoryPool = 
fair_spill_global**](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406R259-R267)
   
   This approach uses a singleton `FairSpillPool` for all tasks in the same 
executor instance. I expected this to be the optimal approach, but in practice 
it does not work well: it could only run with 
`spark.comet.memoryOverhead = 12000m`. I'll dig into this issue next week, 
since there is a lot of other work allocated for this week :(.
   
   [**spark.comet.exec.memoryPool = 
greedy_global**](https://github.com/Kontinuation/datafusion-comet/compare/fix-jvm-shuffle-allocator...Kontinuation:datafusion-comet:switch-to-fair-spill-pool#diff-6752f38b5d0b58f5cf8d7684fe4c562c00b9d11e2640e34419257fb31a975406R268-R276)
   
   This approach uses a singleton `GreedyMemoryPool` for all tasks in the same 
executor instance. As expected, it does not work well: it could only run with 
`spark.comet.memoryOverhead = 9000m`.
   
   
   In conclusion, `fair_spill` and `fair_spill_shared` have lower memory 
requirements and are less likely to break on memory-intensive queries. I also 
believe Spark needs a more sophisticated memory management system from 
DataFusion to reliably support large ETL workloads, which is the use case 
where Spark shines.

