rluvaton opened a new issue, #2825:
URL: https://github.com/apache/datafusion-comet/issues/2825
Looking at the Auron source code, there are a number of things that look more
convenient and easier to understand:
1. It looks like there is no `QueryPlanSerde`.
## Checking for conversion support
Checking whether an operator is supported usually happens in the matching
operator itself. Only in rare cases does it happen in the global conversion,
namely when support depends on something else (for example, a final aggregation
requires a native partial aggregation). This keeps the check very clear and
close to the actual operator:
```scala
abstract class NativeSortBase(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    override val child: SparkPlan)
    extends UnaryExecNode
    with NativeSupports {
  // ...
  private def nativeSortExprs = sortOrder.map { sortOrder =>
    PhysicalExprNode
      .newBuilder()
      .setSort(
        PhysicalSortExprNode
          .newBuilder()
          .setExpr(NativeConverters.convertExpr(sortOrder.child))
          .setAsc(sortOrder.direction == Ascending)
          .setNullsFirst(sortOrder.nullOrdering == NullsFirst)
          .build())
      .build()
  }

  // check whether native converting is supported
  nativeSortExprs
  // ....
}
```
From
[`apache/auron#NativeSortBase.scala`](https://github.com/apache/auron/blob/b2577307f62383acbe6be3766a3da64d889a6531/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala#L80-L94)
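To make the pattern above concrete, here is a minimal sketch in plain Scala under simplified assumptions: `Expr`, `Plan`, `NativeSort`, `NativeExprs` and `Converter` are hypothetical stand-ins, not Auron, Comet or Spark classes. The native operator converts its expressions in its constructor, so an unsupported expression fails right where the operator is defined, and the global pass only catches that failure and keeps the Spark operator.

```scala
// Hypothetical, simplified model; these are NOT Auron, Comet or Spark classes.
sealed trait Expr
final case class Col(name: String) extends Expr
// Assumption for this sketch: UDFs have no native counterpart.
final case class Udf(name: String) extends Expr

object NativeExprs {
  // Convert one expression, or throw if it cannot run natively.
  def convert(e: Expr): String = e match {
    case Col(n) => s"col($n)"
    case Udf(n) => throw new UnsupportedOperationException(s"UDF $n is not supported natively")
  }
}

sealed trait Plan
final case class SparkScan(table: String) extends Plan
final case class SparkSort(keys: Seq[Expr], child: Plan) extends Plan
final case class NativeScan(table: String) extends Plan
final case class NativeSort(keys: Seq[Expr], child: Plan) extends Plan {
  // The support check lives in the operator: converting the keys eagerly in the
  // constructor makes construction fail for unsupported expressions.
  val nativeKeys: Seq[String] = keys.map(NativeExprs.convert)
}

object Converter {
  // Global pass: convert bottom-up and fall back to the Spark operator when
  // constructing the native one throws.
  def convert(plan: Plan): Plan = plan match {
    case SparkScan(table) => NativeScan(table)
    case s @ SparkSort(keys, child) =>
      val newChild = convert(child)
      try NativeSort(keys, newChild)
      catch { case _: UnsupportedOperationException => s.copy(child = newChild) }
    case other => other
  }
}
```

In this sketch, cross-operator rules such as "a final aggregation needs a native partial aggregation" would be the only extra cases added to `Converter.convert`.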
## Building protobuf
It looks like the protobuf is built inside each operator, which keeps the
conversion close to the actual operator and, again, makes it cleaner:
```scala
val nativeFilterExec = FilterExecNode
.newBuilder()
.setInput(inputRDD.nativePlan(inputPartition, taskContext))
.addAllExpr(nativeFilterExprs.asJava)
.build()
PhysicalPlanNode.newBuilder().setFilter(nativeFilterExec).build()
```
From
[`apache/auron#NativeFilterBase.scala`](https://github.com/apache/auron/blob/b2577307f62383acbe6be3766a3da64d889a6531/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterBase.scala#L105-L110)
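A minimal sketch of the "each operator builds its own node" idea, assuming plain case classes in place of the generated protobuf builders. `ScanNode`, `FilterNode`, `NativeOperator`, `ScanOp` and `FilterOp` are hypothetical names. Each operator implements `toNative` and delegates to its child, so there is no single serde object that has to know about every operator.

```scala
// Hypothetical plan-node model standing in for generated protobuf messages
// such as PhysicalPlanNode / FilterExecNode; not Auron's or Comet's real types.
sealed trait PlanNode
final case class ScanNode(table: String) extends PlanNode
final case class FilterNode(input: PlanNode, predicates: Seq[String]) extends PlanNode

trait NativeOperator {
  // Each operator knows how to build its own node; no central serde is needed.
  def toNative: PlanNode
}

final case class ScanOp(table: String) extends NativeOperator {
  def toNative: PlanNode = ScanNode(table)
}

final case class FilterOp(predicates: Seq[String], child: NativeOperator) extends NativeOperator {
  // Build the child's node first, then wrap it, like `.setInput(...)` in the snippet above.
  def toNative: PlanNode = FilterNode(child.toNative, predicates)
}
```

Adding a new operator in this style means adding one class, rather than a new class plus a new case in a shared serde.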
## Code for different Spark versions
Support for different Spark versions: it seems like they have an annotation
(`@sparkver`) for conditional code based on the specific Spark version, which
looks much cleaner and is easier to understand and update. Currently we have
shim files and need to add them to the Maven build for each version; with this
approach, adding a new version doesn't seem like a lot of work:
```scala
case class NativeFilterExec(condition: Expression, override val child: SparkPlan)
    extends NativeFilterBase(condition, child) {

  @sparkver("3.2 / 3.3 / 3.4 / 3.5")
  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)

  @sparkver("3.0 / 3.1")
  override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan =
    copy(child = newChildren.head)
}
```
From
[`apache/auron#NativeFilterExec.scala`](https://github.com/apache/auron/blob/b2577307f62383acbe6be3766a3da64d889a6531/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterExec.scala#L27C3-L27C37)
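For illustration only, here is a runtime sketch of the matching rule the `@sparkver` strings seem to express, assuming a spec is a `/`-separated list of `major.minor` versions. Auron's real annotation is presumably resolved at build time, which this sketch does not attempt; `SparkVer`, `matches` and `select` are hypothetical names.

```scala
// Hypothetical runtime helper; NOT Auron's @sparkver implementation.
object SparkVer {
  // Parse a spec such as "3.2 / 3.3 / 3.4 / 3.5" into the set of listed versions.
  def parse(spec: String): Set[String] =
    spec.split("/").map(_.trim).filter(_.nonEmpty).toSet

  // Reduce a full version string such as "3.4.1" to its "major.minor" prefix.
  def majorMinor(version: String): String =
    version.split("\\.").take(2).mkString(".")

  def matches(spec: String, sparkVersion: String): Boolean =
    parse(spec).contains(majorMinor(sparkVersion))

  // Keep exactly one variant whose spec matches, similar to keeping only the
  // annotated member that applies to the Spark version being built.
  def select[A](sparkVersion: String, variants: (String, A)*): A =
    variants.collect { case (spec, a) if matches(spec, sparkVersion) => a } match {
      case Seq(only) => only
      case other =>
        throw new IllegalStateException(
          s"expected exactly one variant for $sparkVersion, got ${other.size}")
    }
}
```

The appeal of the annotation is that both variants stay side by side in one file, instead of living in per-version shim directories.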
## Execution
I'm not sure whether Auron does operator fusion to avoid the cost of going
back and forth between the JVM and Rust for each operator, but if it does, its
current implementation looks cleaner.
Our operators don't really do anything; they are just placeholders, and the
actual execution happens somewhere else, which makes it harder for newcomers to
understand.
From the Auron code, it looks like each operator defines its own native execution:
```scala
abstract class NativeSortBase(
    sortOrder: Seq[SortOrder],
    global: Boolean,
    override val child: SparkPlan)
    extends UnaryExecNode
    with NativeSupports {
  // ...
  override def doExecuteNative(): NativeRDD = {
    val inputRDD = NativeHelper.executeNative(child)
    val nativeMetrics = SparkMetricNode(metrics, inputRDD.metrics :: Nil)
    val nativeSortExprs = this.nativeSortExprs

    new NativeRDD(
      sparkContext,
      nativeMetrics,
      rddPartitions = inputRDD.partitions,
      rddPartitioner = inputRDD.partitioner,
      rddDependencies = new OneToOneDependency(inputRDD) :: Nil,
      inputRDD.isShuffleReadFull,
      (partition, taskContext) => {
        val inputPartition = inputRDD.partitions(partition.index)
        val nativeSortExec = SortExecNode
          .newBuilder()
          .setInput(inputRDD.nativePlan(inputPartition, taskContext))
          .addAllExpr(nativeSortExprs.asJava)
          .build()
        PhysicalPlanNode.newBuilder().setSort(nativeSortExec).build()
      },
      friendlyName = "NativeRDD.Sort")
  }
}
```
From
[`apache/auron#NativeSortBase.scala`](https://github.com/apache/auron/blob/b2577307f62383acbe6be3766a3da64d889a6531/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortBase.scala#L96-L118)
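If Auron does fuse operators this way, the key idea seems to be that each `NativeRDD` carries a function that builds its plan for a partition by wrapping its input's plan, so a chain of native operators yields one plan tree that could be handed to the native side once. A minimal sketch under that assumption, with hypothetical stand-ins (`PlanNode`, `NativePlanRDD`, `mapPlan`, `Demo`) instead of Auron's `NativeRDD` and protobuf types:

```scala
// Hypothetical, simplified stand-ins; NOT Auron's NativeRDD or protobuf types.
sealed trait PlanNode
final case class ScanNode(partition: Int) extends PlanNode
final case class FilterNode(input: PlanNode, predicate: String) extends PlanNode
final case class SortNode(input: PlanNode, keys: Seq[String]) extends PlanNode

// Each "RDD" only carries a function that builds its plan for a given partition.
final class NativePlanRDD(val numPartitions: Int, val nativePlan: Int => PlanNode) {
  // Wrap this RDD's per-partition plan in a parent operator, keeping the same
  // partitioning (a one-to-one dependency, as in the Sort example above).
  def mapPlan(f: PlanNode => PlanNode): NativePlanRDD =
    new NativePlanRDD(numPartitions, p => f(nativePlan(p)))
}

object Demo {
  val scan = new NativePlanRDD(2, p => ScanNode(p))
  val filtered = scan.mapPlan(in => FilterNode(in, "a > 1"))
  // Building the plan for one partition yields a single nested tree
  // (Sort over Filter over Scan), not one JVM/native round trip per operator.
  val sorted = filtered.mapPlan(in => SortNode(in, Seq("a")))
}
```

Because nothing is built until `nativePlan` is called for a partition, the operator classes stay the place where both the conversion and the execution logic are defined.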