kazuyukitanimura opened a new issue, #1268: URL: https://github.com/apache/datafusion-comet/issues/1268
### Describe the bug

The test below is adapted from the Spark test `ordered distribution and sort with same exprs: append` in `WriteDistributionAndOrderingSuite`. It looks like the Comet shuffle read size is reported as much larger than the Spark shuffle read size, which causes AQE to coalesce the shuffle into more partitions.

### Steps to reproduce

```scala
package org.apache.spark.sql

import java.sql.Date
import java.util.Collections

import org.scalactic.source.Position
import org.scalatest.Tag

import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.connector.distributions.Distributions
import org.apache.spark.sql.connector.expressions._
import org.apache.spark.sql.connector.expressions.LogicalExpressions.sort
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructType}
import org.apache.spark.sql.util.QueryExecutionListener

import org.apache.comet.CometConf

class CSuite extends CometTestBase {
  import testImplicits._

  override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
      pos: Position): Unit = {
    super.test(testName, testTags: _*) {
      withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
        testFun
      }
    }
  }

  test("a") {
    def catalog: InMemoryCatalog = {
      spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
      val catalog = spark.sessionState.catalogManager.catalog("testcat")
      catalog.asTableCatalog.asInstanceOf[InMemoryCatalog]
    }

    val namespace = Array("ns1")
    val ident = Identifier.of(namespace, "test_table")
    val tableNameAsString = "testcat." + ident.toString
    val emptyProps = Collections.emptyMap[String, String]
    val schema = new StructType()
      .add("id", IntegerType)
      .add("data", StringType)
      .add("day", DateType)

    val tableOrdering = Array[SortOrder](
      sort(FieldReference("data"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST))
    val tableDistribution = Distributions.ordered(tableOrdering)
    val writeTransform: DataFrame => DataFrame = df => df

    catalog.createTable(
      ident = ident,
      schema = schema,
      partitions = Array.empty,
      properties = emptyProps,
      distribution = tableDistribution,
      ordering = tableOrdering,
      requiredNumPartitions = None,
      advisoryPartitionSize = Some(1000),
      distributionStrictlyRequired = true)

    val df = spark.sparkContext
      .parallelize(
        (1 to 10).map { i =>
          (if (i > 4) 5 else i, i.toString, Date.valueOf(s"${2020 + i}-$i-$i"))
        },
        3)
      .toDF("id", "data", "day")
    val writer = writeTransform(df).writeTo(tableNameAsString)

    def execute(writeFunc: => Unit): SparkPlan = {
      var executedPlan: SparkPlan = null

      val listener = new QueryExecutionListener {
        override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
          executedPlan = qe.executedPlan
        }
        override def onFailure(
            funcName: String,
            qe: QueryExecution,
            exception: Exception): Unit = {}
      }
      spark.listenerManager.register(listener)

      writeFunc

      sparkContext.listenerBus.waitUntilEmpty()

      executedPlan match {
        case w: V2TableWriteExec =>
          stripAQEPlan(w.query)
        case _ =>
          fail("expected V2TableWriteExec")
      }
    }

    def executeCommand(): SparkPlan = execute(writer.append())

    // if the partition size is configured for the table, set the SQL conf to something small
    // so that the overriding behavior is tested
    val defaultAdvisoryPartitionSize = "15"

    withSQLConf(
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> defaultAdvisoryPartitionSize,
      SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {

      val executedPlan = executeCommand()

      val read = collect(executedPlan) { case r: AQEShuffleReadExec => r }
      assert(read.size == 1)
      println(read.head.partitionSpecs)
      assert(read.head.partitionSpecs.size == 1)
    }
  }
}
```

### Expected behavior

Spark shuffle partition specs:

```
ArrayBuffer(CoalescedPartitionSpec(0,5,Some(394)))
```

Comet shuffle partition specs:

```
ArrayBuffer(CoalescedPartitionSpec(0,1,Some(890)), CoalescedPartitionSpec(1,3,Some(890)), CoalescedPartitionSpec(3,4,Some(890)), CoalescedPartitionSpec(4,5,Some(445)))
```

### Additional context

The above test may need Spark 3.5+, or a backport of https://issues.apache.org/jira/browse/SPARK-42779.
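For context on why the larger reported size changes the plan: AQE coalesces adjacent shuffle reduce partitions while their combined size stays within the advisory partition size, which is 1000 bytes here because the table's `advisoryPartitionSize` overrides the SQL conf. The sketch below is not the Spark or Comet implementation, only an illustration of that packing; the per-partition byte sizes in it are hypothetical and were chosen just to be consistent with the spec totals printed above.

```scala
// Illustrative sketch only (not Spark/Comet source): greedily pack adjacent reduce
// partitions into CoalescedPartitionSpec(startReducerIndex, endReducerIndex, dataSize)
// while the accumulated size stays within the advisory target size.
case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int, dataSize: Option[Long])

def coalesce(partitionSizes: Seq[Long], targetSize: Long): Seq[CoalescedPartitionSpec] = {
  val specs = scala.collection.mutable.ArrayBuffer.empty[CoalescedPartitionSpec]
  var start = 0
  var acc = 0L
  partitionSizes.indices.foreach { i =>
    // close the current group when adding the next partition would exceed the target
    if (i > start && acc + partitionSizes(i) > targetSize) {
      specs += CoalescedPartitionSpec(start, i, Some(acc))
      start = i
      acc = 0L
    }
    acc += partitionSizes(i)
  }
  specs += CoalescedPartitionSpec(start, partitionSizes.length, Some(acc))
  specs.toSeq
}

// Hypothetical per-partition sizes summing to the 394 bytes reported with Spark shuffle:
// everything fits under the 1000-byte advisory size, so all 5 partitions collapse into one spec.
println(coalesce(Seq(80, 80, 78, 80, 76), 1000))
// -> CoalescedPartitionSpec(0,5,Some(394))

// Hypothetical per-partition sizes consistent with the specs printed with Comet shuffle:
// the groups exceed the 1000-byte target, so four specs remain and the assertion fails.
println(coalesce(Seq(890, 445, 445, 890, 445), 1000))
// -> CoalescedPartitionSpec(0,1,Some(890)), CoalescedPartitionSpec(1,3,Some(890)),
//    CoalescedPartitionSpec(3,4,Some(890)), CoalescedPartitionSpec(4,5,Some(445))
```

In other words, the difference in reported shuffle data size alone is enough to flip the read from one coalesced partition to four, which is what the assertion in the test catches.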