parthchandra commented on code in PR #1747:
URL: https://github.com/apache/datafusion-comet/pull/1747#discussion_r2101363175


##########
spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala:
##########
@@ -93,21 +93,63 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
           return withInfos(scanExec, fallbackReasons.toSet)
         }
 
-        val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
-        if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
+        var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
+
+        // if scan is auto then pick best available scan
+        if (scanImpl == SCAN_AUTO) {
+          // TODO these checks are not yet exhaustive. For example, native_datafusion does
+          //  not support reading from object stores such as S3 yet
+
+          val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
+          val schemaSupported =
+            typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+          val partitionSchemaSupported =
+            typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
+
+          // additional checks for known issues
+          def isComplexType(dt: DataType): Boolean = dt match {
+            case _: StructType | _: ArrayType | _: MapType => true
+            case _ => false
+          }
+
+          def hasKnownIssues(dataType: DataType): Boolean = {
+            dataType match {
+              case s: StructType => s.exists(field => hasKnownIssues(field.dataType))
+              case a: ArrayType => hasKnownIssues(a.elementType)
+              case m: MapType => isComplexType(m.keyType) || isComplexType(m.valueType)
+              case _ => false
+            }
+          }
+
+          val knownIssues =
+            scanExec.requiredSchema.exists(field => hasKnownIssues(field.dataType)) ||
+              r.partitionSchema.exists(field => hasKnownIssues(field.dataType))
+
+          if (COMET_EXEC_ENABLED
+              .get() && schemaSupported && partitionSchemaSupported &&
+            !scanExec.bucketedScan && !knownIssues) {
+            scanImpl = SCAN_NATIVE_ICEBERG_COMPAT

Review Comment:
   native_iceberg_compat should be able to handle bucketed scans
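
   A minimal, self-contained sketch (not the PR's code) of how the auto-selection condition could look with the bucketed-scan restriction dropped, assuming native_iceberg_compat can handle bucketed scans as suggested above; `SelectionInputs` and `pickAutoScan` are invented names for illustration:
   ```scala
   object ScanAutoSelectionSketch {

     // Stand-ins for the config flag and schema checks consulted in CometScanRule.
     final case class SelectionInputs(
         execEnabled: Boolean,
         schemaSupported: Boolean,
         partitionSchemaSupported: Boolean,
         hasKnownIssues: Boolean)

     val ScanNativeIcebergCompat = "native_iceberg_compat"
     val ScanNativeComet = "native_comet"

     // No `!bucketedScan` term here, so bucketed scans are no longer forced away from
     // native_iceberg_compat.
     def pickAutoScan(in: SelectionInputs): String =
       if (in.execEnabled && in.schemaSupported && in.partitionSchemaSupported && !in.hasKnownIssues) {
         ScanNativeIcebergCompat
       } else {
         ScanNativeComet
       }

     def main(args: Array[String]): Unit = {
       // A bucketed scan with a supported schema would now still select native_iceberg_compat.
       val bucketedScanInputs = SelectionInputs(
         execEnabled = true,
         schemaSupported = true,
         partitionSchemaSupported = true,
         hasKnownIssues = false)
       println(pickAutoScan(bucketedScanInputs)) // prints: native_iceberg_compat
     }
   }
   ```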



##########
spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala:
##########
@@ -93,21 +93,63 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
           return withInfos(scanExec, fallbackReasons.toSet)
         }
 
-        val scanImpl = COMET_NATIVE_SCAN_IMPL.get()
-        if (scanImpl == CometConf.SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
+        var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
+
+        // if scan is auto then pick best available scan
+        if (scanImpl == SCAN_AUTO) {
+          // TODO these checks are not yet exhaustive. For example, native_datafusion does
+          //  not support reading from object stores such as S3 yet
+
+          val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
+          val schemaSupported =
+            typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
+          val partitionSchemaSupported =
+            typeChecker.isSchemaSupported(r.partitionSchema, fallbackReasons)
+
+          // additional checks for known issues
+          def isComplexType(dt: DataType): Boolean = dt match {
+            case _: StructType | _: ArrayType | _: MapType => true
+            case _ => false
+          }
+
+          def hasKnownIssues(dataType: DataType): Boolean = {
+            dataType match {
+              case s: StructType => s.exists(field => hasKnownIssues(field.dataType))
+              case a: ArrayType => hasKnownIssues(a.elementType)
+              case m: MapType => isComplexType(m.keyType) || isComplexType(m.valueType)
+              case _ => false
+            }
+          }
+
+          val knownIssues =
+            scanExec.requiredSchema.exists(field => hasKnownIssues(field.dataType)) ||
+              r.partitionSchema.exists(field => hasKnownIssues(field.dataType))
+
+          if (COMET_EXEC_ENABLED
+              .get() && schemaSupported && partitionSchemaSupported &&
+            !scanExec.bucketedScan && !knownIssues) {
+            scanImpl = SCAN_NATIVE_ICEBERG_COMPAT
+          }
+        }
+
+        if (scanImpl == SCAN_AUTO) {
+          scanImpl = SCAN_NATIVE_COMET

Review Comment:
   We would never choose native_datafusion?
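
   As written in the hunk above, any SCAN_AUTO case that does not qualify for native_iceberg_compat falls back to SCAN_NATIVE_COMET, which is what this question points at. Below is a purely hypothetical sketch (not the PR's behavior) of one way the auto path could also pick native_datafusion; `readsFromObjectStore` is an invented predicate standing in for the object-store limitation mentioned in the TODO above, not an existing Comet API, and the ordering shown is only one possible choice:
   ```scala
   object ScanAutoFallbackSketch {

     val ScanNativeDataFusion = "native_datafusion"
     val ScanNativeIcebergCompat = "native_iceberg_compat"
     val ScanNativeComet = "native_comet"

     // Stand-ins for the checks performed in CometScanRule.
     final case class Inputs(
         execEnabled: Boolean,
         schemasSupported: Boolean,
         hasKnownIssues: Boolean,
         readsFromObjectStore: Boolean)

     def pickAutoScan(in: Inputs): String =
       if (in.execEnabled && in.schemasSupported && !in.hasKnownIssues) {
         // Prefer native_datafusion when its known gaps (e.g. object-store reads)
         // do not apply, otherwise use native_iceberg_compat.
         if (in.readsFromObjectStore) ScanNativeIcebergCompat else ScanNativeDataFusion
       } else {
         // Nothing else qualifies, so fall back to native_comet.
         ScanNativeComet
       }

     def main(args: Array[String]): Unit = {
       println(pickAutoScan(Inputs(execEnabled = true, schemasSupported = true,
         hasKnownIssues = false, readsFromObjectStore = false))) // native_datafusion
       println(pickAutoScan(Inputs(execEnabled = true, schemasSupported = true,
         hasKnownIssues = false, readsFromObjectStore = true)))  // native_iceberg_compat
     }
   }
   ```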


