robbik commented on issue #12931: URL: https://github.com/apache/hudi/issues/12931#issuecomment-2735561591
Hi @rangareddy , sorry for late reply. I am trying to reproduce the issue and finally I was able to reproduce the issue using minimal data. Here is the code I use to reproduce the issue.
```
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieMetaserverConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.sync.common.HoodieSyncConfig;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;

import java.io.Serial;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Minimal self-contained reproducer for the reported issue.
 *
 * <p>Writes a MERGE_ON_READ Hudi table (record-level index and metadata indexes enabled)
 * at {@code /tmp/test3} in two phases, each from its own local SparkSession:
 * <ol>
 *   <li>{@link #fetchAll()} — an {@code insert_overwrite} full load of ~10k rows;</li>
 *   <li>{@link #fetchIncremental()} — an {@code upsert} of two rows against the same path.</li>
 * </ol>
 */
public class Test3 implements Serializable {
    @Serial
    private static final long serialVersionUID = -9019298349538148232L;

    // The two columns that together form the composite Hudi record key.
    private static final String STAGE_PRIMARY_KEY_COLUMN_NAME_0 = "OurBranchID";
    private static final String STAGE_PRIMARY_KEY_COLUMN_NAME_1 = "AccountID";

    private final SparkSession spark;
    protected final String targetTableName;
    protected final String targetPath;
    protected final String[] targetPrimaryKeyColumns;
    // Entries in "column:type" form, as consumed by CustomKeyGenerator below.
    protected final String[] targetPartitionColumnAndTypes;
    protected final String[] targetIndexColumns;
    protected final int targetParallelism;

    public Test3(SparkSession spark) {
        this.spark = spark;
        this.targetTableName = "loan_installment";
        this.targetPath = "/tmp/test3";
        this.targetPrimaryKeyColumns = new String[] { STAGE_PRIMARY_KEY_COLUMN_NAME_0, STAGE_PRIMARY_KEY_COLUMN_NAME_1 };
        this.targetPartitionColumnAndTypes = new String[] { "OurBranchID:SIMPLE" };
        this.targetIndexColumns = new String[0];
        this.targetParallelism = 1;
    }

    /**
     * Strips the optional ":TYPE" suffix from each "column[:type]" entry.
     *
     * @param columnNameAndTypes entries such as {@code "OurBranchID:SIMPLE"}
     * @return the bare column names, in the same order
     */
    protected String[] extractColumnNames(String... columnNameAndTypes) {
        final String[] r = new String[columnNameAndTypes.length];
        for (int i = 0; i < r.length; i++) {
            final String v = columnNameAndTypes[i];
            if (v.contains(":")) {
                // Split on the first ':' only; the column name may not contain one afterwards.
                r[i] = v.split(":", 2)[0];
            } else {
                r[i] = v;
            }
        }
        return r;
    }

    /**
     * Writes {@code data} to the Hudi table at {@link #targetPath}.
     *
     * @param data          rows to write (must contain the key and partition columns)
     * @param truncateFirst when {@code true}, performs an {@code insert_overwrite} with
     *                      {@code SaveMode.Overwrite}; otherwise an {@code upsert} with
     *                      {@code SaveMode.Append}
     */
    protected void writeTarget(Dataset<Row> data, boolean truncateFirst) {
        final Map<String, String> opts = new HashMap<>();

        // Cleaning / compaction: keep 10 commits, inline-compact every 20 delta commits.
        opts.put(HoodieCleanConfig.CLEANER_POLICY.key(), "KEEP_LATEST_COMMITS");
        opts.put(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "10");
        opts.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
        opts.put(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "20");
        opts.put(HoodieMetadataConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(), "true");

        // All catalog/metastore syncing disabled — the repro is purely path-based.
        opts.put(HoodieSyncConfig.META_SYNC_ENABLED.key(), "false");
        opts.put(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "false");
        opts.put("hoodie.datasource.meta.sync.glue.partition_index_fields.enable", "false");

        // MERGE_ON_READ table; small-file handling disabled so file sizing doesn't interfere.
        opts.put(HoodieTableConfig.TYPE.key(), HoodieTableType.MERGE_ON_READ.name());
        opts.put("hoodie.datasource.write.table.type", HoodieTableType.MERGE_ON_READ.name());
        opts.put(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0");
        opts.put(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key(), "snappy");
        opts.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), RecordMergeMode.COMMIT_TIME_ORDERING.name());

        // Phase selection: full overwrite load vs. incremental upsert.
        if (truncateFirst) {
            opts.put("hoodie.datasource.write.operation", "insert_overwrite");
            opts.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), Integer.toString(targetParallelism));
        } else {
            opts.put("hoodie.datasource.write.operation", "upsert");
            opts.put(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), Integer.toString(targetParallelism));
        }
        opts.put(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key(), Integer.toString(targetParallelism));
        opts.put(HoodieCleanConfig.CLEANER_PARALLELISM_VALUE.key(), Integer.toString(targetParallelism));

        // Metadata table with column stats, partition stats and bloom-filter indexes.
        opts.put(HoodieMetadataConfig.ENABLE.key(), "true");
        opts.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
        opts.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "true");
        opts.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key(), "true");

        // Record-level index as the write index — central to this reproduction.
        opts.put(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.RECORD_INDEX.name());
        opts.put(HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true");
        opts.put(HoodieIndexConfig.RECORD_INDEX_USE_CACHING.key(), "true");
        opts.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true");
        opts.put(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), "500000000");
        opts.put(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "1");
        opts.put(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), "true");
        opts.put(HoodieIndexConfig.BLOOM_INDEX_USE_CACHING.key(), "true");
        opts.put(HoodieIndexConfig.BLOOM_INDEX_USE_METADATA.key(), "true");

        // Table identity.
        opts.put(HoodieMetaserverConfig.TABLE_NAME.key(), targetTableName);
        opts.put(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), targetTableName);
        opts.put(HoodieWriteConfig.TBL_NAME.key(), targetTableName);

        // Keys and partitioning: composite record key + hive-style partition path
        // generated by CustomKeyGenerator from the "column:type" spec.
        opts.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), "true");
        opts.put(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(),
                "org.apache.hudi.hive.MultiPartKeysValueExtractor");
        opts.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(),
                String.join(",", extractColumnNames(targetPartitionColumnAndTypes)));
        opts.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.CustomKeyGenerator");
        opts.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), String.join(",", targetPrimaryKeyColumns));
        opts.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), String.join(",", targetPartitionColumnAndTypes));
        opts.put("path", targetPath);

        data.write()
                .format("org.apache.hudi")
                .options(opts)
                .mode(truncateFirst ? SaveMode.Overwrite : SaveMode.Append)
                .save();
    }

    /**
     * Phase 2: upserts two rows (one existing-style key plus a new "...A" key) into the
     * table created by {@link #fetchAll()}.
     */
    public void fetchIncremental() throws Exception {
        List<Row> rows = new ArrayList<>();
        rows.add(RowFactory.create("0324", "0324NCA16778", "ACCOUNT NAME XYZ"));
        rows.add(RowFactory.create("0324", "0324NCA16778A", "ACCOUNT NAME XYZ-A"));
        final Dataset<Row> source = spark.createDataFrame(rows, DataTypes.createStructType(List.of(
                DataTypes.createStructField("OurBranchID", DataTypes.StringType, true),
                DataTypes.createStructField("AccountID", DataTypes.StringType, true),
                DataTypes.createStructField("AccountName", DataTypes.StringType, true)
        )));
        writeTarget(source, false);
    }

    /**
     * Phase 1: overwrite-loads ~10k odd-numbered account rows (plus three fixed rows)
     * into branch partition "0324".
     */
    public void fetchAll() throws Exception {
        List<Row> rows = new ArrayList<>();
        // Odd i in [1, 20000) — roughly 10k distinct AccountIDs, zero-padded to 5 digits.
        for (int i = 1; i < 20_000; i += 2) {
            rows.add(RowFactory.create("0324", "0324NCA" + String.format("%05d", i), "ACCOUNT NAME " + i));
        }
        rows.add(RowFactory.create("0324", "0324NH", "NH"));
        rows.add(RowFactory.create("0324", "0324NT700001", "NT700001"));
        rows.add(RowFactory.create("0324", "0324REG00002", "REG00002"));
        final Dataset<Row> source = spark.createDataFrame(rows, DataTypes.createStructType(List.of(
                DataTypes.createStructField("OurBranchID", DataTypes.StringType, true),
                DataTypes.createStructField("AccountID", DataTypes.StringType, true),
                DataTypes.createStructField("AccountName", DataTypes.StringType, true)
        )));
        writeTarget(source, true);
    }

    /**
     * Builds a local[6] SparkSession configured for Hudi (Kryo serializer, Hudi Kryo
     * registrar, HoodieCatalog and the Hudi Spark session extension).
     */
    protected static SparkSession createSparkSession() {
        final SparkConf sparkConfig = new SparkConf()
                .loadFromSystemProperties(true)
                .setAppName("test2");
        sparkConfig.set("spark.ui.enabled", "true");
        sparkConfig.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog");
        sparkConfig.set("spark.sql.hive.convertMetastoreParquet", "false");
        sparkConfig.set("spark.driver.maxResultSize", "256m");
        sparkConfig.set("spark.memory.fraction", "0.6");
        sparkConfig.set("spark.memory.storageFraction", "0.2");
        sparkConfig.set("spark.rdd.compress", "true");
        sparkConfig.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConfig.set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");
        sparkConfig.set("spark.kryoserializer.buffer.max", "256m");
        sparkConfig.set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
        sparkConfig.set("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY");
        sparkConfig.setMaster("local[6]");
        final SparkContext sparkContext = new SparkContext(sparkConfig);
        sparkContext.setLogLevel("ERROR");
        return SparkSession.builder()
                .config(sparkConfig)
                .sparkContext(sparkContext)
                .getOrCreate();
    }

    /**
     * Runs the two phases in separate SparkSessions: full load first, then — after the
     * first session is closed — the incremental upsert from a fresh session.
     */
    public static void main(String[] args) throws Exception {
        SparkSession sparkSession = createSparkSession();
        try {
            new Test3(sparkSession).fetchAll();
        } finally {
            sparkSession.close();
        }
        sparkSession = createSparkSession();
        try {
            new Test3(sparkSession).fetchIncremental();
        } finally {
            sparkSession.close();
        }
    }
}
```
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org