lokeshj1703 commented on code in PR #13449:
URL: https://github.com/apache/hudi/pull/13449#discussion_r2157128788


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1134,26 +1138,48 @@ public void startCommit(String instantTime) {
 
   @Override
   public HoodieData<WriteStatus> 
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String 
instantTime) {
-    List<MetadataPartitionType> partitionsToTag = new 
ArrayList<>(enabledPartitionTypes);
-    partitionsToTag.remove(FILES);
-    partitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
-    if (partitionsToTag.isEmpty()) {
+    Pair<List<MetadataPartitionType>, Set<String>> streamingMDTPartitionsPair 
= getStreamingMetadataPartitionsToUpdate();
+    List<MetadataPartitionType> mdtPartitionsToTag = 
streamingMDTPartitionsPair.getLeft();
+    Set<String> mdtPartitionPathsToTag = streamingMDTPartitionsPair.getRight();
+
+    if (mdtPartitionPathsToTag.isEmpty()) {
       return engineContext.emptyHoodieData();
     }
 
     HoodieData<HoodieRecord> untaggedRecords = writeStatus.flatMap(
-        new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(partitionsToTag, 
dataWriteConfig));
+        new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(mdtPartitionsToTag, 
dataWriteConfig));
 
     // tag records w/ location
     Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> 
hoodieFileGroupsToUpdateAndTaggedMdtRecords = 
tagRecordsWithLocationForStreamingWrites(untaggedRecords,
-        
partitionsToTag.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
+        mdtPartitionPathsToTag);
 
     // write partial writes to MDT table (for those partitions where streaming 
writes are enabled)
     HoodieData<WriteStatus> writeStatusCollection = 
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords,
 instantTime));
     // dag not yet de-referenced. do not invoke any action on 
writeStatusCollection yet.
     return writeStatusCollection;
   }
 
+  private Pair<List<MetadataPartitionType>, Set<String>> 
getStreamingMetadataPartitionsToUpdate() {
+    List<MetadataPartitionType> mdtPartitionsToTag = new 
ArrayList<>(enabledPartitionTypes);
+    mdtPartitionsToTag.remove(FILES);

Review Comment:
   Addressed



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1134,26 +1138,48 @@ public void startCommit(String instantTime) {
 
   @Override
   public HoodieData<WriteStatus> 
streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String 
instantTime) {
-    List<MetadataPartitionType> partitionsToTag = new 
ArrayList<>(enabledPartitionTypes);
-    partitionsToTag.remove(FILES);
-    partitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
-    if (partitionsToTag.isEmpty()) {
+    Pair<List<MetadataPartitionType>, Set<String>> streamingMDTPartitionsPair 
= getStreamingMetadataPartitionsToUpdate();
+    List<MetadataPartitionType> mdtPartitionsToTag = 
streamingMDTPartitionsPair.getLeft();
+    Set<String> mdtPartitionPathsToTag = streamingMDTPartitionsPair.getRight();
+
+    if (mdtPartitionPathsToTag.isEmpty()) {
       return engineContext.emptyHoodieData();
     }
 
     HoodieData<HoodieRecord> untaggedRecords = writeStatus.flatMap(
-        new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(partitionsToTag, 
dataWriteConfig));
+        new 
MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(mdtPartitionsToTag, 
dataWriteConfig));
 
     // tag records w/ location
     Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> 
hoodieFileGroupsToUpdateAndTaggedMdtRecords = 
tagRecordsWithLocationForStreamingWrites(untaggedRecords,
-        
partitionsToTag.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
+        mdtPartitionPathsToTag);
 
     // write partial writes to MDT table (for those partitions where streaming 
writes are enabled)
     HoodieData<WriteStatus> writeStatusCollection = 
convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords,
 instantTime));
     // dag not yet de-referenced. do not invoke any action on 
writeStatusCollection yet.
     return writeStatusCollection;
   }
 
+  private Pair<List<MetadataPartitionType>, Set<String>> 
getStreamingMetadataPartitionsToUpdate() {
+    List<MetadataPartitionType> mdtPartitionsToTag = new 
ArrayList<>(enabledPartitionTypes);
+    mdtPartitionsToTag.remove(FILES);
+    mdtPartitionsToTag.remove(SECONDARY_INDEX); // mdt partition path will 
have additional suffixes which will be added below.
+    mdtPartitionsToTag.remove(EXPRESSION_INDEX); // mdt partition path will 
have additional suffixes which will be added below.
+    mdtPartitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
+
+    Set<String> mdtPartitionPathsToTag = new 
HashSet<>(mdtPartitionsToTag.stream().map(mdtPartitionToTag -> 
mdtPartitionToTag.getPartitionPath()).collect(Collectors.toSet()));
+    if 
(STREAMING_WRITES_SUPPORTED_PARTITION_PREFIXES.contains(SECONDARY_INDEX)) {

Review Comment:
   I have removed the variable `STREAMING_WRITES_SUPPORTED_PARTITION_PREFIXES` 
itself. Now `getNonStreamingMetadataPartitionsToUpdate` calls 
`getStreamingMetadataPartitionsToUpdate`.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestAppendHandle.java:
##########
@@ -46,31 +55,48 @@
  */
 public class TestAppendHandle extends BaseTestHandle {
 
-  @ParameterizedTest
-  @ValueSource(booleans = { true, false })
-  public void testAppendHandleRLIStats(boolean populateMetaFields) {
+  @Test
+  public void testAppendHandleRLIAndSIStats() throws Exception {
     // init config and table
     HoodieWriteConfig config = getConfigBuilder(basePath)
         
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
-        .withPopulateMetaFields(populateMetaFields)
-        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .withStreamingWriteEnabled(true)
+            .withSecondaryIndexEnabled(true)
+            .withSecondaryIndexName("sec-rider")
+            .withSecondaryIndexForColumn("rider")
+            .build())
         .build();
-
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
     HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
 
     // one round per partition
     String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
     // init some args
-    String fileId = UUID.randomUUID().toString();
-    String instantTime = "000";
+    String instantTime = InProcessTimeGenerator.createNewInstantTime();
 
-    config.setSchema(TRIP_EXAMPLE_SCHEMA);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
-    // create parquet file
-    createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
-    // generate update records
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+    WriteClientTestUtils.startCommitWithTime(writeClient, instantTime);
+    List<HoodieRecord> records1 = dataGenerator.generateInserts(instantTime, 
100);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records1, 1);
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
+    client.commit(instantTime, statuses, Option.empty(), COMMIT_ACTION, 
Collections.emptyMap(), Option.empty());
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkCopyOnWriteTable.create(config, context, metaClient);
+    HoodieFileGroup fileGroup = 
table.getFileSystemView().getAllFileGroups(partitionPath).collect(Collectors.toList()).get(0);
+    String fileId = fileGroup.getFileGroupId().getFileId();
+
+    // generate update and delete records
     instantTime = "001";
-    List<HoodieRecord> records = 
dataGenerator.generateUniqueUpdates(instantTime, 50);
+    int numUpdates = 20;
+    List<HoodieRecord> records = 
dataGenerator.generateUniqueUpdates(instantTime, numUpdates);
+    int numDeletes = generateDeleteRecords(records, dataGenerator, 
instantTime);

Review Comment:
   There is a very low probability that all the deletes overlap with the 
updates generated earlier. I am now generating a larger number of deletes to 
avoid that case. Fixed it.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -81,18 +86,20 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
   protected final TaskContextSupplier taskContextSupplier;
   // For full schema evolution
   protected final boolean schemaOnReadEnabled;
+  protected final boolean isStreamingWriteToMetadataEnabled;
+  List<Pair<String, HoodieIndexDefinition>> secondaryIndexDefns = 
Collections.emptyList();

Review Comment:
   Seems like it is not required right now. It's used within the package. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to