robbik commented on issue #12931:
URL: https://github.com/apache/hudi/issues/12931#issuecomment-2735561591

   Hi @rangareddy, sorry for the late reply. I tried to reproduce the issue 
and was finally able to do so with a minimal dataset. Here is the code I used 
to reproduce it.
   
   ```java
   import org.apache.hudi.common.config.HoodieMetadataConfig;
   import org.apache.hudi.common.config.HoodieMetaserverConfig;
   import org.apache.hudi.common.config.HoodieStorageConfig;
   import org.apache.hudi.common.config.RecordMergeMode;
   import org.apache.hudi.common.model.HoodieTableType;
   import org.apache.hudi.common.table.HoodieTableConfig;
   import org.apache.hudi.config.HoodieCleanConfig;
   import org.apache.hudi.config.HoodieCompactionConfig;
   import org.apache.hudi.config.HoodieIndexConfig;
   import org.apache.hudi.config.HoodieWriteConfig;
   import org.apache.hudi.hive.HiveSyncConfig;
   import org.apache.hudi.index.HoodieIndex;
   import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
   import org.apache.hudi.sync.common.HoodieSyncConfig;
   import org.apache.spark.SparkConf;
   import org.apache.spark.SparkContext;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.RowFactory;
   import org.apache.spark.sql.SaveMode;
   import org.apache.spark.sql.SparkSession;
   import org.apache.spark.sql.types.DataTypes;
   
   import java.io.Serial;
   import java.io.Serializable;
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   public class Test3 implements Serializable {
   
       @Serial
       private static final long serialVersionUID = -9019298349538148232L;
   
       private static final String STAGE_PRIMARY_KEY_COLUMN_NAME_0 = 
"OurBranchID";
       private static final String STAGE_PRIMARY_KEY_COLUMN_NAME_1 = 
"AccountID";
   
       private final SparkSession spark;
   
       protected final String targetTableName;
       protected final String targetPath;
   
       protected final String[] targetPrimaryKeyColumns;
       protected final String[] targetPartitionColumnAndTypes;
       protected final String[] targetIndexColumns;
   
       protected final int targetParallelism;
   
       public Test3(SparkSession spark) {
           this.spark = spark;
   
           this.targetTableName = "loan_installment";
   
           this.targetPath = "/tmp/test3";
   
           this.targetPrimaryKeyColumns = new String[] {
                   STAGE_PRIMARY_KEY_COLUMN_NAME_0,
                   STAGE_PRIMARY_KEY_COLUMN_NAME_1
           };
   
           this.targetPartitionColumnAndTypes = new String[] { 
"OurBranchID:SIMPLE" };
   
           this.targetIndexColumns = new String[0];
   
           this.targetParallelism = 1;
       }
   
       protected String[] extractColumnNames(String... columnNameAndTypes) {
           final String[] r = new String[columnNameAndTypes.length];
   
           for (int i = 0; i < r.length; i++) {
               final String v = columnNameAndTypes[i];
   
               if (v.contains(":")) {
                   r[i] = v.split(":", 2)[0];
               } else {
                   r[i] = v;
               }
           }
   
           return r;
       }
   
       protected void writeTarget(Dataset<Row> data, boolean truncateFirst) {
           final Map<String, String> opts = new HashMap<>();
   
           opts.put(HoodieCleanConfig.CLEANER_POLICY.key(), 
"KEEP_LATEST_COMMITS");
           opts.put(HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), "10");
   
           opts.put(HoodieCompactionConfig.INLINE_COMPACT.key(), "true");
           
opts.put(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "20");
   
           
opts.put(HoodieMetadataConfig.ENABLE_OPTIMIZED_LOG_BLOCKS_SCAN.key(), "true");
   
           opts.put(HoodieSyncConfig.META_SYNC_ENABLED.key(), "false");
   
           opts.put(HiveSyncConfig.HIVE_SYNC_ENABLED.key(), "false");
           
opts.put("hoodie.datasource.meta.sync.glue.partition_index_fields.enable", 
"false");
   
           opts.put(HoodieTableConfig.TYPE.key(), 
HoodieTableType.MERGE_ON_READ.name());
           opts.put("hoodie.datasource.write.table.type", 
HoodieTableType.MERGE_ON_READ.name());
   
           opts.put(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "0");
           opts.put(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key(), 
"snappy");
   
           opts.put(HoodieWriteConfig.RECORD_MERGE_MODE.key(), 
RecordMergeMode.COMMIT_TIME_ORDERING.name());
   
           if (truncateFirst) {
               opts.put("hoodie.datasource.write.operation", 
"insert_overwrite");
   
               opts.put(HoodieWriteConfig.BULKINSERT_PARALLELISM_VALUE.key(), 
Integer.toString(targetParallelism));
           } else {
               opts.put("hoodie.datasource.write.operation", "upsert");
   
               opts.put(HoodieWriteConfig.UPSERT_PARALLELISM_VALUE.key(), 
Integer.toString(targetParallelism));
           }
   
           opts.put(HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key(), 
Integer.toString(targetParallelism));
           opts.put(HoodieCleanConfig.CLEANER_PARALLELISM_VALUE.key(), 
Integer.toString(targetParallelism));
   
           opts.put(HoodieMetadataConfig.ENABLE.key(), "true");
   
           
opts.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true");
           
opts.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), 
"true");
   
           
opts.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_BLOOM_FILTER.key(), "true");
   
           opts.put(HoodieIndexConfig.INDEX_TYPE.key(), 
HoodieIndex.IndexType.RECORD_INDEX.name());
   
           
opts.put(HoodieIndexConfig.RECORD_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), 
"true");
   
           opts.put(HoodieIndexConfig.RECORD_INDEX_USE_CACHING.key(), "true");
   
           opts.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), 
"true");
   
           
opts.put(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), 
"500000000");
           
opts.put(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), 
"1");
   
           
opts.put(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE.key(), 
"true");
   
           opts.put(HoodieIndexConfig.BLOOM_INDEX_USE_CACHING.key(), "true");
           opts.put(HoodieIndexConfig.BLOOM_INDEX_USE_METADATA.key(), "true");
   
           opts.put(HoodieMetaserverConfig.TABLE_NAME.key(), targetTableName);
   
           opts.put(HoodieSyncConfig.META_SYNC_TABLE_NAME.key(), 
targetTableName);
           opts.put(HoodieWriteConfig.TBL_NAME.key(), targetTableName);
   
           opts.put(KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), 
"true");
   
           opts.put(HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key(), 
"org.apache.hudi.hive.MultiPartKeysValueExtractor");
           opts.put(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key(), 
String.join(",", extractColumnNames(targetPartitionColumnAndTypes)));
   
           opts.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), 
"org.apache.hudi.keygen.CustomKeyGenerator");
           opts.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), 
String.join(",", targetPrimaryKeyColumns));
           opts.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), 
String.join(",", targetPartitionColumnAndTypes));
   
           opts.put("path", targetPath);
   
           data.write()
                   .format("org.apache.hudi")
                   .options(opts)
                   .mode(truncateFirst ? SaveMode.Overwrite : SaveMode.Append)
                   .save();
       }
   
       public void fetchIncremental() throws Exception {
           List<Row> rows = new ArrayList<>();
   
           rows.add(RowFactory.create("0324", "0324NCA16778", "ACCOUNT NAME 
XYZ"));
           rows.add(RowFactory.create("0324", "0324NCA16778A", "ACCOUNT NAME 
XYZ-A"));
   
           final Dataset<Row> source = spark.createDataFrame(rows, 
DataTypes.createStructType(List.of(
                   DataTypes.createStructField("OurBranchID", 
DataTypes.StringType, true),
                   DataTypes.createStructField("AccountID", 
DataTypes.StringType, true),
                   DataTypes.createStructField("AccountName", 
DataTypes.StringType, true)
           )));
   
           writeTarget(source, false);
       }
   
       public void fetchAll() throws Exception {
           List<Row> rows = new ArrayList<>();
   
           for (int i = 1; i < 20_000; i += 2) {
               rows.add(RowFactory.create("0324", "0324NCA" + 
String.format("%05d", i), "ACCOUNT NAME " + i));
           }
   
           rows.add(RowFactory.create("0324", "0324NH", "NH"));
           rows.add(RowFactory.create("0324", "0324NT700001", "NT700001"));
           rows.add(RowFactory.create("0324", "0324REG00002", "REG00002"));
   
           final Dataset<Row> source = spark.createDataFrame(rows, 
DataTypes.createStructType(List.of(
                   DataTypes.createStructField("OurBranchID", 
DataTypes.StringType, true),
                   DataTypes.createStructField("AccountID", 
DataTypes.StringType, true),
                   DataTypes.createStructField("AccountName", 
DataTypes.StringType, true)
           )));
   
           writeTarget(source, true);
       }
   
       protected static SparkSession createSparkSession() {
           final SparkConf sparkConfig = new SparkConf()
                   .loadFromSystemProperties(true)
                   .setAppName("test2");
   
           sparkConfig.set("spark.ui.enabled", "true");
   
           sparkConfig.set("spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.hudi.catalog.HoodieCatalog");
           sparkConfig.set("spark.sql.hive.convertMetastoreParquet", "false");
   
           sparkConfig.set("spark.driver.maxResultSize", "256m");
   
           sparkConfig.set("spark.memory.fraction", "0.6");
           sparkConfig.set("spark.memory.storageFraction", "0.2");
   
           sparkConfig.set("spark.rdd.compress", "true");
   
           sparkConfig.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
           sparkConfig.set("spark.kryo.registrator", 
"org.apache.spark.HoodieSparkKryoRegistrar");
           sparkConfig.set("spark.kryoserializer.buffer.max", "256m");
   
           sparkConfig.set("spark.sql.extensions", 
"org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
   
           sparkConfig.set("spark.sql.parquet.datetimeRebaseModeInRead", 
"LEGACY");
   
           sparkConfig.setMaster("local[6]");
   
           final SparkContext sparkContext = new SparkContext(sparkConfig);
   
           sparkContext.setLogLevel("ERROR");
   
           return SparkSession.builder()
                   .config(sparkConfig)
                   .sparkContext(sparkContext)
                   .getOrCreate();
       }
   
       public static void main(String[] args) throws Exception {
           SparkSession sparkSession = createSparkSession();
   
           try {
               new Test3(sparkSession).fetchAll();
           } finally {
               sparkSession.close();
           }
   
           sparkSession = createSparkSession();
   
           try {
               new Test3(sparkSession).fetchIncremental();
           } finally {
               sparkSession.close();
           }
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to