yihua commented on code in PR #12772:
URL: https://github.com/apache/hudi/pull/12772#discussion_r2365392558
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieSparkBaseAnalysis.scala:
##########
@@ -51,13 +52,13 @@ import
org.apache.spark.sql.hudi.command.{AlterHoodieTableDropPartitionCommand,
/**
* Rule for resolving Hudi's extended syntax or rewriting some logical plans.
*/
-case class HoodieSpark3ResolveReferences(spark: SparkSession) extends
Rule[LogicalPlan]
+case class HoodieSparkBaseResolveReferences(spark: SparkSession) extends
Rule[LogicalPlan]
Review Comment:
Rename for consistency
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -66,24 +69,30 @@ object HoodieAnalysis extends SparkAdapterSupport {
val dataSourceV2ToV1Fallback: RuleBuilder =
session => instantiateKlass(dataSourceV2ToV1FallbackClass, session)
- val spark3ResolveReferencesClass =
"org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
- val spark3ResolveReferences: RuleBuilder =
- session => instantiateKlass(spark3ResolveReferencesClass, session)
+ val sparkBaseResolveReferencesClass =
"org.apache.spark.sql.hudi.analysis.HoodieSparkBaseResolveReferences"
+ val sparkBaseResolveReferences: RuleBuilder =
+ session => instantiateKlass(sparkBaseResolveReferencesClass, session)
// NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
//
// It's critical for these rules to follow in this order; re-ordering these
rules might lead to changes in
// behavior of Spark's analysis phase (for ex, DataSource V2 to V1
fallback might not kick in before other rules,
// leading to all relations resolving as V2 instead of current expectation
of them being resolved as V1)
- rules ++= Seq(dataSourceV2ToV1Fallback, spark3ResolveReferences)
+ rules ++= Seq(dataSourceV2ToV1Fallback, sparkBaseResolveReferences)
- if (HoodieSparkUtils.gteqSpark3_5) {
+ if (HoodieSparkUtils.isSpark3_5) {
rules += (_ => instantiateKlass(
"org.apache.spark.sql.hudi.analysis.HoodieSpark35ResolveColumnsForInsertInto"))
}
+ if (HoodieSparkUtils.isSpark4_0) {
Review Comment:
Use descending order by version, and chain the checks with `else if` instead of separate `if` statements.
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/model/TestHoodieInternalRow.java:
##########
@@ -65,11 +66,14 @@ public void testGet() {
Object[] values = getRandomValue(true);
InternalRow row = new GenericInternalRow(values);
- HoodieInternalRow hoodieInternalRow = new
HoodieInternalRow(UTF8String.fromString("commitTime"),
- UTF8String.fromString("commitSeqNo"),
- UTF8String.fromString("recordKey"),
- UTF8String.fromString("partitionPath"),
- UTF8String.fromString("fileName"),
+ HoodieInternalRow hoodieInternalRow =
SparkAdapterSupport$.MODULE$.sparkAdapter().createInternalRow(
+ new UTF8String[] {
Review Comment:
Does allocating a new array here add more overhead per row?
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/rollback/TestCopyOnWriteRollbackActionExecutor.java:
##########
@@ -294,8 +294,8 @@ public void testRollbackScale() throws Exception {
}
private void performRollbackAndValidate(boolean isUsingMarkers,
HoodieWriteConfig cfg, HoodieTable table,
- List<FileSlice>
firstPartitionCommit2FileSlices,
- List<FileSlice>
secondPartitionCommit2FileSlices) throws IOException, InterruptedException {
+ List<FileSlice> firstPartitionCommit2FileSlices,
Review Comment:
Revert the formatting-only changes here (no functional change is needed in this file).
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -66,24 +69,30 @@ object HoodieAnalysis extends SparkAdapterSupport {
val dataSourceV2ToV1Fallback: RuleBuilder =
session => instantiateKlass(dataSourceV2ToV1FallbackClass, session)
- val spark3ResolveReferencesClass =
"org.apache.spark.sql.hudi.analysis.HoodieSpark3ResolveReferences"
- val spark3ResolveReferences: RuleBuilder =
- session => instantiateKlass(spark3ResolveReferencesClass, session)
+ val sparkBaseResolveReferencesClass =
"org.apache.spark.sql.hudi.analysis.HoodieSparkBaseResolveReferences"
Review Comment:
Revisit naming to avoid confusion
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -114,13 +123,10 @@ object HoodieAnalysis extends SparkAdapterSupport {
session => HoodiePostAnalysisRule(session)
)
- if (HoodieSparkUtils.isSpark3) {
- val spark3PostHocResolutionClass =
"org.apache.spark.sql.hudi.analysis.HoodieSpark3PostAnalysisRule"
- val spark3PostHocResolution: RuleBuilder =
- session => instantiateKlass(spark3PostHocResolutionClass, session)
-
- rules += spark3PostHocResolution
- }
+ val sparkBasePostHocResolutionClass =
"org.apache.spark.sql.hudi.analysis.HoodieSparkBasePostAnalysisRule"
Review Comment:
Revisit naming
##########
hudi-spark-datasource/hudi-spark-common/src/test/scala/org/apache/spark/execution/datasources/parquet/TestSparkFileFormatInternalRowReaderContext.scala:
##########
@@ -90,8 +88,6 @@ class TestSparkFileFormatInternalRowReaderContext extends
SparkClientFunctionalT
assertEquals(1.1f,
sparkReaderContext.getRecordContext().convertValueToEngineType(1.1f))
assertEquals(1.1d,
sparkReaderContext.getRecordContext().convertValueToEngineType(1.1d))
assertEquals(UTF8String.fromString(stringValue),
-
sparkReaderContext.getRecordContext().convertValueToEngineType(stringValue))
- assertEquals(UTF8String.fromString(stringValue),
-
sparkReaderContext.getRecordContext().convertValueToEngineType(UTF8String.fromString(stringValue)))
+
sparkReaderContext.getRecordContext().convertPartitionValueToEngineType(stringValue))
Review Comment:
Could you add unit tests for the new method added in `RecordContext`?
##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala:
##########
@@ -152,7 +159,12 @@ object HoodieAnalysis extends SparkAdapterSupport {
// To work this around, we injecting this as the rule that trails
pre-CBO, ie it's
// - Triggered before CBO, therefore have access to the same
stats as CBO
// - Precedes actual [[customEarlyScanPushDownRules]] invocation
- rules += (spark => HoodiePruneFileSourcePartitions(spark))
+ val hoodiePruneFileSourcePartitionsClass = if
(HoodieSparkUtils.gteqSpark4_0) {
Review Comment:
```suggestion
val pruneFileSourcePartitionsClass = if (HoodieSparkUtils.gteqSpark4_0) {
```
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java:
##########
@@ -253,10 +257,11 @@ private Comparator<HoodieRecord<? extends
HoodieRecordPayload>> getCustomColumnC
throw new HoodieIOException("unable to read value for " + sortColumns);
}
}, (o1, o2) -> {
- FlatLists.ComparableList values1 =
FlatLists.ofComparableArray(o1.toArray());
- FlatLists.ComparableList values2 =
FlatLists.ofComparableArray(o2.toArray());
+ FlatLists.ComparableList values1 =
FlatLists.ofComparableArray(HOODIE_UTF8STRING_FACTORY.wrapArrayOfObjects(o1.toArray()));
Review Comment:
Double check the indentation changes
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java:
##########
@@ -56,6 +58,8 @@
public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase
implements Serializable {
private static final Comparator<HoodieRecord<? extends HoodieRecordPayload>>
KEY_COMPARATOR =
Comparator.comparing(o -> (o.getPartitionPath() + "+" +
o.getRecordKey()));
+ private static final HoodieUTF8StringFactory HOODIE_UTF8STRING_FACTORY =
Review Comment:
Revisit the naming of this constant.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]