Kontinuation commented on code in PR #1568:
URL: https://github.com/apache/datafusion-comet/pull/1568#discussion_r2010575602


##########
spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet.execution.shuffle
+
+import java.nio.{ByteBuffer, ByteOrder}
+import java.nio.file.{Files, Paths}
+
+import scala.collection.JavaConverters.asJavaIterableConverter
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, 
ShuffleWriteMetricsReporter, ShuffleWriter}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, SinglePartition}
+import org.apache.spark.sql.comet.{CometExec, CometMetricNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.comet.CometConf
+import org.apache.comet.serde.{OperatorOuterClass, PartitioningOuterClass, 
QueryPlanSerde}
+import org.apache.comet.serde.OperatorOuterClass.{CompressionCodec, Operator}
+import org.apache.comet.serde.QueryPlanSerde.serializeDataType
+
+/**
+ * A [[ShuffleWriter]] that will delegate shuffle write to native shuffle.
+ */
+class CometNativeShuffleWriter[K, V](
+    outputPartitioning: Partitioning,
+    outputAttributes: Seq[Attribute],
+    metrics: Map[String, SQLMetric],
+    numParts: Int,
+    shuffleId: Int,
+    mapId: Long,
+    context: TaskContext,
+    metricsReporter: ShuffleWriteMetricsReporter)
+    extends ShuffleWriter[K, V] {
+
+  private val OFFSET_LENGTH = 8
+
+  var partitionLengths: Array[Long] = _
+  var mapStatus: MapStatus = _
+
+  override def write(inputs: Iterator[Product2[K, V]]): Unit = {
+    val shuffleBlockResolver =
+      
SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver]
+    val dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId)
+    val indexFile = shuffleBlockResolver.getIndexFile(shuffleId, mapId)
+    val tempDataFilename = dataFile.getPath.replace(".data", ".data.tmp")
+    val tempIndexFilename = indexFile.getPath.replace(".index", ".index.tmp")
+    val tempDataFilePath = Paths.get(tempDataFilename)
+    val tempIndexFilePath = Paths.get(tempIndexFilename)
+
+    // Call native shuffle write
+    val nativePlan = getNativePlan(tempDataFilename, tempIndexFilename)
+
+    val detailedMetrics = Seq(
+      "elapsed_compute",
+      "encode_time",
+      "repart_time",
+      "mempool_time",
+      "input_batches",
+      "spill_count",
+      "spilled_bytes")
+
+    // Maps native metrics to SQL metrics
+    val nativeSQLMetrics = Map(
+      "output_rows" -> 
metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
+      "data_size" -> metrics("dataSize"),
+      "write_time" -> 
metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) ++
+      metrics.filterKeys(detailedMetrics.contains)
+    val nativeMetrics = CometMetricNode(nativeSQLMetrics)
+
+    // Getting rid of the fake partitionId
+    val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, 
Any]]].map(_._2)
+
+    val cometIter = CometExec.getCometIterator(
+      Seq(newInputs.asInstanceOf[Iterator[ColumnarBatch]]),
+      outputAttributes.length,
+      nativePlan,
+      nativeMetrics,
+      numParts,
+      context.partitionId())
+
+    while (cometIter.hasNext) {
+      cometIter.next()
+    }
+    cometIter.close()
+
+    // get partition lengths from shuffle write output index file
+    var offset = 0L
+    partitionLengths = Files
+      .readAllBytes(tempIndexFilePath)
+      .grouped(OFFSET_LENGTH)
+      .drop(1) // first partition offset is always 0
+      .map(indexBytes => {
+        val partitionOffset =
+          ByteBuffer.wrap(indexBytes).order(ByteOrder.LITTLE_ENDIAN).getLong
+        val partitionLength = partitionOffset - offset
+        offset = partitionOffset
+        partitionLength
+      })
+      .toArray
+    Files.delete(tempIndexFilePath)

Review Comment:
   Delete the temp index file; it won't be used after reading all partition 
lengths from it.



##########
spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet.execution.shuffle
+
+import java.nio.{ByteBuffer, ByteOrder}
+import java.nio.file.{Files, Paths}
+
+import scala.collection.JavaConverters.asJavaIterableConverter
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, 
ShuffleWriteMetricsReporter, ShuffleWriter}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, SinglePartition}
+import org.apache.spark.sql.comet.{CometExec, CometMetricNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.comet.CometConf
+import org.apache.comet.serde.{OperatorOuterClass, PartitioningOuterClass, 
QueryPlanSerde}
+import org.apache.comet.serde.OperatorOuterClass.{CompressionCodec, Operator}
+import org.apache.comet.serde.QueryPlanSerde.serializeDataType
+
+/**
+ * A [[ShuffleWriter]] that will delegate shuffle write to native shuffle.
+ */
+class CometNativeShuffleWriter[K, V](
+    outputPartitioning: Partitioning,
+    outputAttributes: Seq[Attribute],
+    metrics: Map[String, SQLMetric],
+    numParts: Int,
+    shuffleId: Int,
+    mapId: Long,
+    context: TaskContext,
+    metricsReporter: ShuffleWriteMetricsReporter)
+    extends ShuffleWriter[K, V] {

Review Comment:
   This class is mostly copy-pasted from CometShuffleExchangeExec.scala, with a 
few additional changes noted in review comments.



##########
spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala:
##########
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet.execution.shuffle
+
+import java.nio.{ByteBuffer, ByteOrder}
+import java.nio.file.{Files, Paths}
+
+import scala.collection.JavaConverters.asJavaIterableConverter
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.scheduler.MapStatus
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, 
ShuffleWriteMetricsReporter, ShuffleWriter}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, SinglePartition}
+import org.apache.spark.sql.comet.{CometExec, CometMetricNode}
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.comet.CometConf
+import org.apache.comet.serde.{OperatorOuterClass, PartitioningOuterClass, 
QueryPlanSerde}
+import org.apache.comet.serde.OperatorOuterClass.{CompressionCodec, Operator}
+import org.apache.comet.serde.QueryPlanSerde.serializeDataType
+
+/**
+ * A [[ShuffleWriter]] that will delegate shuffle write to native shuffle.
+ */
+class CometNativeShuffleWriter[K, V](
+    outputPartitioning: Partitioning,
+    outputAttributes: Seq[Attribute],
+    metrics: Map[String, SQLMetric],
+    numParts: Int,
+    shuffleId: Int,
+    mapId: Long,
+    context: TaskContext,
+    metricsReporter: ShuffleWriteMetricsReporter)
+    extends ShuffleWriter[K, V] {
+
+  private val OFFSET_LENGTH = 8
+
+  var partitionLengths: Array[Long] = _
+  var mapStatus: MapStatus = _
+
+  override def write(inputs: Iterator[Product2[K, V]]): Unit = {
+    val shuffleBlockResolver =
+      
SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver]
+    val dataFile = shuffleBlockResolver.getDataFile(shuffleId, mapId)
+    val indexFile = shuffleBlockResolver.getIndexFile(shuffleId, mapId)
+    val tempDataFilename = dataFile.getPath.replace(".data", ".data.tmp")
+    val tempIndexFilename = indexFile.getPath.replace(".index", ".index.tmp")
+    val tempDataFilePath = Paths.get(tempDataFilename)
+    val tempIndexFilePath = Paths.get(tempIndexFilename)
+
+    // Call native shuffle write
+    val nativePlan = getNativePlan(tempDataFilename, tempIndexFilename)
+
+    val detailedMetrics = Seq(
+      "elapsed_compute",
+      "encode_time",
+      "repart_time",
+      "mempool_time",
+      "input_batches",
+      "spill_count",
+      "spilled_bytes")
+
+    // Maps native metrics to SQL metrics
+    val nativeSQLMetrics = Map(
+      "output_rows" -> 
metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_RECORDS_WRITTEN),
+      "data_size" -> metrics("dataSize"),
+      "write_time" -> 
metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)) ++
+      metrics.filterKeys(detailedMetrics.contains)
+    val nativeMetrics = CometMetricNode(nativeSQLMetrics)
+
+    // Getting rid of the fake partitionId
+    val newInputs = inputs.asInstanceOf[Iterator[_ <: Product2[Any, 
Any]]].map(_._2)
+
+    val cometIter = CometExec.getCometIterator(
+      Seq(newInputs.asInstanceOf[Iterator[ColumnarBatch]]),
+      outputAttributes.length,
+      nativePlan,
+      nativeMetrics,
+      numParts,
+      context.partitionId())
+
+    while (cometIter.hasNext) {
+      cometIter.next()
+    }
+    cometIter.close()

Review Comment:
   Added a `cometIter.close()` call.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to