viirya commented on code in PR #1094:
URL: https://github.com/apache/datafusion-comet/pull/1094#discussion_r1844743112


##########
spark/src/main/scala/org/apache/spark/sql/comet/CometNativeScanExec.scala:
##########
@@ -60,415 +50,34 @@ case class CometNativeScanExec(
     dataFilters: Seq[Expression],
     tableIdentifier: Option[TableIdentifier],
     disableBucketedScan: Boolean = false,
-    originalPlan: FileSourceScanExec)
-    extends CometPlan
-    with DataSourceScanExec
-    with ShimCometScanExec {
-
-  def wrapped: FileSourceScanExec = originalPlan
-
-  // FIXME: ideally we should reuse wrapped.supportsColumnar, however that 
fails many tests
-  override lazy val supportsColumnar: Boolean =
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
-
-  override def vectorTypes: Option[Seq[String]] = originalPlan.vectorTypes
-
-  private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty
-
-  /**
-   * Send the driver-side metrics. Before calling this function, 
selectedPartitions has been
-   * initialized. See SPARK-26327 for more details.
-   */
-  private def sendDriverMetrics(): Unit = {
-    driverMetrics.foreach(e => metrics(e._1).add(e._2))
-    val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(
-      sparkContext,
-      executionId,
-      metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
-  }
-
-  private def isDynamicPruningFilter(e: Expression): Boolean =
-    e.find(_.isInstanceOf[PlanExpression[_]]).isDefined
-
-  @transient lazy val selectedPartitions: Array[PartitionDirectory] = {
-    val optimizerMetadataTimeNs = 
relation.location.metadataOpsTimeNs.getOrElse(0L)
-    val startTime = System.nanoTime()
-    val ret =
-      
relation.location.listFiles(partitionFilters.filterNot(isDynamicPruningFilter), 
dataFilters)
-    setFilesNumAndSizeMetric(ret, true)
-    val timeTakenMs =
-      NANOSECONDS.toMillis((System.nanoTime() - startTime) + 
optimizerMetadataTimeNs)
-    driverMetrics("metadataTime") = timeTakenMs
-    ret
-  }.toArray
-
-  // We can only determine the actual partitions at runtime when a dynamic 
partition filter is
-  // present. This is because such a filter relies on information that is only 
available at run
-  // time (for instance the keys used in the other side of a join).
-  @transient private lazy val dynamicallySelectedPartitions: 
Array[PartitionDirectory] = {
-    val dynamicPartitionFilters = 
partitionFilters.filter(isDynamicPruningFilter)
-
-    if (dynamicPartitionFilters.nonEmpty) {
-      val startTime = System.nanoTime()
-      // call the file index for the files matching all filters except dynamic 
partition filters
-      val predicate = dynamicPartitionFilters.reduce(And)
-      val partitionColumns = relation.partitionSchema
-      val boundPredicate = Predicate.create(
-        predicate.transform { case a: AttributeReference =>
-          val index = partitionColumns.indexWhere(a.name == _.name)
-          BoundReference(index, partitionColumns(index).dataType, nullable = 
true)
-        },
-        Nil)
-      val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
-      setFilesNumAndSizeMetric(ret, false)
-      val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
-      driverMetrics("pruningTime") = timeTakenMs
-      ret
-    } else {
-      selectedPartitions
-    }
-  }
-
-  // exposed for testing
-  lazy val bucketedScan: Boolean = originalPlan.bucketedScan
-
-  override lazy val (outputPartitioning, outputOrdering): (Partitioning, 
Seq[SortOrder]) =
-    (originalPlan.outputPartitioning, originalPlan.outputOrdering)
-
-  @transient
-  private lazy val pushedDownFilters = getPushedDownFilters(relation, 
dataFilters)
-
-  override lazy val metadata: Map[String, String] =
-    if (originalPlan == null) Map.empty else originalPlan.metadata
-
-  override def verboseStringWithOperatorId(): String = {
-    val metadataStr = metadata.toSeq.sorted
-      .filterNot {
-        case (_, value) if (value.isEmpty || value.equals("[]")) => true
-        case (key, _) if (key.equals("DataFilters") || key.equals("Format")) 
=> true
-        case (_, _) => false
-      }
-      .map {
-        case (key, _) if (key.equals("Location")) =>
-          val location = relation.location
-          val numPaths = location.rootPaths.length
-          val abbreviatedLocation = if (numPaths <= 1) {
-            location.rootPaths.mkString("[", ", ", "]")
-          } else {
-            "[" + location.rootPaths.head + s", ... ${numPaths - 1} entries]"
-          }
-          s"$key: ${location.getClass.getSimpleName} 
${redact(abbreviatedLocation)}"
-        case (key, value) => s"$key: ${redact(value)}"
-      }
-
-    s"""
-       |$formattedNodeName
-       |${ExplainUtils.generateFieldString("Output", output)}
-       |${metadataStr.mkString("\n")}
-       |""".stripMargin
-  }
-
-  lazy val inputRDD: RDD[InternalRow] = {

Review Comment:
   No, we won't call executeColumnar on CometNativeScanExec anymore. In this 
respect it is similar to the other native operators.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to