mridulm commented on code in PR #47856:
URL: https://github.com/apache/spark/pull/47856#discussion_r2051611504
##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -1582,6 +1582,16 @@ package object config {
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
 
+  private[spark] val SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD =
+    ConfigBuilder("spark.shuffle.spill.maxRecordsSizeForSpillThreshold")
+      .internal()
+      .doc("The maximum size in memory before forcing the shuffle sorter to spill. " +
+        "By default it is Long.MAX_VALUE, which means we never force the sorter to spill, " +
+        "until we reach some limitations, like the max page size limitation for the pointer " +
+        "array in the sorter.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)

Review Comment:
   Add a size constraint when the value is specified, to ensure invalid values don't get set. At a minimum `.checkValue(v => v > 0, "The threshold should be positive.")`, though perhaps something reasonably large should be enforced? 1 MB? Same for the other configs as well.

##########
core/src/main/scala/org/apache/spark/util/collection/Spillable.scala:
##########
@@ -81,7 +85,11 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
    */
   protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
     var shouldSpill = false
-    if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
+    // Check number of elements or memory usage limits, whichever is hit first
+    if (_elementsRead > numElementsForceSpillThreshold
+      || currentMemory > maxSizeForceSpillThreshold) {

Review Comment:
   By moving `_elementsRead > numElementsForceSpillThreshold` here, we would actually reduce some unnecessary allocations ... nice!

##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3148,6 +3148,14 @@ object SQLConf {
       .intConf
       .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val WINDOW_EXEC_BUFFER_SIZE_SPILL_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.spill.size.threshold")
+      .internal()
+      .doc("Threshold for size of rows to be spilled by window operator")
+      .version("4.0.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD.defaultValue.get)

Review Comment:
   For this and the other configs below: fall back to `SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD` instead of copying its default?

##########
core/src/main/scala/org/apache/spark/util/collection/Spillable.scala:
##########
@@ -90,11 +98,10 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
       // or we already had more memory than myMemoryThreshold), spill the current collection
       shouldSpill = currentMemory >= myMemoryThreshold
     }
-    shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
     // Actually spill
     if (shouldSpill) {
       _spillCount += 1
-      logSpillage(currentMemory)
+      logSpillage(currentMemory, elementsRead)

Review Comment:
   nit: Use `_elementsRead` directly here.

##########
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##########
@@ -3148,6 +3148,14 @@ object SQLConf {
       .intConf
       .createWithDefault(SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get)
 
+  val WINDOW_EXEC_BUFFER_SIZE_SPILL_THRESHOLD =
+    buildConf("spark.sql.windowExec.buffer.spill.size.threshold")

Review Comment:
   The config name is a bit confusing: the existing `spark.sql.windowExec.buffer.spill.threshold` vs the new `spark.sql.windowExec.buffer.spill.size.threshold`. The same applies to the others introduced here. I will let @HyukjinKwon or @cloud-fan comment better though.
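For reference, a rough sketch of how the first and third suggestions above could look, assuming the snippets sit inside the existing `package object config` and `object SQLConf` (so `ConfigBuilder`, `ByteUnit`, `buildConf`, and the core config entry are already in scope). The `.checkValue` bound and the switch to `.fallbackConf` are one possible reading of the comments, not necessarily the change the PR will adopt; doc strings are abbreviated.

```scala
// Sketch only: the core config gains a validity check on explicitly set values.
private[spark] val SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD =
  ConfigBuilder("spark.shuffle.spill.maxRecordsSizeForSpillThreshold")
    .internal()
    .doc("The maximum size in memory before forcing the shuffle sorter to spill.")
    .bytesConf(ByteUnit.BYTE)
    // Reject non-positive values; a larger minimum (e.g. 1 MB) could be enforced instead.
    .checkValue(v => v > 0, "The threshold should be positive.")
    .createWithDefault(Long.MaxValue)

// Sketch only: the SQL-side config defers to the core config instead of copying its default.
val WINDOW_EXEC_BUFFER_SIZE_SPILL_THRESHOLD =
  buildConf("spark.sql.windowExec.buffer.spill.size.threshold")
    .internal()
    .doc("Threshold for size of rows to be spilled by window operator")
    .version("4.0.0")
    // Inherits type, default, and any user-set value from the core entry.
    .fallbackConf(SHUFFLE_SPILL_MAX_SIZE_FORCE_SPILL_THRESHOLD)
```

A side effect of `.fallbackConf` worth noting: when the SQL config is unset, it picks up the current value of the core config at read time (including a user-set value), whereas `.createWithDefault(...defaultValue.get)` only copies the static default.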