the-other-tim-brown commented on code in PR #13449:
URL: https://github.com/apache/hudi/pull/13449#discussion_r2155165816


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/SecondaryIndexStats.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+/**
+ * Class is used to hold secondary index metadata updates. These updates are 
generated from
+ * various write handles during write.
+ */
+public class SecondaryIndexStats {

Review Comment:
   This is used in `IndexStats` which implements `Serializable`, should this 
also implement `Serializable`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -81,18 +86,20 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
   protected final TaskContextSupplier taskContextSupplier;
   // For full schema evolution
   protected final boolean schemaOnReadEnabled;
+  protected final boolean isStreamingWriteToMetadataEnabled;
+  List<Pair<String, HoodieIndexDefinition>> secondaryIndexDefns = 
Collections.emptyList();

Review Comment:
   make this protected?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/IndexStats.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.model.HoodieRecordDelegate;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class to hold all index stats required to generate Metadata records for all 
enabled partitions.
+ * Supported stats are record level index stats and secondary index stats.
+ */
+public class IndexStats implements Serializable {

Review Comment:
   Will this value be serialized between drivers and executors? If so, should 
we add the setters or implement KryoSerializable for efficient serialization?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -226,6 +231,11 @@ private Option<FileSlice> 
populateWriteStatAndFetchFileSlice(HoodieRecord record
     return fileSlice;
   }
 
+  private Option<FileSlice> getFileSlice() {
+    TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
+    return rtView.getLatestFileSlice(partitionPath, fileId);

Review Comment:
   If there is an inflight compaction, this call will filter out the base file 
giving us an incomplete view of the data for this file group. I think you'll 
want to use one of the other APIs for fetching the completed base file with the 
log files.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestAppendHandle.java:
##########
@@ -46,31 +55,48 @@
  */
 public class TestAppendHandle extends BaseTestHandle {
 
-  @ParameterizedTest
-  @ValueSource(booleans = { true, false })
-  public void testAppendHandleRLIStats(boolean populateMetaFields) {
+  @Test
+  public void testAppendHandleRLIAndSIStats() throws Exception {
     // init config and table
     HoodieWriteConfig config = getConfigBuilder(basePath)
         
.withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
-        .withPopulateMetaFields(populateMetaFields)
-        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .withStreamingWriteEnabled(true)
+            .withSecondaryIndexEnabled(true)
+            .withSecondaryIndexName("sec-rider")
+            .withSecondaryIndexForColumn("rider")
+            .build())
         .build();
-
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
     HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
 
     // one round per partition
     String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
     // init some args
-    String fileId = UUID.randomUUID().toString();
-    String instantTime = "000";
+    String instantTime = InProcessTimeGenerator.createNewInstantTime();
 
-    config.setSchema(TRIP_EXAMPLE_SCHEMA);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[] {partitionPath});
-    // create parquet file
-    createParquetFile(config, table, partitionPath, fileId, instantTime, 
dataGenerator);
-    // generate update records
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+    WriteClientTestUtils.startCommitWithTime(writeClient, instantTime);
+    List<HoodieRecord> records1 = dataGenerator.generateInserts(instantTime, 
100);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records1, 1);
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
+    client.commit(instantTime, statuses, Option.empty(), COMMIT_ACTION, 
Collections.emptyMap(), Option.empty());
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = (HoodieSparkCopyOnWriteTable) 
HoodieSparkCopyOnWriteTable.create(config, context, metaClient);
+    HoodieFileGroup fileGroup = 
table.getFileSystemView().getAllFileGroups(partitionPath).collect(Collectors.toList()).get(0);
+    String fileId = fileGroup.getFileGroupId().getFileId();
+
+    // generate update and delete records
     instantTime = "001";
-    List<HoodieRecord> records = 
dataGenerator.generateUniqueUpdates(instantTime, 50);
+    int numUpdates = 20;
+    List<HoodieRecord> records = 
dataGenerator.generateUniqueUpdates(instantTime, numUpdates);
+    int numDeletes = generateDeleteRecords(records, dataGenerator, 
instantTime);

Review Comment:
   Should we assert that this is non-zero?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to