kazuyukitanimura opened a new issue, #1268:
URL: https://github.com/apache/datafusion-comet/issues/1268

   ### Describe the bug
   
   The test below is adapted from the Spark `WriteDistributionAndOrderingSuite` test `ordered distribution and sort with same exprs: append`.
   
   It looks like the shuffle data size reported by Comet shuffle is much larger than what Spark shuffle reports, which causes AQE to coalesce the shuffle read into more partitions than expected.
   
   ### Steps to reproduce
   
   ```scala
   package org.apache.spark.sql
   
   import java.sql.Date
   import java.util.Collections
   
   import org.scalactic.source.Position
   import org.scalatest.Tag
   
   import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog}
   import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
   import org.apache.spark.sql.connector.distributions.Distributions
   import org.apache.spark.sql.connector.expressions._
   import org.apache.spark.sql.connector.expressions.LogicalExpressions.sort
   import org.apache.spark.sql.execution._
   import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
   import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
   import org.apache.spark.sql.internal.SQLConf
   import org.apache.spark.sql.types.{DateType, IntegerType, StringType, StructType}
   import org.apache.spark.sql.util.QueryExecutionListener
   
   import org.apache.comet.CometConf
   
   class CSuite extends CometTestBase {
     import testImplicits._
   
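     // Force Comet native shuffle on for every test case in this suite.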
     override protected def test(testName: String, testTags: Tag*)(testFun: => Any)(implicit
         pos: Position): Unit = {
       super.test(testName, testTags: _*) {
         withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {
           testFun
         }
       }
     }
   
     test("a") {
   
       def catalog: InMemoryCatalog = {
         spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
         val catalog = spark.sessionState.catalogManager.catalog("testcat")
         catalog.asTableCatalog.asInstanceOf[InMemoryCatalog]
       }
       val namespace = Array("ns1")
       val ident = Identifier.of(namespace, "test_table")
       val tableNameAsString = "testcat." + ident.toString
       val emptyProps = Collections.emptyMap[String, String]
       val schema = new StructType()
         .add("id", IntegerType)
         .add("data", StringType)
         .add("day", DateType)
       val tableOrdering = Array[SortOrder](
         sort(FieldReference("data"), SortDirection.ASCENDING, NullOrdering.NULLS_FIRST))
       val tableDistribution = Distributions.ordered(tableOrdering)
       val writeTransform: DataFrame => DataFrame = df => df
   
       catalog.createTable(
         ident = ident,
         schema = schema,
         partitions = Array.empty,
         properties = emptyProps,
         distribution = tableDistribution,
         ordering = tableOrdering,
         requiredNumPartitions = None,
         advisoryPartitionSize = Some(1000),
         distributionStrictlyRequired = true)
   
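       // Write 10 rows from 3 input partitions; ids greater than 4 are collapsed to 5.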
       val df =
         spark.sparkContext
           .parallelize(
             (1 to 10).map { i =>
               (if (i > 4) 5 else i, i.toString, Date.valueOf(s"${2020 + i}-$i-$i"))
             },
             3)
           .toDF("id", "data", "day")
       val writer = writeTransform(df).writeTo(tableNameAsString)
   
       def execute(writeFunc: => Unit): SparkPlan = {
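         // Run the write and capture its executed physical plan via a QueryExecutionListener.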
         var executedPlan: SparkPlan = null
   
         val listener = new QueryExecutionListener {
           override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit = {
             executedPlan = qe.executedPlan
           }
           override def onFailure(
               funcName: String,
               qe: QueryExecution,
               exception: Exception): Unit = {}
         }
         spark.listenerManager.register(listener)
   
         writeFunc
   
         sparkContext.listenerBus.waitUntilEmpty()
   
         executedPlan match {
           case w: V2TableWriteExec =>
             stripAQEPlan(w.query)
           case _ =>
             fail("expected V2TableWriteExec")
         }
       }
   
       def executeCommand(): SparkPlan = execute(writer.append())
   
       // if the partition size is configured for the table, set the SQL conf to something small
       // so that the overriding behavior is tested
       val defaultAdvisoryPartitionSize = "15"
       withSQLConf(
         SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
         SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
         SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
         SQLConf.SHUFFLE_PARTITIONS.key -> "5",
         SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> defaultAdvisoryPartitionSize,
         SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1") {
   
         val executedPlan = executeCommand()
         val read = collect(executedPlan) { case r: AQEShuffleReadExec =>
           r
         }
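         // Spark shuffle coalesces the read into a single partition spec here;
         // Comet shuffle currently produces four (see Expected behavior below).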
         assert(read.size == 1)
         println(read.head.partitionSpecs)
         assert(read.head.partitionSpecs.size == 1)
       }
     }
   }
   ```
   
   
   ### Expected behavior
   
   Comet should produce the same single coalesced partition as Spark.
   
   Spark shuffle partition specs:
   `ArrayBuffer(CoalescedPartitionSpec(0,5,Some(394)))`
   
   Comet shuffle partition specs (actual):
   `ArrayBuffer(CoalescedPartitionSpec(0,1,Some(890)), CoalescedPartitionSpec(1,3,Some(890)), CoalescedPartitionSpec(3,4,Some(890)), CoalescedPartitionSpec(4,5,Some(445)))`
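   
   For reference, a rough sketch of the coalescing arithmetic implied by these specs (not AQE's exact algorithm), assuming the table-level `advisoryPartitionSize` of 1000 bytes overrides the 15-byte SQL conf as the comment in the test suggests: Comet reports roughly 8x more shuffle bytes than Spark, so AQE keeps about four coalesced partitions instead of one.
   
   ```scala
   object CoalesceSketch extends App {
     // Hypothetical helper, not part of Spark or Comet: approximate the number of
     // coalesced partitions as ceil(total reported shuffle bytes / advisory size).
     def approxCoalescedSpecs(reportedBytes: Seq[Long], advisoryBytes: Long): Long =
       math.max(1L, math.ceil(reportedBytes.sum.toDouble / advisoryBytes).toLong)
   
     val advisoryBytes = 1000L                    // table-level advisoryPartitionSize
     val sparkBytes = Seq(394L)                   // from CoalescedPartitionSpec(0,5,Some(394))
     val cometBytes = Seq(890L, 890L, 890L, 445L) // from the Comet specs above
   
     println(approxCoalescedSpecs(sparkBytes, advisoryBytes)) // 1
     println(approxCoalescedSpecs(cometBytes, advisoryBytes)) // 4
   }
   ```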
   
   ### Additional context
   
   Running the above test may require Spark 3.5+, or backporting https://issues.apache.org/jira/browse/SPARK-42779.

