yihua commented on code in PR #12772:
URL: https://github.com/apache/hudi/pull/12772#discussion_r2074256515


##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -250,4 +260,52 @@ trait SparkAdapter extends Serializable {
    * @return
    */
   def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
+
+  def getSchema(conn: Connection,
+                resultSet: ResultSet,
+                dialect: JdbcDialect,
+                alwaysNullable: Boolean = false,
+                isTimestampNTZ: Boolean = false): StructType
+
+  /**
+   * [SPARK-46832] Using UTF8String compareTo directly would throw 
UnsupportedOperationException since Spark 4.0
+   */
+  def compareUTF8String(a: UTF8String, b: UTF8String): Int = a.compareTo(b)
+
+  /**
+   * [SPARK-46832] Using UTF8String compareTo directly would throw 
UnsupportedOperationException since Spark 4.0
+   * FlatLists is a static class and we cannot override any methods within to 
change the logic for comparison
+   * So we have to create [[Spark4FlatLists]] for Spark 4.0+
+   */
+  def createComparableList(t: Array[AnyRef]): 
FlatLists.ComparableList[Comparable[HoodieRecord[_]]] = 
FlatLists.ofComparableArray(t)
+
+  def createInternalRow(commitTime: UTF8String,
+                        commitSeqNumber: UTF8String,
+                        recordKey: UTF8String,
+                        partitionPath: UTF8String,
+                        fileName: UTF8String,
+                        sourceRow: InternalRow,
+                        sourceContainsMetaFields: Boolean): HoodieInternalRow
+
+  def createInternalRow(metaFields: Array[UTF8String],
+                        sourceRow: InternalRow,
+                        sourceContainsMetaFields: Boolean): HoodieInternalRow
+
+  def createHoodiePartitionCDCFileGroupMapping(partitionValues: InternalRow,
+                                               fileSplits: 
List[HoodieCDCFileSplit]): HoodiePartitionCDCFileGroupMapping
+
+  def createHoodiePartitionFileSliceMapping(values: InternalRow,
+                                            slices: Map[String, FileSlice]): 
HoodiePartitionFileSliceMapping
+
+  def newParseException(command: Option[String],
+                        exception: AnalysisException,
+                        start: Origin,
+                        stop: Origin): ParseException
+
+  def compareValues[T <% Comparable[T]](a: T, b: T): Int = a.compareTo(b)

Review Comment:
   Similarly this could be removed if we add `HoodieUTF8String` for controlling 
the value comparison.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -250,4 +260,52 @@ trait SparkAdapter extends Serializable {
    * @return
    */
   def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
+
+  def getSchema(conn: Connection,
+                resultSet: ResultSet,
+                dialect: JdbcDialect,
+                alwaysNullable: Boolean = false,
+                isTimestampNTZ: Boolean = false): StructType
+
+  /**
+   * [SPARK-46832] Using UTF8String compareTo directly would throw 
UnsupportedOperationException since Spark 4.0
+   */
+  def compareUTF8String(a: UTF8String, b: UTF8String): Int = a.compareTo(b)
+
+  /**
+   * [SPARK-46832] Using UTF8String compareTo directly would throw 
UnsupportedOperationException since Spark 4.0
+   * FlatLists is a static class and we cannot override any methods within to 
change the logic for comparison
+   * So we have to create [[Spark4FlatLists]] for Spark 4.0+
+   */
+  def createComparableList(t: Array[AnyRef]): 
FlatLists.ComparableList[Comparable[HoodieRecord[_]]] = 
FlatLists.ofComparableArray(t)
+
+  def createInternalRow(commitTime: UTF8String,
+                        commitSeqNumber: UTF8String,
+                        recordKey: UTF8String,
+                        partitionPath: UTF8String,
+                        fileName: UTF8String,
+                        sourceRow: InternalRow,
+                        sourceContainsMetaFields: Boolean): HoodieInternalRow
+
+  def createInternalRow(metaFields: Array[UTF8String],

Review Comment:
   Should there be only one `createInternalRow`?  The first one merely 
constructs `this.metaFields = new UTF8String[] {
           commitTime,
           commitSeqNumber,
           recordKey,
           partitionPath,
           fileName
       };`



##########
hudi-spark-datasource/hudi-spark4-common/src/main/scala/org/apache/spark/sql/adapter/BaseSpark4Adapter.scala:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.adapter
+
+import org.apache.hudi.{AvroConversionUtils, DefaultSource, 
HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, 
Spark4HoodiePartitionCDCFileGroupMapping, 
Spark4HoodiePartitionFileSliceMapping, Spark4RowSerDe}
+import org.apache.hudi.client.model.{HoodieInternalRow, 
Spark4HoodieInternalRow}
+import org.apache.hudi.client.utils.SparkRowSerDe
+import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
+import org.apache.hudi.common.table.HoodieTableMetaClient
+import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
+import org.apache.hudi.common.util.JsonUtils
+import org.apache.hudi.common.util.collection.{FlatLists, Spark4FlatLists}
+import org.apache.hudi.spark4.internal.ReflectUtil
+import org.apache.hudi.storage.StoragePath
+
+import org.apache.avro.Schema
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{AnalysisException, HoodieSpark4CatalogUtils, 
SparkSession, SQLContext}
+import org.apache.spark.sql.avro.{HoodieAvroSchemaConverters, 
HoodieSparkAvroSchemaConverters}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Expression, 
InterpretedPredicate, Predicate}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.trees.Origin
+import org.apache.spark.sql.catalyst.util.DateFormatter
+import org.apache.spark.sql.execution.{PartitionedFileUtil, QueryExecution, 
SQLExecution}
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hudi.SparkAdapter
+import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.unsafe.types.UTF8String
+
+import java.time.ZoneId
+import java.util.TimeZone
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+/**
+ * Base implementation of [[SparkAdapter]] for Spark 4.x branch
+ */
+abstract class BaseSpark4Adapter extends SparkAdapter with Logging {
+
+  JsonUtils.registerModules()
+
+  private val cache = new ConcurrentHashMap[ZoneId, DateFormatter](1)
+
+  def getCatalogUtils: HoodieSpark4CatalogUtils
+
+  override def createSparkRowSerDe(schema: StructType): SparkRowSerDe = {
+    new Spark4RowSerDe(getCatalystExpressionUtils.getEncoder(schema))
+  }
+
+  override def getAvroSchemaConverters: HoodieAvroSchemaConverters = 
HoodieSparkAvroSchemaConverters
+
+  override def getSparkParsePartitionUtil: SparkParsePartitionUtil = 
Spark4ParsePartitionUtil
+
+  override def getDateFormatter(tz: TimeZone): DateFormatter = {
+    cache.computeIfAbsent(tz.toZoneId, zoneId => 
ReflectUtil.getDateFormatter(zoneId))
+  }
+
+  /**
+   * Combine [[PartitionedFile]] to [[FilePartition]] according to 
`maxSplitBytes`.
+   */
+  override def getFilePartitions(
+      sparkSession: SparkSession,
+      partitionedFiles: Seq[PartitionedFile],
+      maxSplitBytes: Long): Seq[FilePartition] = {
+    FilePartition.getFilePartitions(sparkSession, partitionedFiles, 
maxSplitBytes)
+  }
+
+  override def createInterpretedPredicate(e: Expression): InterpretedPredicate 
= {
+    Predicate.createInterpreted(e)
+  }
+
+  override def createRelation(sqlContext: SQLContext,
+                              metaClient: HoodieTableMetaClient,
+                              schema: Schema,
+                              globPaths: Array[StoragePath],
+                              parameters: java.util.Map[String, String]): 
BaseRelation = {
+    val dataSchema = 
Option(schema).map(AvroConversionUtils.convertAvroSchemaToStructType).orNull
+    DefaultSource.createRelation(sqlContext, metaClient, dataSchema, 
globPaths, parameters.asScala.toMap)
+  }
+
+  override def convertStorageLevelToString(level: StorageLevel): String
+
+  override def translateFilter(predicate: Expression,
+                               supportNestedPredicatePushdown: Boolean = 
false): Option[Filter] = {
+    DataSourceStrategy.translateFilter(predicate, 
supportNestedPredicatePushdown)
+  }
+
+  override def makeColumnarBatch(vectors: Array[ColumnVector], numRows: Int): 
ColumnarBatch = {
+    new ColumnarBatch(vectors, numRows)
+  }
+
+  override def sqlExecutionWithNewExecutionId[T](sparkSession: SparkSession,
+                                                 queryExecution: 
QueryExecution,
+                                                 name: Option[String])(body: 
=> T): T = {
+      SQLExecution.withNewExecutionId(queryExecution, name)(body)
+  }
+
+  def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
+
+  override def compareUTF8String(a: UTF8String, b: UTF8String): Int = 
a.binaryCompare(b)
+
+  override def createComparableList(t: Array[AnyRef]): 
FlatLists.ComparableList[Comparable[HoodieRecord[_]]] = 
Spark4FlatLists.ofComparableArray(t)
+
+  override def createInternalRow(commitTime: UTF8String,
+                                 commitSeqNumber: UTF8String,
+                                 recordKey: UTF8String,
+                                 partitionPath: UTF8String,
+                                 fileName: UTF8String,
+                                 sourceRow: InternalRow,
+                                 sourceContainsMetaFields: Boolean): 
HoodieInternalRow = {
+    new Spark4HoodieInternalRow(commitTime, commitSeqNumber, recordKey, 
partitionPath, fileName, sourceRow, sourceContainsMetaFields)
+  }
+
+  override def createInternalRow(metaFields: Array[UTF8String],
+                                 sourceRow: InternalRow,
+                                 sourceContainsMetaFields: Boolean): 
HoodieInternalRow = {
+    new Spark4HoodieInternalRow(metaFields, sourceRow, 
sourceContainsMetaFields)
+  }
+
+  override def createHoodiePartitionCDCFileGroupMapping(partitionValues: 
InternalRow,
+                                                        fileSplits: 
List[HoodieCDCFileSplit]): HoodiePartitionCDCFileGroupMapping = {
+    new Spark4HoodiePartitionCDCFileGroupMapping(partitionValues, fileSplits)
+  }
+
+  override def createHoodiePartitionFileSliceMapping(values: InternalRow,
+                                                     slices: Map[String, 
FileSlice]): HoodiePartitionFileSliceMapping = {
+    new Spark4HoodiePartitionFileSliceMapping(values, slices)
+  }
+
+  override def newParseException(command: Option[String],
+                                 exception: AnalysisException,
+                                 start: Origin,
+                                 stop: Origin): ParseException = {
+    new ParseException(command, start, stop, exception.getErrorClass, 
exception.getMessageParameters.asScala.toMap)

Review Comment:
   Is this because of the constructor change of `ParseException`?  However I 
don't see that in Spark's `branch-4.0`.



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -250,4 +260,52 @@ trait SparkAdapter extends Serializable {
    * @return
    */
   def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
+
+  def getSchema(conn: Connection,
+                resultSet: ResultSet,
+                dialect: JdbcDialect,
+                alwaysNullable: Boolean = false,
+                isTimestampNTZ: Boolean = false): StructType
+
+  /**
+   * [SPARK-46832] Using UTF8String compareTo directly would throw 
UnsupportedOperationException since Spark 4.0
+   */
+  def compareUTF8String(a: UTF8String, b: UTF8String): Int = a.compareTo(b)
+
+  /**
+   * [SPARK-46832] Using UTF8String compareTo directly would throw 
UnsupportedOperationException since Spark 4.0
+   * FlatLists is a static class and we cannot override any methods within to 
change the logic for comparison
+   * So we have to create [[Spark4FlatLists]] for Spark 4.0+
+   */
+  def createComparableList(t: Array[AnyRef]): 
FlatLists.ComparableList[Comparable[HoodieRecord[_]]] = 
FlatLists.ofComparableArray(t)
+
+  def createInternalRow(commitTime: UTF8String,
+                        commitSeqNumber: UTF8String,
+                        recordKey: UTF8String,
+                        partitionPath: UTF8String,
+                        fileName: UTF8String,
+                        sourceRow: InternalRow,
+                        sourceContainsMetaFields: Boolean): HoodieInternalRow
+
+  def createInternalRow(metaFields: Array[UTF8String],
+                        sourceRow: InternalRow,
+                        sourceContainsMetaFields: Boolean): HoodieInternalRow
+
+  def createHoodiePartitionCDCFileGroupMapping(partitionValues: InternalRow,
+                                               fileSplits: 
List[HoodieCDCFileSplit]): HoodiePartitionCDCFileGroupMapping
+
+  def createHoodiePartitionFileSliceMapping(values: InternalRow,
+                                            slices: Map[String, FileSlice]): 
HoodiePartitionFileSliceMapping

Review Comment:
   These seem unnecessary?



##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -250,4 +260,52 @@ trait SparkAdapter extends Serializable {
    * @return
    */
   def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
+
+  def getSchema(conn: Connection,
+                resultSet: ResultSet,
+                dialect: JdbcDialect,
+                alwaysNullable: Boolean = false,
+                isTimestampNTZ: Boolean = false): StructType
+
+  /**
+   * [SPARK-46832] Using UTF8String compareTo directly would throw 
UnsupportedOperationException since Spark 4.0
+   */
+  def compareUTF8String(a: UTF8String, b: UTF8String): Int = a.compareTo(b)
+
+  /**
+   * [SPARK-46832] Using UTF8String compareTo directly would throw 
UnsupportedOperationException since Spark 4.0
+   * FlatLists is a static class and we cannot override any methods within to 
change the logic for comparison
+   * So we have to create [[Spark4FlatLists]] for Spark 4.0+
+   */
+  def createComparableList(t: Array[AnyRef]): 
FlatLists.ComparableList[Comparable[HoodieRecord[_]]] = 
FlatLists.ofComparableArray(t)
+
+  def createInternalRow(commitTime: UTF8String,
+                        commitSeqNumber: UTF8String,
+                        recordKey: UTF8String,
+                        partitionPath: UTF8String,
+                        fileName: UTF8String,
+                        sourceRow: InternalRow,
+                        sourceContainsMetaFields: Boolean): HoodieInternalRow
+
+  def createInternalRow(metaFields: Array[UTF8String],
+                        sourceRow: InternalRow,
+                        sourceContainsMetaFields: Boolean): HoodieInternalRow
+
+  def createHoodiePartitionCDCFileGroupMapping(partitionValues: InternalRow,
+                                               fileSplits: 
List[HoodieCDCFileSplit]): HoodiePartitionCDCFileGroupMapping
+
+  def createHoodiePartitionFileSliceMapping(values: InternalRow,
+                                            slices: Map[String, FileSlice]): 
HoodiePartitionFileSliceMapping
+
+  def newParseException(command: Option[String],
+                        exception: AnalysisException,
+                        start: Origin,
+                        stop: Origin): ParseException
+
+  def compareValues[T <% Comparable[T]](a: T, b: T): Int = a.compareTo(b)
+
+  def splitFiles(sparkSession: SparkSession,
+                 partitionDirectory: PartitionDirectory,
+                 isSplitable: Boolean,
+                 maxSplitSize: Long): Seq[PartitionedFile]

Review Comment:
   Remove this API which is only used by tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to