lokeshj1703 commented on code in PR #13449:
URL: https://github.com/apache/hudi/pull/13449#discussion_r2157128788
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1134,26 +1138,48 @@ public void startCommit(String instantTime) {
   @Override
   public HoodieData<WriteStatus> streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String instantTime) {
-    List<MetadataPartitionType> partitionsToTag = new ArrayList<>(enabledPartitionTypes);
-    partitionsToTag.remove(FILES);
-    partitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
-    if (partitionsToTag.isEmpty()) {
+    Pair<List<MetadataPartitionType>, Set<String>> streamingMDTPartitionsPair = getStreamingMetadataPartitionsToUpdate();
+    List<MetadataPartitionType> mdtPartitionsToTag = streamingMDTPartitionsPair.getLeft();
+    Set<String> mdtPartitionPathsToTag = streamingMDTPartitionsPair.getRight();
+
+    if (mdtPartitionPathsToTag.isEmpty()) {
       return engineContext.emptyHoodieData();
     }
     HoodieData<HoodieRecord> untaggedRecords = writeStatus.flatMap(
-        new MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(partitionsToTag, dataWriteConfig));
+        new MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(mdtPartitionsToTag, dataWriteConfig));
     // tag records w/ location
     Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords =
         tagRecordsWithLocationForStreamingWrites(untaggedRecords,
-            partitionsToTag.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
+            mdtPartitionPathsToTag);
     // write partial writes to MDT table (for those partitions where streaming writes are enabled)
     HoodieData<WriteStatus> writeStatusCollection = convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords, instantTime));
     // dag not yet de-referenced. do not invoke any action on writeStatusCollection yet.
     return writeStatusCollection;
   }
+  private Pair<List<MetadataPartitionType>, Set<String>> getStreamingMetadataPartitionsToUpdate() {
+    List<MetadataPartitionType> mdtPartitionsToTag = new ArrayList<>(enabledPartitionTypes);
+    mdtPartitionsToTag.remove(FILES);
Review Comment:
Addressed
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1134,26 +1138,48 @@ public void startCommit(String instantTime) {
   @Override
   public HoodieData<WriteStatus> streamWriteToMetadataPartitions(HoodieData<WriteStatus> writeStatus, String instantTime) {
-    List<MetadataPartitionType> partitionsToTag = new ArrayList<>(enabledPartitionTypes);
-    partitionsToTag.remove(FILES);
-    partitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
-    if (partitionsToTag.isEmpty()) {
+    Pair<List<MetadataPartitionType>, Set<String>> streamingMDTPartitionsPair = getStreamingMetadataPartitionsToUpdate();
+    List<MetadataPartitionType> mdtPartitionsToTag = streamingMDTPartitionsPair.getLeft();
+    Set<String> mdtPartitionPathsToTag = streamingMDTPartitionsPair.getRight();
+
+    if (mdtPartitionPathsToTag.isEmpty()) {
       return engineContext.emptyHoodieData();
     }
     HoodieData<HoodieRecord> untaggedRecords = writeStatus.flatMap(
-        new MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(partitionsToTag, dataWriteConfig));
+        new MetadataIndexGenerator.WriteStatusBasedMetadataIndexMapper(mdtPartitionsToTag, dataWriteConfig));
     // tag records w/ location
     Pair<List<HoodieFileGroupId>, HoodieData<HoodieRecord>> hoodieFileGroupsToUpdateAndTaggedMdtRecords =
         tagRecordsWithLocationForStreamingWrites(untaggedRecords,
-            partitionsToTag.stream().map(MetadataPartitionType::getPartitionPath).collect(Collectors.toSet()));
+            mdtPartitionPathsToTag);
     // write partial writes to MDT table (for those partitions where streaming writes are enabled)
     HoodieData<WriteStatus> writeStatusCollection = convertEngineSpecificDataToHoodieData(streamWriteToMetadataTable(hoodieFileGroupsToUpdateAndTaggedMdtRecords, instantTime));
     // dag not yet de-referenced. do not invoke any action on writeStatusCollection yet.
     return writeStatusCollection;
   }
+  private Pair<List<MetadataPartitionType>, Set<String>> getStreamingMetadataPartitionsToUpdate() {
+    List<MetadataPartitionType> mdtPartitionsToTag = new ArrayList<>(enabledPartitionTypes);
+    mdtPartitionsToTag.remove(FILES);
+    mdtPartitionsToTag.remove(SECONDARY_INDEX); // mdt partition path will have additional suffixes which will be added below.
+    mdtPartitionsToTag.remove(EXPRESSION_INDEX); // mdt partition path will have additional suffixes which will be added below.
+    mdtPartitionsToTag.retainAll(STREAMING_WRITES_SUPPORTED_PARTITIONS);
+
+    Set<String> mdtPartitionPathsToTag = new HashSet<>(mdtPartitionsToTag.stream().map(mdtPartitionToTag -> mdtPartitionToTag.getPartitionPath()).collect(Collectors.toSet()));
+    if (STREAMING_WRITES_SUPPORTED_PARTITION_PREFIXES.contains(SECONDARY_INDEX)) {
Review Comment:
   I have removed the variable `STREAMING_WRITES_SUPPORTED_PARTITION_PREFIXES` itself. Now `getNonStreamingMetadataPartitionsToUpdate` calls `getStreamingMetadataPartitionsToUpdate`.
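   For reference, a minimal sketch of the complement relationship this creates; only the two method names come from the change itself, while the parameter and body below are illustrative, not the actual patch:

   ```java
   // Hypothetical sketch: derive the non-streaming MDT partition paths as the
   // complement of the streaming set, so the two methods cannot drift apart.
   private Set<String> getNonStreamingMetadataPartitionsToUpdate(Set<String> allEnabledPartitionPaths) {
     Set<String> nonStreamingPaths = new HashSet<>(allEnabledPartitionPaths);
     // Whatever is handled by streaming writes is excluded here.
     nonStreamingPaths.removeAll(getStreamingMetadataPartitionsToUpdate().getRight());
     return nonStreamingPaths;
   }
   ```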
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestAppendHandle.java:
##########
@@ -46,31 +55,48 @@
  */
 public class TestAppendHandle extends BaseTestHandle {

-  @ParameterizedTest
-  @ValueSource(booleans = { true, false })
-  public void testAppendHandleRLIStats(boolean populateMetaFields) {
+  @Test
+  public void testAppendHandleRLIAndSIStats() throws Exception {
     // init config and table
     HoodieWriteConfig config = getConfigBuilder(basePath)
         .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder().withRemoteServerPort(timelineServicePort).build())
-        .withPopulateMetaFields(populateMetaFields)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withEnableRecordIndex(true).withStreamingWriteEnabled(true).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder()
+            .enable(true)
+            .withEnableRecordIndex(true)
+            .withStreamingWriteEnabled(true)
+            .withSecondaryIndexEnabled(true)
+            .withSecondaryIndexName("sec-rider")
+            .withSecondaryIndexForColumn("rider")
+            .build())
         .build();
-
+    config.setSchema(TRIP_EXAMPLE_SCHEMA);
     HoodieTable table = HoodieSparkTable.create(config, context, metaClient);
     // one round per partition
     String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[0];
     // init some args
-    String fileId = UUID.randomUUID().toString();
-    String instantTime = "000";
+    String instantTime = InProcessTimeGenerator.createNewInstantTime();
-    config.setSchema(TRIP_EXAMPLE_SCHEMA);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {partitionPath});
-    // create parquet file
-    createParquetFile(config, table, partitionPath, fileId, instantTime, dataGenerator);
-    // generate update records
+    SparkRDDWriteClient client = getHoodieWriteClient(config);
+    WriteClientTestUtils.startCommitWithTime(writeClient, instantTime);
+    List<HoodieRecord> records1 = dataGenerator.generateInserts(instantTime, 100);
+    JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records1, 1);
+    JavaRDD<WriteStatus> statuses = client.upsert(writeRecords, instantTime);
+    client.commit(instantTime, statuses, Option.empty(), COMMIT_ACTION, Collections.emptyMap(), Option.empty());
+
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+    table = (HoodieSparkCopyOnWriteTable) HoodieSparkCopyOnWriteTable.create(config, context, metaClient);
+    HoodieFileGroup fileGroup = table.getFileSystemView().getAllFileGroups(partitionPath).collect(Collectors.toList()).get(0);
+    String fileId = fileGroup.getFileGroupId().getFileId();
+
+    // generate update and delete records
     instantTime = "001";
-    List<HoodieRecord> records = dataGenerator.generateUniqueUpdates(instantTime, 50);
+    int numUpdates = 20;
+    List<HoodieRecord> records = dataGenerator.generateUniqueUpdates(instantTime, numUpdates);
+    int numDeletes = generateDeleteRecords(records, dataGenerator, instantTime);
Review Comment:
   There is a small chance that all the deletes overlap with the updates generated earlier. I am generating a larger number of deletes to avoid that case. Fixed it.
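   To make the intent concrete, here is a hedged sketch of the shape of `generateDeleteRecords`; the helper name matches the diff, but the body and the `generateUniqueDeleteRecords` call are my assumptions, not the actual test code:

   ```java
   // Hypothetical sketch: over-generate deletes and drop any key that is already
   // being updated in this batch, so the delete count cannot be skewed by overlap.
   private static int generateDeleteRecords(List<HoodieRecord> records, HoodieTestDataGenerator dataGenerator, String instantTime) {
     Set<String> updatedKeys = records.stream().map(HoodieRecord::getRecordKey).collect(Collectors.toSet());
     List<HoodieRecord> deletes = dataGenerator.generateUniqueDeleteRecords(instantTime, 30).stream()
         .filter(r -> !updatedKeys.contains(r.getRecordKey()))
         .collect(Collectors.toList());
     records.addAll(deletes);
     return deletes.size();
   }
   ```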
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -81,18 +86,20 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends HoodieIOHandle<T, I,
   protected final TaskContextSupplier taskContextSupplier;
   // For full schema evolution
   protected final boolean schemaOnReadEnabled;
+  protected final boolean isStreamingWriteToMetadataEnabled;
+  List<Pair<String, HoodieIndexDefinition>> secondaryIndexDefns = Collections.emptyList();
Review Comment:
   Seems like it is not required right now. It's only used within the package.
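   A minimal illustration of the visibility point, assuming the consumers (e.g. `HoodieAppendHandle`) sit in the same `org.apache.hudi.io` package as `HoodieWriteHandle`:

   ```java
   // Package-private (no access modifier): readable by other classes in
   // org.apache.hudi.io, without widening the API surface to protected/public.
   List<Pair<String, HoodieIndexDefinition>> secondaryIndexDefns = Collections.emptyList();
   ```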