yihua commented on code in PR #12772:
URL: https://github.com/apache/hudi/pull/12772#discussion_r2074212543
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -250,4 +260,52 @@ trait SparkAdapter extends Serializable {
* @return
*/
def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
+
+ def getSchema(conn: Connection,
+ resultSet: ResultSet,
+ dialect: JdbcDialect,
+ alwaysNullable: Boolean = false,
+ isTimestampNTZ: Boolean = false): StructType
Review Comment:
Add scaladocs
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -250,4 +260,52 @@ trait SparkAdapter extends Serializable {
* @return
*/
def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
+
+ def getSchema(conn: Connection,
+ resultSet: ResultSet,
+ dialect: JdbcDialect,
+ alwaysNullable: Boolean = false,
+ isTimestampNTZ: Boolean = false): StructType
+
+ /**
+ * [SPARK-46832] Using UTF8String compareTo directly would throw
UnsupportedOperationException since Spark 4.0
+ */
+ def compareUTF8String(a: UTF8String, b: UTF8String): Int = a.compareTo(b)
+
+ /**
+ * [SPARK-46832] Using UTF8String compareTo directly would throw
UnsupportedOperationException since Spark 4.0
+ * FlatLists is a static class and we cannot override any methods within to
change the logic for comparison
+ * So we have to create [[Spark4FlatLists]] for Spark 4.0+
+ */
+ def createComparableList(t: Array[AnyRef]):
FlatLists.ComparableList[Comparable[HoodieRecord[_]]] =
FlatLists.ofComparableArray(t)
Review Comment:
For places that need to compare `UTF8String`, could we introduce
`HoodieUTF8String` implemented by `Spark3HoodieUTF8String` and
`Spark4HoodieUTF8String` and override `compareTo` by calling the right method
in different Spark versions, so that we can avoid having many places using
spark adapter?
For example, here it would receive an array that can contain
`HoodieUTF8String` which works with `compareTo` so there is no need to change
`FlatLists` and add `Spark4FlatLists` (which shares the same code mostly).
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala:
##########
@@ -250,4 +260,52 @@ trait SparkAdapter extends Serializable {
* @return
*/
def stopSparkContext(jssc: JavaSparkContext, exitCode: Int): Unit
+
+ def getSchema(conn: Connection,
+ resultSet: ResultSet,
+ dialect: JdbcDialect,
+ alwaysNullable: Boolean = false,
+ isTimestampNTZ: Boolean = false): StructType
+
+ /**
+ * [SPARK-46832] Using UTF8String compareTo directly would throw
UnsupportedOperationException since Spark 4.0
+ */
+ def compareUTF8String(a: UTF8String, b: UTF8String): Int = a.compareTo(b)
Review Comment:
Similarly, I see that this is used in places where we compare the values
from the logic we control. So could we introduce `HoodieUTF8String` which
wraps the `UTF8String` and overrides `compareTo` by calling the right method in
different Spark versions. Then there is no need to have `compareUTF8String`
and the logic divergence is captured in `HoodieUTF8String`.
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodiePartitionValues.scala:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.spark.sql.catalyst.InternalRow
+
+trait HoodiePartitionValues extends InternalRow {
Review Comment:
Could you shed light on why this class and
`HoodiePartitionCDCFileGroupMapping` and `HoodiePartitionFileSliceMapping` are
required for Spark 4 upgrade? Is it because of Spark's breaking changes in
basic data structures?
##########
hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkJdbcUtils.scala:
##########
@@ -23,21 +23,23 @@ import
org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.StructType
-import java.sql.ResultSet
+import java.sql.{Connection, ResultSet}
/**
* Util functions for JDBC source and tables in Spark.
*/
-object SparkJdbcUtils {
+object SparkJdbcUtils extends SparkAdapterSupport {
/**
* Takes a [[ResultSet]] and returns its Catalyst schema.
*
+ * @param conn [[Connection]] instance.
* @param resultSet [[ResultSet]] instance.
* @param dialect [[JdbcDialect]] instance.
* @param alwaysNullable If true, all the columns are nullable.
* @return A [[StructType]] giving the Catalyst schema.
*/
- def getSchema(resultSet: ResultSet,
+ def getSchema(conn: Connection,
Review Comment:
This util class and method are only used by `UtilHelpers`. Should we move
the logic to `UtilHelpers` and make it inline, i.e., directly call
`sparkAdapter.getSchema(conn, resultSet, dialect, alwaysNullable)` inside
`UtilHelpers#getJDBCSchema`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]