xiarixiaoyao commented on a change in pull request #4910:
URL: https://github.com/apache/hudi/pull/4910#discussion_r840412756
##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
##########
@@ -73,7 +73,8 @@
           INFLIGHT_COMPACTION_EXTENSION, REQUESTED_COMPACTION_EXTENSION,
           REQUESTED_RESTORE_EXTENSION, INFLIGHT_RESTORE_EXTENSION, RESTORE_EXTENSION,
           ROLLBACK_EXTENSION, REQUESTED_ROLLBACK_EXTENSION, INFLIGHT_ROLLBACK_EXTENSION,
-          REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION));
+          REQUESTED_REPLACE_COMMIT_EXTENSION, INFLIGHT_REPLACE_COMMIT_EXTENSION, REPLACE_COMMIT_EXTENSION,
+          REQUESTED_SAVE_SCHEMA_ACTION_EXTENSION, INFLIGHT_SAVE_SCHEMA_ACTION_EXTENSION, SAVE_SCHEMA_ACTION_EXTENSION));

Review comment:
       Those actions are only for schema saves; I think it will be better to keep them as SAVE_SCHEMA_ACTION.

##########
File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java
##########
@@ -55,6 +55,8 @@
   String COMPACTION_ACTION = "compaction";
   String REQUESTED_EXTENSION = ".requested";
   String RESTORE_ACTION = "restore";
+  // only for schema save
+  String SAVE_SCHEMA_ACTION = "schemacommit";

Review comment:
       fixed

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
##########
@@ -60,6 +60,12 @@
                            tableSchema: HoodieTableSchema,
                            requiredSchema: HoodieTableSchema,
                            filters: Array[Filter]): HoodieUnsafeRDD = {
+    if (!internalSchema.isEmptySchema) {
+      // disable record-level filtering so it is safe to enable the vectorized reader
+      sqlContext.sparkSession.sessionState.conf.setConfString("spark.sql.parquet.recordLevelFilter.enabled", "false")

Review comment:
       fixed

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
##########
@@ -315,6 +329,32 @@
     processedRecord
   }

+  def addSchemaEvolutionParameters(parameters: Map[String, String], internalSchemaOpt: Option[InternalSchema]): Map[String, String] = {
+    val schemaEvolutionEnable = if (internalSchemaOpt.isDefined) "true" else "false"
+    parameters ++ Map(HoodieWriteConfig.INTERNAL_SCHEMA_STRING.key() -> SerDeHelper.toJson(internalSchemaOpt.getOrElse(null)),
+      HoodieWriteConfig.SCHEMA_EVOLUTION_ENABLE.key() -> schemaEvolutionEnable)
+  }
+
+  /**
+   * Gets the latest internalSchema from the table.
+   *
+   * @param fs           instance of FileSystem.
+   * @param basePath     base path of the table.
+   * @param sparkContext instance of the Spark context.
+   * @return the latest InternalSchema persisted for the table, if any.
+   */
+  def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
+    if (FSUtils.isTableExists(basePath.toString, fs)) {

Review comment:
       fixed

##########
File path: hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
##########
@@ -61,6 +61,11 @@
                            tableSchema: HoodieTableSchema,
                            requiredSchema: HoodieTableSchema,
                            filters: Array[Filter]): HoodieMergeOnReadRDD = {
+    if (!internalSchema.isEmptySchema) {

Review comment:
       fixed
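For context on the first two hunks: the new constants follow the same requested/inflight/completed naming pattern as the existing timeline actions. A minimal sketch of that derivation, written in Scala for illustration only (the names mirror the diff, but the object wrapper and the example instant time are hypothetical, and this is not the PR's verbatim Java):

    // Illustration only -- not PR code. Shows how the three instant-file
    // extensions for the schemacommit action derive from the action name,
    // mirroring the existing commit/replacecommit pattern.
    object SaveSchemaExtensions {
      val SAVE_SCHEMA_ACTION = "schemacommit"
      val REQUESTED_EXTENSION = ".requested"
      val INFLIGHT_EXTENSION = ".inflight"

      // Completed instant, e.g. 20220404123000.schemacommit (instant time is made up)
      val SAVE_SCHEMA_ACTION_EXTENSION: String = "." + SAVE_SCHEMA_ACTION
      // Planned instant, e.g. 20220404123000.schemacommit.requested
      val REQUESTED_SAVE_SCHEMA_ACTION_EXTENSION: String = SAVE_SCHEMA_ACTION_EXTENSION + REQUESTED_EXTENSION
      // In-progress instant, e.g. 20220404123000.schemacommit.inflight
      val INFLIGHT_SAVE_SCHEMA_ACTION_EXTENSION: String = SAVE_SCHEMA_ACTION_EXTENSION + INFLIGHT_EXTENSION
    }

Registering all three extensions in HoodieActiveTimeline's valid-extensions set is what lets the timeline recognize schema commits in each of the three states.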
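The getLatestTableInternalSchema hunk above is cut off after the table-existence check. As a rough sketch of how the lookup could proceed -- assuming TableSchemaResolver#getTableInternalSchemaFromCommitMetadata is available on this branch, and without claiming to reproduce the PR's actual body:

    // Illustrative sketch only -- not the verbatim PR code.
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hudi.common.fs.FSUtils
    import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
    import org.apache.hudi.internal.schema.InternalSchema
    import org.apache.spark.SparkContext

    import scala.util.Try

    def getLatestTableInternalSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[InternalSchema] = {
      if (!FSUtils.isTableExists(basePath.toString, fs)) {
        // No table yet, so there is no schema history to read.
        None
      } else {
        Try {
          val metaClient = HoodieTableMetaClient.builder()
            .setConf(sparkContext.hadoopConfiguration)
            .setBasePath(basePath.toString)
            .build()
          val schemaResolver = new TableSchemaResolver(metaClient)
          // Returns org.apache.hudi.common.util.Option; convert to a Scala Option.
          val internalSchemaOpt = schemaResolver.getTableInternalSchemaFromCommitMetadata
          if (internalSchemaOpt.isPresent) Some(internalSchemaOpt.get()) else None
        }.toOption.flatten // swallow resolution failures and fall back to None
      }
    }

Falling back to None on any failure keeps the old behavior for tables that have never persisted an internal schema, which matches how the writer only enables schema evolution when a schema is actually found.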