mridulm commented on code in PR #47856:
URL: https://github.com/apache/spark/pull/47856#discussion_r2051611504


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -1582,6 +1582,16 @@ package object config {
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
 
+  private[spark] val SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD =
+    ConfigBuilder("spark.shuffle.spill.maxRecordsSizeForSpillThreshold")
+      .internal()
+      .doc("The maximum size in memory before forcing the shuffle sorter to spill. " +
+        "By default it is Long.MAX_VALUE, which means we never force the sorter to spill, " +
+        "until we reach some limitations, like the max page size limitation for the pointer " +
+        "array in the sorter.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)

Review Comment:
   Add a size constraint when specified, to ensure invalid values don't get set.
   At a minimum `.checkValue(v => v > 0, "The threshold should be positive.")`,
   though perhaps something reasonably large should be enforced? 1 MB?
   
   Same for other configs also.
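
A minimal sketch of how such a constraint could behave, assuming a 1 MiB floor (the comment only floats that value as a possibility). `BytesConf`, `parse`, and `SpillThresholdConf` are hypothetical stand-ins so the example runs without Spark on the classpath; only the `checkValue(validator, errorMsg)` shape is taken from the comment above.

```scala
object SpillThresholdConf {
  // Hypothetical stand-in for a typed bytes config entry; not Spark's real builder.
  final case class BytesConf(
      key: String,
      default: Long,
      validators: List[(Long => Boolean, String)] = Nil) {

    // Same shape as the checkValue(validator, errorMsg) call suggested above.
    def checkValue(validator: Long => Boolean, errorMsg: String): BytesConf =
      copy(validators = validators :+ ((validator, errorMsg)))

    // Returns the explicit value (or the default) if every validator passes,
    // otherwise throws IllegalArgumentException via require.
    def parse(raw: Option[Long]): Long = {
      val value = raw.getOrElse(default)
      validators.foreach { case (isValid, msg) =>
        require(isValid(value), s"'$value' in $key is invalid. $msg")
      }
      value
    }
  }

  // Assumed lower bound; the review only suggests "something reasonably large".
  val OneMiB: Long = 1024L * 1024L

  val threshold: BytesConf =
    BytesConf("spark.shuffle.spill.maxRecordsSizeForSpillThreshold", Long.MaxValue)
      .checkValue(_ >= OneMiB, "The threshold should be at least 1 MiB.")
}
```

With a floor like this, a value of `0` or a negative number is rejected at read time instead of silently forcing a spill on every insert.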



##########
core/src/main/scala/org/apache/spark/util/collection/Spillable.scala:
##########
@@ -81,7 +85,11 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
    */
   protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
     var shouldSpill = false
-    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
+    // Check number of elements or memory usage limits, whichever is hit first
+    if (_elementsRead > numElementsForceSpillThreshold
+      || currentMemory > maxSizeForceSpillThreshold) {

Review Comment:
   By moving `_elementsRead > numElementsForceSpillThreshold` here, we would
   actually reduce some unnecessary allocations ... nice!
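
One reading of the point above, as a minimal sketch: when a force-spill threshold is already exceeded, the branch that requests more execution memory is never entered. `Tracker`, `acquire`, and `acquireCalls` are hypothetical names for illustration; the memory-request arithmetic is an assumption about the parts of `maybeSpill` that the hunk does not show.

```scala
object MaybeSpillSketch {
  // Hypothetical, simplified model of the spill decision; not Spark's Spillable.
  final class Tracker(
      numElementsForceSpillThreshold: Long,
      maxSizeForceSpillThreshold: Long,
      initialMemoryThreshold: Long,
      acquire: Long => Long) {

    var myMemoryThreshold: Long = initialMemoryThreshold
    var acquireCalls: Int = 0 // counts how often extra memory was requested

    def shouldSpill(elementsRead: Long, currentMemory: Long): Boolean = {
      if (elementsRead > numElementsForceSpillThreshold ||
          currentMemory > maxSizeForceSpillThreshold) {
        // Forced spill: decided before any memory request is made.
        true
      } else if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
        // Assumed behavior: try to grow the threshold, spill only if that fails.
        val amountToRequest = 2 * currentMemory - myMemoryThreshold
        acquireCalls += 1
        myMemoryThreshold += acquire(amountToRequest)
        currentMemory >= myMemoryThreshold
      } else {
        false
      }
    }
  }
}
```

In this model a forced spill leaves `acquireCalls` untouched, whereas the old ordering would have requested memory first and then spilled anyway.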



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3148,6 +3148,14 @@ object SQLConf {
      .intConf
      .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val WINDOW_EXEC_BUFFER_SIZE_SPILL_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.spill.size.threshold")
+      .internal()
+      .doc("Threshold for size of rows to be spilled by window operator")
+      .version("4.0.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD.defaultValue.get)

Review Comment:
   For this and the other configs below - fall back to
   `SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD`?
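
To illustrate the difference this suggestion makes, here is a minimal sketch of the two lookup strategies. `Entry`, `WithDefault`, and `WithFallback` are hypothetical types modeling the idea, not Spark's config classes: copying `defaultValue.get` freezes the parent's default, while a fallback entry reads whatever the parent config is set to at lookup time.

```scala
object FallbackSketch {
  // Hypothetical minimal model of config lookup; not Spark's ConfigEntry API.
  sealed trait Entry {
    def key: String
    def read(settings: Map[String, Long]): Long
  }

  // Uses its own static default when the key is unset.
  final case class WithDefault(key: String, default: Long) extends Entry {
    def read(settings: Map[String, Long]): Long = settings.getOrElse(key, default)
  }

  // Defers to another entry (including any user-set value) when the key is unset.
  final case class WithFallback(key: String, fallback: Entry) extends Entry {
    def read(settings: Map[String, Long]): Long =
      settings.get(key).getOrElse(fallback.read(settings))
  }

  val shuffle: WithDefault =
    WithDefault("spark.shuffle.spill.maxRecordsSizeForSpillThreshold", Long.MaxValue)

  // What the diff does: copy the parent's default value once.
  val copiedDefault: Entry =
    WithDefault("spark.sql.windowExec.buffer.spill.size.threshold", shuffle.default)

  // What the review suggests: resolve through the parent entry.
  val fallingBack: Entry =
    WithFallback("spark.sql.windowExec.buffer.spill.size.threshold", shuffle)
}
```

If a user sets only the shuffle-level threshold, `copiedDefault` still reads `Long.MaxValue`, while `fallingBack` picks up the user's value.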
   



##########
core/src/main/scala/org/apache/spark/util/collection/Spillable.scala:
##########
@@ -90,11 +98,10 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
      // or we already had more memory than myMemoryThreshold), spill the current collection
       shouldSpill = currentMemory >= myMemoryThreshold
     }
-    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
     // Actually spill
     if (shouldSpill) {
       _spillCount += 1
-      logSpillage(currentMemory)
+      logSpillage(currentMemory, elementsRead)

Review Comment:
   nit: Use `_elementsRead` directly here



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3148,6 +3148,14 @@ object SQLConf {
      .intConf
      .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val WINDOW_EXEC_BUFFER_SIZE_SPILL_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.spill.size.threshold")

Review Comment:
   The config name is a bit confusing.
   `spark.sql.windowExec.buffer.spill.threshold` vs `spark.sql.windowExec.buffer.spill.size.threshold`.
   
   Same for the others introduced.
   
   I will let @HyukjinKwon or @cloud-fan comment better though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

