rluvaton opened a new issue, #2825:
URL: https://github.com/apache/datafusion-comet/issues/2825

   Looking at Auron source code there are number of things that look more 
convenient and easier to understand
   
   1. it looks like there is no QueryPlanSerde
   
   ## Check for conversion
   
   Checking if operator is supported is usually in each matching operator and 
in rare cases in the global conversion, the rare cases are when it depend on 
other stuff (for example final aggregation requires native partial aggregation)
   
   this makes it very clear and close to the actual operator 
   
   ```scala
   abstract class NativeSortBase(
       sortOrder: Seq[SortOrder],
       global: Boolean,
       override val child: SparkPlan)
       extends UnaryExecNode
       with NativeSupports {
     // ...
   
     private def nativeSortExprs = sortOrder.map { sortOrder =>
       PhysicalExprNode
         .newBuilder()
         .setSort(
           PhysicalSortExprNode
             .newBuilder()
             .setExpr(NativeConverters.convertExpr(sortOrder.child))
             .setAsc(sortOrder.direction == Ascending)
             .setNullsFirst(sortOrder.nullOrdering == NullsFirst)
             .build())
         .build()
     }
   
     // check whether native converting is supported
     nativeSortExprs
   
     // ....
   }
   ```
   From 
[`apache/auron#NativeSortBase.scala`](https://github.com/apache/auron/blob/b2577307f62383acbe6be3766a3da64d889a6531/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala#L80-L94)
   
   ## Building protobuf
   
   It looks like there build of the protobuf is done in each operator, making 
the conversion and the actual operator closer
   
   again, making it cleaner
   
   ```scala
   val nativeFilterExec = FilterExecNode
     .newBuilder()
     .setInput(inputRDD.nativePlan(inputPartition, taskContext))
     .addAllExpr(nativeFilterExprs.asJava)
     .build()
   PhysicalPlanNode.newBuilder().setFilter(nativeFilterExec).build()
   ```
   
   From 
[`apache/auron#NativeFilterBase.scala`](https://github.com/apache/auron/blob/b2577307f62383acbe6be3766a3da64d889a6531/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala#L105-L110)
   
   ## Different Spark versions code
   Support for different spark versions:
   
   It seems like they have some decorator to for conditional code based on 
specific spark version which looks much cleaner and easy to understand and 
update (currently there are shim files and we need to add those to maven 
regarding version, and this looks cleaner, adding new version doesn't seem like 
a lot of work):
   ```scala
   case class NativeFilterExec(condition: Expression, override val child: 
SparkPlan)
       extends NativeFilterBase(condition, child) {
   
     @sparkver("3.2 / 3.3 / 3.4 / 3.5")
     override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
       copy(child = newChild)
   
     @sparkver("3.0 / 3.1")
     override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
       copy(child = newChildren.head)
   }
   ```
   From 
[`apache/auron#NativeFilterExec.scala`](https://github.com/apache/auron/blob/b2577307f62383acbe6be3766a3da64d889a6531/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterExec.scala#L27C3-L27C37)
   
   ## Execution
   
   I'm not sure if Auron does operator fusion to save the cost of going back 
and forth between the JVM and Rust for each operator. but if they does, their 
current implementation look cleaner.
   
   our operators are not really doing anything, they are just placeholder and 
the actual execution happen in another place making it harder to understand for 
new comers
   
   from Auron code it looks they 
   
   ```scala
   abstract class NativeSortBase(
       sortOrder: Seq[SortOrder],
       global: Boolean,
       override val child: SparkPlan)
       extends UnaryExecNode
       with NativeSupports {
   
     // ...
   
     override def doExecuteNative(): NativeRDD = {
       val inputRDD = NativeHelper.executeNative(child)
       val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
       val nativeSortExprs = this.nativeSortExprs
   
       new NativeRDD(
         sparkContext,
         nativeMetrics,
         rddPartitions = inputRDD.partitions,
         rddPartitioner = inputRDD.partitioner,
         rddDependencies = new OneToOneDependency(inputRDD) :: Nil,
         inputRDD.isShuffleReadFull,
         (partition, taskContext) => {
           val inputPartition = inputRDD.partitions(partition.index)
           val nativeSortExec = SortExecNode
             .newBuilder()
             .setInput(inputRDD.nativePlan(inputPartition, taskContext))
             .addAllExpr(nativeSortExprs.asJava)
             .build()
           PhysicalPlanNode.newBuilder().setSort(nativeSortExec).build()
         },
         friendlyName = "NativeRDD.Sort")
     }
   
   }
   ```
   From 
[`apache/auron#NativeSortBase.scala`](https://github.com/apache/auron/blob/b2577307f62383acbe6be3766a3da64d889a6531/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala#L96-L118)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to