lokeshj1703 commented on code in PR #13402:
URL: https://github.com/apache/hudi/pull/13402#discussion_r2139903384
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -316,33 +317,38 @@ protected HoodieWriteMetadata<O> compact(HoodieTable<?,
I, ?, T> table, String c
}
compactionTimer = metrics.getCompactionCtx();
HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
- HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(writeMetadata);
+ HoodieWriteMetadata<T> processedWriteMetadata =
processWriteMetadata(table, writeMetadata, compactionInstantTime);
+ HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(processedWriteMetadata);
if (shouldComplete) {
commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table));
}
return compactionWriteMetadata;
}
+ protected HoodieWriteMetadata<T> processWriteMetadata(HoodieTable table,
HoodieWriteMetadata<T> writeMetadata, String instantTime) {
+ return writeMetadata;
+ }
+
public void commitCompaction(String compactionInstantTime,
HoodieWriteMetadata<O> compactionWriteMetadata, Option<HoodieTable> tableOpt) {
// dereferencing the write dag for compaction for the first time.
- List<HoodieWriteStat> writeStats =
triggerWritesAndFetchWriteStats(compactionWriteMetadata);
+ Pair<List<HoodieWriteStat>, List<HoodieWriteStat>>
dataTableAndMetadataTableHoodieWriteStats =
triggerWritesAndFetchWriteStats(compactionWriteMetadata);
// Fetch commit metadata from HoodieWriteMetadata and update
HoodieWriteStat
-
CommonClientUtils.stitchCompactionHoodieWriteStats(compactionWriteMetadata,
writeStats);
+
CommonClientUtils.stitchCompactionHoodieWriteStats(compactionWriteMetadata,
dataTableAndMetadataTableHoodieWriteStats.getKey());
metrics.emitCompactionCompleted();
HoodieTable table = tableOpt.orElseGet(() -> createTable(config,
context.getStorageConf()));
- completeCompaction(compactionWriteMetadata.getCommitMetadata().get(),
table, compactionInstantTime);
+ completeCompaction(compactionWriteMetadata.getCommitMetadata().get(),
table, compactionInstantTime,
dataTableAndMetadataTableHoodieWriteStats.getValue());
}
/**
* The API triggers the data write and fetches the corresponding write stats
using the write metadata.
Review Comment:
NIT: javadoc needs to be updated
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -1167,6 +1165,10 @@ protected O
streamWriteToMetadataTable(Pair<List<HoodieFileGroupId>, HoodieData<
throw new UnsupportedOperationException("Not yet implemented");
}
+ public O batchWriteToMetadataTablePartitions(I preppedRecords, String
instantTime) {
Review Comment:
missing javadoc
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -123,23 +133,35 @@ public boolean commit(String instantTime,
JavaRDD<WriteStatus> writeStatuses, Op
// At the beginning of this method, we drop all index stats and error
records before collecting in the driver.
// Just in case if there are errors, caller might be interested to fetch
error records in the validator where
// a complete collection of RDD<WriteStatus> is required.
+ JavaRDD<WriteStatus> finalWriteStatuses = writeStatuses;
boolean canProceed = writeStatusValidatorOpt.map(callback ->
callback.validate(totalRecords.get(), totalErrorRecords.get(),
- totalErrorRecords.get() > 0 ?
Option.of(HoodieJavaRDD.of(writeStatuses.filter(status ->
!status.isMetadataTable()).map(WriteStatus::removeMetadataStats))) :
Option.empty()))
+ totalErrorRecords.get() > 0 ?
Option.of(HoodieJavaRDD.of(finalWriteStatuses.filter(status ->
!status.isMetadataTable()).map(WriteStatus::removeMetadataStats))) :
Option.empty()))
.orElse(true);
// Proceeds only if validator returns true, otherwise bails out.
if (canProceed) {
// when streaming writes are enabled, writeStatuses is a mix of data
table write status and mdt write status
List<HoodieWriteStat> dataTableHoodieWriteStats =
slimWriteStatsList.stream().filter(entry ->
!entry.isMetadataTable()).map(SlimWriteStats::getWriteStat).collect(Collectors.toList());
List<HoodieWriteStat> partialMetadataHoodieWriteStatsSoFar =
slimWriteStatsList.stream().filter(entry ->
entry.isMetadataTable).map(slimWriteStats ->
slimWriteStats.getWriteStat()).collect(Collectors.toList());
Review Comment:
Should we call this streamMetadataWriteStats?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -371,14 +377,19 @@ protected void completeCompaction(HoodieCommitMetadata
metadata, HoodieTable tab
LOG.info("Compacted successfully on commit {}", compactionCommitTime);
}
+ protected void writeToMetadataTable(HoodieTable table, String instantTime,
HoodieCommitMetadata metadata, List<HoodieWriteStat> metadataWriteStatsSoFar) {
+ // legacy write DAG to metadata table.
+ writeTableMetadata(table, instantTime, metadata);
+ }
+
public void commitLogCompaction(String compactionInstantTime,
HoodieWriteMetadata<O> writeMetadata, Option<HoodieTable> tableOpt) {
// dereferencing the write dag for log compaction for the first time.
- List<HoodieWriteStat> writeStats =
triggerWritesAndFetchWriteStats(writeMetadata);
+ Pair<List<HoodieWriteStat>, List<HoodieWriteStat>>
dataTableAndMetadataTableHoodieWriteStats =
triggerWritesAndFetchWriteStats(writeMetadata);
Review Comment:
We can probably rename this to dataAndMetadataWriteStats, since the data
stats are applicable for the metadata table as well.
##########
hudi-common/src/main/java/org/apache/hudi/common/model/WriteOperationType.java:
##########
@@ -181,4 +187,11 @@ public static boolean
isPreppedWriteOperation(WriteOperationType operationType)
public static boolean isCompactionOrClustering(WriteOperationType
operationType) {
return operationType == COMPACT || operationType == CLUSTER;
}
+
+ /**
+ * @return true if streaming writes to metadata table is supported for a
given {@link WriteOperationType}. false otherwise.
+ */
+ public static boolean streamingWritesToMetadataSupported(WriteOperationType
writeOperationType) {
Review Comment:
Currently hoodie.metadata.streaming.write.enabled is false, so CI is not
getting tested with streaming enabled.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -316,33 +317,38 @@ protected HoodieWriteMetadata<O> compact(HoodieTable<?,
I, ?, T> table, String c
}
compactionTimer = metrics.getCompactionCtx();
HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
- HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(writeMetadata);
+ HoodieWriteMetadata<T> processedWriteMetadata =
processWriteMetadata(table, writeMetadata, compactionInstantTime);
+ HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(processedWriteMetadata);
if (shouldComplete) {
commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table));
}
return compactionWriteMetadata;
}
+ protected HoodieWriteMetadata<T> processWriteMetadata(HoodieTable table,
HoodieWriteMetadata<T> writeMetadata, String instantTime) {
Review Comment:
NIT: javadoc
##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestWriteClient.java:
##########
@@ -59,9 +59,8 @@ public void
testInertsWithEmptyCommitsHavingWriterSchemaAsNull() throws Exceptio
try {
String firstCommit = "001";
int numRecords = 200;
- JavaRDD<WriteStatus> result = insertFirstBatch(cfgBuilder.build(),
client, firstCommit, "000", numRecords, SparkRDDWriteClient::insert,
+ insertFirstBatch(cfgBuilder.build(), client, firstCommit, "000",
numRecords, SparkRDDWriteClient::insert,
false, false, numRecords, INSTANT_GENERATOR);
- assertTrue(client.commit(firstCommit, result), "Commit should succeed");
Review Comment:
Unintentional change?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieMetadataWriteWrapper.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstraction for data table write client and table service client to write
to metadata table.
+ */
+public class HoodieMetadataWriteWrapper {
+
+ // Cached HoodieTableMetadataWriter for each action in data table. This will
be cleaned up when action is completed or when write client is closed.
+ protected Map<String, Option<HoodieTableMetadataWriter>> metadataWriterMap =
new ConcurrentHashMap<>();
+
+ /**
+ * Called by data table write client and data table table service client to
perform streaming write to metadata table.
+ * @param table {@link HoodieTable} instance for data table of interest.
+ * @param dataTableWriteStatuses {@link WriteStatus} from data table writes.
+ * @param instantTime instant time of interest.
+ * @return {@link HoodieData} of {@link WriteStatus} referring to both data
table writes and partial metadata table writes.
+ */
+ public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table,
HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime) {
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
getMetadataWriter(instantTime, table);
+ if (metadataWriterOpt.isPresent()) {
+ return streamWriteToMetadataTable(dataTableWriteStatuses,
metadataWriterOpt, table, instantTime);
+ } else {
+ throw new HoodieMetadataException("Cannot instantiate metadata writer
for the table of interest " + table.getMetaClient().getBasePath());
+ }
+ }
+
+ // to be invoked by write client or table service client.
+
+ /**
+ * To be invoked by write client or table service client to write to
metadata table.
+ * When streaming writes are enabled, writes to left over metadata
partitions which was not covered in {@link
#streamWriteToMetadataTable(HoodieTable, HoodieData, String)}
+ * will be invoked here.
+ * If not, writes take the legacy way of writing to metadata table.
+ * @param table {@link HoodieTable} instance for data table of interest.
+ * @param instantTime instant time of interest.
+ * @param metadata {@link HoodieCommitMetadata} of interest.
+ * @param metadataWriteStatsSoFar List of {@link HoodieWriteStat}s referring
to partial writes completed in metadata table with streaming writes.
+ */
+ public void writeToMetadataTable(HoodieTable table, String instantTime,
+ HoodieCommitMetadata metadata,
List<HoodieWriteStat> metadataWriteStatsSoFar) {
+ writeToMetadataTable(table, instantTime, metadataWriteStatsSoFar,
metadata);
+ }
Review Comment:
The caller and callee function can be merged here
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -316,33 +317,38 @@ protected HoodieWriteMetadata<O> compact(HoodieTable<?,
I, ?, T> table, String c
}
compactionTimer = metrics.getCompactionCtx();
HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
- HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(writeMetadata);
+ HoodieWriteMetadata<T> processedWriteMetadata =
processWriteMetadata(table, writeMetadata, compactionInstantTime);
+ HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(processedWriteMetadata);
if (shouldComplete) {
commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table));
}
return compactionWriteMetadata;
}
+ protected HoodieWriteMetadata<T> processWriteMetadata(HoodieTable table,
HoodieWriteMetadata<T> writeMetadata, String instantTime) {
Review Comment:
Should we name this something like streamWriteToMetadataTable?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -316,33 +317,38 @@ protected HoodieWriteMetadata<O> compact(HoodieTable<?,
I, ?, T> table, String c
}
compactionTimer = metrics.getCompactionCtx();
HoodieWriteMetadata<T> writeMetadata = table.compact(context,
compactionInstantTime);
- HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(writeMetadata);
+ HoodieWriteMetadata<T> processedWriteMetadata =
processWriteMetadata(table, writeMetadata, compactionInstantTime);
+ HoodieWriteMetadata<O> compactionWriteMetadata =
convertToOutputMetadata(processedWriteMetadata);
if (shouldComplete) {
commitCompaction(compactionInstantTime, compactionWriteMetadata,
Option.of(table));
}
return compactionWriteMetadata;
}
+ protected HoodieWriteMetadata<T> processWriteMetadata(HoodieTable table,
HoodieWriteMetadata<T> writeMetadata, String instantTime) {
+ return writeMetadata;
+ }
+
public void commitCompaction(String compactionInstantTime,
HoodieWriteMetadata<O> compactionWriteMetadata, Option<HoodieTable> tableOpt) {
// dereferencing the write dag for compaction for the first time.
- List<HoodieWriteStat> writeStats =
triggerWritesAndFetchWriteStats(compactionWriteMetadata);
+ Pair<List<HoodieWriteStat>, List<HoodieWriteStat>>
dataTableAndMetadataTableHoodieWriteStats =
triggerWritesAndFetchWriteStats(compactionWriteMetadata);
// Fetch commit metadata from HoodieWriteMetadata and update
HoodieWriteStat
-
CommonClientUtils.stitchCompactionHoodieWriteStats(compactionWriteMetadata,
writeStats);
+
CommonClientUtils.stitchCompactionHoodieWriteStats(compactionWriteMetadata,
dataTableAndMetadataTableHoodieWriteStats.getKey());
Review Comment:
It will probably be easier to understand if we split this into two
variables tracking the data and metadata write stats
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieMetadataWriteWrapper.java:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Abstraction for data table write client and table service client to write
to metadata table.
+ */
+public class HoodieMetadataWriteWrapper {
+
+ // Cached HoodieTableMetadataWriter for each action in data table. This will
be cleaned up when action is completed or when write client is closed.
+ protected Map<String, Option<HoodieTableMetadataWriter>> metadataWriterMap =
new ConcurrentHashMap<>();
+
+ /**
+ * Called by data table write client and data table table service client to
perform streaming write to metadata table.
+ * @param table {@link HoodieTable} instance for data table of interest.
+ * @param dataTableWriteStatuses {@link WriteStatus} from data table writes.
+ * @param instantTime instant time of interest.
+ * @return {@link HoodieData} of {@link WriteStatus} referring to both data
table writes and partial metadata table writes.
+ */
+ public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table,
HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime) {
+ Option<HoodieTableMetadataWriter> metadataWriterOpt =
getMetadataWriter(instantTime, table);
+ if (metadataWriterOpt.isPresent()) {
+ return streamWriteToMetadataTable(dataTableWriteStatuses,
metadataWriterOpt, table, instantTime);
+ } else {
+ throw new HoodieMetadataException("Cannot instantiate metadata writer
for the table of interest " + table.getMetaClient().getBasePath());
+ }
+ }
+
+ // to be invoked by write client or table service client.
+
+ /**
+ * To be invoked by write client or table service client to write to
metadata table.
+ * When streaming writes are enabled, writes to left over metadata
partitions which was not covered in {@link
#streamWriteToMetadataTable(HoodieTable, HoodieData, String)}
+ * will be invoked here.
+ * If not, writes take the legacy way of writing to metadata table.
+ * @param table {@link HoodieTable} instance for data table of interest.
+ * @param instantTime instant time of interest.
+ * @param metadata {@link HoodieCommitMetadata} of interest.
+ * @param metadataWriteStatsSoFar List of {@link HoodieWriteStat}s referring
to partial writes completed in metadata table with streaming writes.
+ */
+ public void writeToMetadataTable(HoodieTable table, String instantTime,
+ HoodieCommitMetadata metadata,
List<HoodieWriteStat> metadataWriteStatsSoFar) {
+ writeToMetadataTable(table, instantTime, metadataWriteStatsSoFar,
metadata);
+ }
Review Comment:
Should we name this function something like `batchWriteToMetadataTable`?
Multiple write APIs can confuse the reader.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]