This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 328e259d4a7 [HUDI-9417] Add validation for handling write stats during commit (#13307)
328e259d4a7 is described below
commit 328e259d4a7bc284acd9417fe8461e34efdf8a78
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jun 6 22:25:37 2025 +0530
[HUDI-9417] Add validation for handling write stats during commit (#13307)
---
.../hudi/callback/common/WriteStatusValidator.java | 44 +++++
.../apache/hudi/client/BaseHoodieWriteClient.java | 6 +-
.../java/org/apache/hudi/client/WriteStatus.java | 12 +-
.../hudi/client/TestBaseHoodieWriteClient.java | 3 +-
.../apache/hudi/config/TestHoodieWriteConfig.java | 1 +
.../metadata/TestHoodieMetadataWriteUtils.java | 3 +-
.../hudi/utils/HoodieWriterClientTestHarness.java | 1 -
.../apache/hudi/client/HoodieFlinkWriteClient.java | 4 +-
.../apache/hudi/client/HoodieJavaWriteClient.java | 7 +-
.../hudi/client/SparkRDDMetadataWriteClient.java | 30 ++-
.../apache/hudi/client/SparkRDDWriteClient.java | 112 ++++++++++-
...ieBackedTableMetadataWriterTableVersionSix.java | 3 +-
.../client/TestSparkRDDMetadataWriteClient.java | 2 +-
.../org/apache/hudi/client/TestWriteStatus.java | 3 +-
.../hudi/testutils/HoodieClientTestBase.java | 2 +-
.../hudi/common/config/HoodieMetadataConfig.java | 2 +-
.../apache/hudi/common/model/HoodieWriteStat.java | 1 +
.../hudi/metadata/HoodieTableMetadataUtil.java | 4 +-
.../hudi/common/model/TestHoodieWriteStat.java | 2 +-
.../main/java/org/apache/hudi/DataSourceUtils.java | 50 +++++
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 43 ++---
.../hudi/utilities/streamer/HoodieStreamer.java | 7 +-
.../apache/hudi/utilities/streamer/StreamSync.java | 211 ++++++++++++++-------
23 files changed, 428 insertions(+), 125 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/WriteStatusValidator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/WriteStatusValidator.java
new file mode 100644
index 00000000000..d1c7a121f28
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/callback/common/WriteStatusValidator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.common;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.util.Option;
+
+/**
+ * WriteStatus validator to assist callers in processing errors, if any. Callers can dictate whether to proceed
+ * with the commit via the return value of the method {@link WriteStatusValidator#validate}.
+ *
+ * <p>Sometimes callers trigger the dag just to check whether there are any errors before proceeding with the commit.
+ * This hook is introduced to avoid additional dag triggers from the caller's side.
+ */
+public interface WriteStatusValidator {
+
+ /**
+ * Validates the given write status.
+ *
+   * @param totalRecords Total records in this inflight commit.
+   * @param totalErroredRecords Total error records in this inflight commit.
+   * @param writeStatusesOpt List of {@link WriteStatus} for the data table writes for this inflight commit.
+   *
+   * @return True if the commit can proceed.
+   */
+  boolean validate(long totalRecords, long totalErroredRecords, Option<HoodieData<WriteStatus>> writeStatusesOpt);
+}
\ No newline at end of file
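For context, a caller-side implementation of this hook could look like the following sketch (illustrative only, not part of this commit; the class name and the threshold policy are hypothetical):

import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.util.Option;

// Hypothetical validator that lets the commit proceed only while the number of
// errored records stays under a caller-chosen threshold, without re-triggering the dag.
class ErrorThresholdWriteStatusValidator implements WriteStatusValidator {
  private final long maxAllowedErrors;

  ErrorThresholdWriteStatusValidator(long maxAllowedErrors) {
    this.maxAllowedErrors = maxAllowedErrors;
  }

  @Override
  public boolean validate(long totalRecords, long totalErroredRecords, Option<HoodieData<WriteStatus>> writeStatusesOpt) {
    // writeStatusesOpt is only populated when there are errored records, so it is not de-referenced here.
    return totalErroredRecords <= maxAllowedErrors;
  }
}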
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 7a298e6ae45..e4751140599 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.callback.HoodieWriteCommitCallback;
import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.heartbeat.HeartbeatUtils;
@@ -221,12 +222,13 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> extends BaseHoodieClient
   public boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> partitionToReplacedFileIds) {
     return commit(instantTime, writeStatuses, extraMetadata, commitActionType, partitionToReplacedFileIds,
-        Option.empty());
+        Option.empty(), Option.empty());
   }
   public abstract boolean commit(String instantTime, O writeStatuses, Option<Map<String, String>> extraMetadata,
                                  String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
-                                 Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc);
+                                 Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                                 Option<WriteStatusValidator> writeStatusValidatorOpt);
   public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Option<Map<String, String>> extraMetadata,
                              String commitActionType) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
index 3477398d7d8..7d355a67418 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/WriteStatus.java
@@ -189,12 +189,18 @@ public class WriteStatus implements Serializable {
totalErrorRecords++;
}
- public void removeMetadataStats() {
+ public WriteStatus removeMetadataIndexStatsAndErrorRecordsTracking() {
+ removeMetadataStats();
+ dropGranularErrorRecordsTracking();
+ return this;
+ }
+
+ public WriteStatus removeMetadataStats() {
this.writtenRecordDelegates.clear();
- this.stat.removeRecordStats();
+ return this;
}
- public void dropErrorRecords() {
+ public void dropGranularErrorRecordsTracking() {
failedRecords.clear();
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
index 04e3ab8fe26..a198afb8fc6 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client;
+import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
@@ -172,7 +173,7 @@ class TestBaseHoodieWriteClient extends HoodieCommonTestHarness {
     @Override
     public boolean commit(String instantTime, String writeStatuses, Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
-                          Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
+                          Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc, Option<WriteStatusValidator> writeStatusValidatorOpt) {
return false;
}
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 7d2563c5f2a..c9cc16f1c04 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -732,6 +732,7 @@ public class TestHoodieWriteConfig {
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withProperties(props)
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withStreamingWriteEnabled(true).build())
.withEngineType(EngineType.SPARK).build();
assertTrue(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.EIGHT));
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
index ee1fd587e04..3b343d3dc12 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
@@ -73,10 +73,11 @@ public class TestHoodieMetadataWriteUtils {
@Test
public void testCreateMetadataWriteConfigForNBCC() {
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
- .withPath("/tmp/base_path/.hoodie/metadata/")
+ .withPath("/tmp/base_path/")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
.retainCommits(5).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().withStreamingWriteEnabled(true).build())
         .build();
     HoodieWriteConfig metadataWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, HoodieFailedWritesCleaningPolicy.EAGER,
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index f250f31f333..dabdabdeb5e 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -786,7 +786,6 @@ public abstract class HoodieWriterClientTestHarness extends HoodieCommonTestHarn
       Function2<HoodieTable, HoodieTableMetaClient, HoodieWriteConfig> getHoodieTableFn,
       Function transformInputFn, Function transformOutputFn) throws Exception {
String instantTime = "00000000000010";
- HoodieTableMetaClient metaClient = createMetaClient();
     HoodieWriteConfig cfg = getRollbackMarkersAndConsistencyGuardWriteConfig(rollbackUsingMarkers, enableOptimisticConsistencyGuard, populateMetaFields);
     BaseHoodieWriteClient client = getHoodieWriteClient(cfg);
     testConsistencyCheck(context, metaClient, instantTime, enableOptimisticConsistencyGuard, getHoodieTableFn, transformInputFn, transformOutputFn);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 3d9958d0f4d..39e481dff10 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client;
+import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.utils.TransactionUtils;
import org.apache.hudi.common.data.HoodieListData;
@@ -101,7 +102,8 @@ public class HoodieFlinkWriteClient<T>
@Override
   public boolean commit(String instantTime, List<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
     List<HoodieWriteStat> writeStats = writeStatuses.parallelStream().map(WriteStatus::getStat).collect(Collectors.toList());
// for eager flush, multiple write stat may share one file path.
List<HoodieWriteStat> merged = writeStats.stream()
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 03362e0f383..c717b854cc8 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client;
+import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -90,10 +91,10 @@ public class HoodieJavaWriteClient<T> extends
Option<Map<String, String>> extraMetadata,
String commitActionType,
Map<String, List<String>> partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
     List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
-    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds,
-        extraPreCommitFunc);
+    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
index f3f897d3915..c13624fd867 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
@@ -18,13 +18,17 @@
package org.apache.hudi.client;
+import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -35,8 +39,12 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.spark.api.java.JavaRDD;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
/**
* Write client to assist with writing to metadata table.
@@ -45,6 +53,8 @@ import java.util.List;
*/
public class SparkRDDMetadataWriteClient<T> extends SparkRDDWriteClient<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(SparkRDDMetadataWriteClient.class);
+
// tracks the instants for which upsertPrepped is invoked.
private Option<String> firstInstantOpt = Option.empty();
private int invocationCounts = 0;
@@ -63,10 +73,26 @@ public class SparkRDDMetadataWriteClient<T> extends SparkRDDWriteClient<T> {
return TimelineUtils.generateInstantTime(false, timeGenerator);
}
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
+    context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
+    // For the metadata table, we don't have any write status validator, since we use FailOnFirstErrorWriteStatus as the write status class.
+    ValidationUtils.checkArgument(!writeStatusValidatorOpt.isPresent(), "Metadata table is not expected to contain write status validator");
+    // Triggering the dag for writes to the metadata table.
+    // When streaming writes are enabled, writes to metadata may not call this method, as the caller tightly controls the dag de-referencing.
+    // Even then, writes to the metadata table will invoke this commit method to initialize a new partition in the metadata table
+    // and for non-incremental operations like insert_overwrite, etc.
+    List<HoodieWriteStat> hoodieWriteStats = writeStatuses.map(writeStatus -> writeStatus.getStat()).collect();
+    return commitStats(instantTime, hoodieWriteStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+  }
+
/**
   * Upserts the given prepared records into the Hoodie table, at the supplied instantTime.
-   * <p>
-   * This implementation requires that the input records are already tagged, and de-duped if needed.
+   *
+   * <p>This implementation requires that the input records are already tagged, and de-duped if needed.
*
* @param preppedRecords Prepared HoodieRecords to upsert
* @param instantTime Instant time of the commit
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 2331596b0eb..23af3fd4441 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -18,6 +18,7 @@
package org.apache.hudi.client;
+import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.index.HoodieSparkIndexClient;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
@@ -57,9 +58,12 @@ import org.apache.spark.api.java.JavaSparkContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
@SuppressWarnings("checkstyle:LineLength")
public class SparkRDDWriteClient<T> extends
@@ -82,16 +86,58 @@ public class SparkRDDWriteClient<T> extends
return SparkHoodieIndexFactory.createIndex(config);
}
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
+                        String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
+    return commit(instantTime, writeStatuses, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc, Option.empty());
+  }
+
/**
   * Complete changes performed at the given instantTime marker with specified action.
*/
@Override
   public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> partitionToReplacedFileIds,
-                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc) {
+                        Option<BiConsumer<HoodieTableMetaClient, HoodieCommitMetadata>> extraPreCommitFunc,
+                        Option<WriteStatusValidator> writeStatusValidatorOpt) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing stats: " + config.getTableName());
-    List<HoodieWriteStat> writeStats = writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, commitActionType, partitionToReplacedFileIds, extraPreCommitFunc);
+    // Triggering the dag for writes.
+    //
+    // 1. If streaming writes are enabled, writes to both the data table and the metadata table get triggered at this juncture;
+    // 2. If not, writes to the data table get triggered here.
+    //
+    // When streaming writes are enabled, the data table's WriteStatus is expected to contain all stats required to generate metadata table records, so each object will be larger.
+    // Here all additional stats and error records are dropped, to retain only the required information and prevent collecting large objects on the driver.
+    List<SlimWriteStats> slimWriteStatsList = writeStatuses
+        .map(writeStatus -> new SlimWriteStats(writeStatus.isMetadataTable(), writeStatus.getTotalRecords(), writeStatus.getTotalErrorRecords(), writeStatus.getStat())).collect();
+    // Compute stats for the data table writes and invoke the callback
+    AtomicLong totalRecords = new AtomicLong(0);
+    AtomicLong totalErrorRecords = new AtomicLong(0);
+    // collect record stats for the data table
+    slimWriteStatsList.stream().filter(slimWriteStats -> !slimWriteStats.isMetadataTable())
+        .forEach(slimWriteStats -> {
+          totalRecords.getAndAdd(slimWriteStats.getTotalRecords());
+          totalErrorRecords.getAndAdd(slimWriteStats.getTotalErrorRecords());
+        });
+    // Why we pass the RDD<WriteStatus> to the WriteStatus validator:
+    // at the beginning of this method, we drop all index stats and error records before collecting in the driver.
+    // In case there are errors, the caller might be interested in fetching the error records in the validator,
+    // for which a complete collection of RDD<WriteStatus> is required.
+    boolean canProceed = writeStatusValidatorOpt.map(callback -> callback.validate(totalRecords.get(), totalErrorRecords.get(),
+        totalErrorRecords.get() > 0 ? Option.of(HoodieJavaRDD.of(writeStatuses.filter(status -> !status.isMetadataTable()).map(WriteStatus::removeMetadataStats))) : Option.empty()))
+        .orElse(true);
+
+    // Proceed only if the validator returns true; otherwise bail out.
+    if (canProceed) {
+      // when streaming writes are enabled, writeStatuses is a mix of data table write statuses and mdt write statuses
+      List<HoodieWriteStat> dataTableHoodieWriteStats = slimWriteStatsList.stream().filter(entry -> !entry.isMetadataTable()).map(SlimWriteStats::getWriteStat).collect(Collectors.toList());
+      return commitStats(instantTime, dataTableHoodieWriteStats, extraMetadata, commitActionType,
+          partitionToReplacedFileIds, extraPreCommitFunc);
+    } else {
+      LOG.error("Exiting early due to errors with the write operation");
+      return false;
+    }
}
@Override
@@ -344,4 +390,64 @@ public class SparkRDDWriteClient<T> extends
super.releaseResources(instantTime);
     SparkReleaseResources.releaseCachedData(context, config, basePath, instantTime);
}
+
+ /**
+   * Slim WriteStatus to hold info like total records, total error records,
+   * HoodieWriteStat, and whether the writeStatus refers to the metadata table or not.
+ */
+ static class SlimWriteStats implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private boolean isMetadataTable;
+ private long totalRecords;
+ private long totalErrorRecords;
+ private HoodieWriteStat writeStat;
+
+    public SlimWriteStats(boolean isMetadataTable, long totalRecords, long totalErrorRecords, HoodieWriteStat writeStat) {
+ this.isMetadataTable = isMetadataTable;
+ this.totalRecords = totalRecords;
+ this.totalErrorRecords = totalErrorRecords;
+ this.writeStat = writeStat;
+ }
+
+ public boolean isMetadataTable() {
+ return isMetadataTable;
+ }
+
+ public long getTotalRecords() {
+ return totalRecords;
+ }
+
+ public long getTotalErrorRecords() {
+ return totalErrorRecords;
+ }
+
+ public HoodieWriteStat getWriteStat() {
+ return writeStat;
+ }
+
+ // setter for efficient serialization,
+ // please do not remove it even if it is not used.
+ public void setMetadataTable(boolean metadataTable) {
+ isMetadataTable = metadataTable;
+ }
+
+ // setter for efficient serialization,
+ // please do not remove it even if it is not used.
+ public void setTotalRecords(long totalRecords) {
+ this.totalRecords = totalRecords;
+ }
+
+ // setter for efficient serialization,
+ // please do not remove it even if it is not used.
+ public void setTotalErrorRecords(long totalErrorRecords) {
+ this.totalErrorRecords = totalErrorRecords;
+ }
+
+ // setter for efficient serialization,
+ // please do not remove it even if it is not used.
+ public void setWriteStat(HoodieWriteStat writeStat) {
+ this.writeStat = writeStat;
+ }
+ }
}
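As a usage sketch of the new hook on the Spark client (illustrative only, not part of this commit; writeClient, instantTime, and statuses are assumed to exist in the caller's scope, and the zero-error policy is hypothetical):

// Proceed with the commit only when no records errored. WriteStatusValidator is a
// single-method interface, so a lambda suffices; the client computes the record
// counts from the slimmed-down stats, avoiding an extra dag trigger for this check.
WriteStatusValidator zeroErrorPolicy = (totalRecords, totalErroredRecords, writeStatusesOpt) -> totalErroredRecords == 0;
boolean committed = writeClient.commit(instantTime, statuses,
    Option.empty(),               // no extra metadata
    HoodieTimeline.COMMIT_ACTION, // commit action type
    Collections.emptyMap(),       // no replaced file groups
    Option.empty(),               // no extra pre-commit function
    Option.of(zeroErrorPolicy));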
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
index e49ea638269..e61643de930 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -19,6 +19,7 @@
package org.apache.hudi.metadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.SparkRDDMetadataWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -159,7 +160,7 @@ public class SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
   @Override
   public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, JavaRDD<WriteStatus>> initializeWriteClient() {
-    return new SparkRDDWriteClient(engineContext, metadataWriteConfig, Option.empty());
+    return new SparkRDDMetadataWriteClient(engineContext, metadataWriteConfig, Option.empty());
}
@Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
index df742c51888..cf913ffff8d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
@@ -94,7 +94,7 @@ public class TestSparkRDDMetadataWriteClient extends HoodieClientTestBase {
     HoodieWriteConfig hoodieWriteConfig = getConfigBuilder()
         .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(false).withEnableRecordIndex(true)
-            .withRecordIndexFileGroupCount(1, 1).build()).build();
+            .withRecordIndexFileGroupCount(1, 1).withStreamingWriteEnabled(true).build()).build();
     // trigger end to end write to data table so that metadata table is also initialized.
initDataTableWithACommit(hoodieWriteConfig);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
index 757620172f8..a41aa292be7 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestWriteStatus.java
@@ -220,7 +220,6 @@ public class TestWriteStatus {
// Remove metadata stats
status.removeMetadataStats();
assertEquals(0, status.getWrittenRecordDelegates().size());
- assertTrue(status.getStat().getColumnStats().isEmpty());
}
@Test
@@ -230,7 +229,7 @@ public class TestWriteStatus {
assertEquals(1, status.getFailedRecords().size());
// Drop error records
- status.dropErrorRecords();
+ status.dropGranularErrorRecordsTracking();
assertEquals(0, status.getFailedRecords().size());
}
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 395d340a9b2..2d8623552e3 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -530,7 +530,7 @@ public class HoodieClientTestBase extends HoodieSparkClientTestHarness {
     List<WriteStatus> statusList = writeFn.apply(client, writeRecords, newCommitTime).collect();
     JavaRDD<WriteStatus> result = jsc.parallelize(statusList, 1);
     assertNoWriteErrors(statusList);
-    // validate #isMetadataTable() in write status
+    // validate isMetadataTable() in write status
     statusList.forEach(writeStatus -> assertFalse(writeStatus.isMetadataTable()));
if (!skipCommit) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 5de0600ccd5..dfc1bb896b6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -981,7 +981,7 @@ public final class HoodieMetadataConfig extends HoodieConfig {
private boolean getDefaultForStreamingWriteEnabled(EngineType engineType) {
switch (engineType) {
case SPARK:
- return true;
+ return false; // we will flip this to true in future patches.
case FLINK:
case JAVA:
return false;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 1f819a59041..820a255853b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -282,6 +282,7 @@ public class HoodieWriteStat extends HoodieReadStats {
}
// keep for serialization efficiency
+ // please do not remove it even if it is not used anywhere.
  public void setRecordsStats(Map<String, HoodieColumnRangeMetadata<Comparable>> stats) {
recordsStats = Option.of(stats);
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 82f25c76f85..f507a115979 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1609,8 +1609,8 @@ public class HoodieTableMetadataUtil {
   private static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
                                                                       HoodieTableMetaClient datasetMetaClient,
                                                                       List<String> columnsToIndex) {
-    if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) writeStat).getColumnStats().isPresent()) {
-      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = ((HoodieDeltaWriteStat) writeStat).getColumnStats().get();
+    if (writeStat.getColumnStats().isPresent()) {
+      Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = writeStat.getColumnStats().get();
       Collection<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadataList = columnRangeMap.values();
       return HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(), columnRangeMetadataList, false);
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
index bb159ede971..17bbd73ddc3 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieWriteStat.java
@@ -148,7 +148,7 @@ public class TestHoodieWriteStat {
clonedInput.putAll(columnRangeMetadataMap);
HoodieWriteStat writeStat = new HoodieWriteStat();
- writeStat.setRecordsStats(clonedInput);
+ writeStat.putRecordsStats(clonedInput);
     Map<String, HoodieColumnRangeMetadata<Comparable>> actualRecordStats = writeStat.getColumnStats().get();
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
index d0d5619687c..5eec6df1130 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/DataSourceUtils.java
@@ -18,10 +18,13 @@
package org.apache.hudi;
+import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDReadClient;
import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieEmptyRecord;
@@ -35,9 +38,11 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.TablePathUtils;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieDuplicateKeyException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
@@ -57,6 +62,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -344,4 +350,48 @@ public class DataSourceUtils {
return handleDuplicates(
new HoodieSparkEngineContext(jssc), incomingHoodieRecords,
writeConfig, failOnDuplicates);
}
+
+ /**
+ * Spark data source WriteStatus validator.
+ *
+ * <ul>
+   *   <li>If there are error records, prints a few of them and exits;</li>
+ * <li>If not, proceeds with the commit.</li>
+ * </ul>
+ */
+  static class SparkDataSourceWriteStatusValidator implements WriteStatusValidator {
+
+ private final WriteOperationType writeOperationType;
+ private final AtomicBoolean hasErrored;
+
+    public SparkDataSourceWriteStatusValidator(WriteOperationType writeOperationType, AtomicBoolean hasErrored) {
+ this.writeOperationType = writeOperationType;
+ this.hasErrored = hasErrored;
+ }
+
+ @Override
+    public boolean validate(long totalRecords, long totalErroredRecords, Option<HoodieData<WriteStatus>> writeStatusesOpt) {
+      if (totalErroredRecords > 0) {
+        hasErrored.set(true);
+        ValidationUtils.checkArgument(writeStatusesOpt.isPresent(), "RDD<WriteStatus> expected to be present when there are errors");
+        LOG.error("{} failed with errors", writeOperationType);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Printing out the top 100 errors");
+          HoodieJavaRDD.getJavaRDD(writeStatusesOpt.get()).filter(WriteStatus::hasErrors)
+              .take(100)
+              .forEach(ws -> {
+                LOG.trace("Global error:", ws.getGlobalError());
+                if (!ws.getErrors().isEmpty()) {
+                  ws.getErrors().forEach((k, v) -> LOG.trace("Error for key {}: {}", k, v));
+                }
+              });
+ }
+ return false;
+ } else {
+ return true;
+ }
+ }
+ }
}
+
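Note how this validator communicates back to the caller through the shared AtomicBoolean rather than through a value the caller can read directly. A condensed view of the caller-side wiring (an illustrative sketch; variables such as operation, writeStatuses, and extraMetadata are assumed from context):

// The writer shares an AtomicBoolean with the validator, commits, and then
// branches on whether the validator observed errors during the commit attempt.
AtomicBoolean hasErrored = new AtomicBoolean(false);
WriteStatusValidator validator = new SparkDataSourceWriteStatusValidator(operation, hasErrored);
boolean commitSuccess = client.commit(instantTime, writeStatuses, extraMetadata,
    commitActionType, partitionToReplacedFileIds, Option.empty(), Option.of(validator));
if (hasErrored.get()) {
  // the validator returned false, so the commit was skipped and the write is reported as failed
}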
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index e6588053425..315105816b6 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -20,6 +20,7 @@ package org.apache.hudi
 import org.apache.hudi.AutoRecordKeyGenerationUtils.mayBeValidateParamsForAutoGenerationOfRecordKeys
 import org.apache.hudi.AvroConversionUtils.{convertAvroSchemaToStructType, convertStructTypeToAvroSchema, getAvroRecordNameAndNamespace}
 import org.apache.hudi.DataSourceOptionsHelper.fetchMissingWriteConfigsFromTableConfig
+import org.apache.hudi.DataSourceUtils.SparkDataSourceWriteStatusValidator
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption}
import org.apache.hudi.HoodieSparkSqlWriter.StreamingWriteParams
@@ -81,6 +82,7 @@ import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.types.StructType
import org.slf4j.LoggerFactory
+import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.BiConsumer
import scala.collection.JavaConverters._
@@ -983,19 +985,21 @@ class HoodieSparkSqlWriterInternal {
                                                 tableInstantInfo: TableInstantInfo,
                                                 extraPreCommitFn: Option[BiConsumer[HoodieTableMetaClient, HoodieCommitMetadata]]): (Boolean, HOption[java.lang.String], HOption[java.lang.String]) = {
-    if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) {
-      log.info("Proceeding to commit the write.")
-      // get extra metadata from props
-      // 1. properties starting with commit metadata key prefix
-      // 2. properties related to checkpoint in spark streaming
-      val extraMetadataOpt = common.util.Option.of(DataSourceUtils.getExtraMetadata(parameters.asJava))
-      val commitSuccess =
-        client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
-          extraMetadataOpt,
-          tableInstantInfo.commitActionType,
-          writeResult.getPartitionToReplaceFileIds,
-          common.util.Option.ofNullable(extraPreCommitFn.orNull))
-
+    val hasErrors = new AtomicBoolean(false)
+    log.info("Proceeding to commit the write.")
+    // get extra metadata from props
+    // 1. properties starting with commit metadata key prefix
+    // 2. properties related to checkpoint in spark streaming
+    val extraMetadataOpt = common.util.Option.of(DataSourceUtils.getExtraMetadata(parameters.asJava))
+    val commitSuccess =
+      client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
+        extraMetadataOpt,
+        tableInstantInfo.commitActionType,
+        writeResult.getPartitionToReplaceFileIds,
+        common.util.Option.ofNullable(extraPreCommitFn.orNull),
+        org.apache.hudi.common.util.Option.of(new SparkDataSourceWriteStatusValidator(tableInstantInfo.operation, hasErrors)))
+
+    if (!hasErrors.get()) {
if (commitSuccess) {
log.info("Commit " + tableInstantInfo.instantTime + " successful!")
}
@@ -1029,19 +1033,6 @@ class HoodieSparkSqlWriterInternal {
log.info(s"Is Async Compaction Enabled ? $asyncCompactionEnabled")
(commitSuccess && metaSyncSuccess, compactionInstant, clusteringInstant)
} else {
- log.error(s"${tableInstantInfo.operation} failed with errors")
- if (log.isTraceEnabled) {
- log.trace("Printing out the top 100 errors")
- writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors)
- .take(100)
- .foreach(ws => {
- log.trace("Global error :", ws.getGlobalError)
- if (ws.getErrors.size() > 0) {
- ws.getErrors.asScala.foreach(kt =>
- log.trace(s"Error for key: ${kt._1}", kt._2))
- }
- })
- }
(false, common.util.Option.empty(), common.util.Option.empty())
}
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index c5fa5d7ffc8..05c93ba0e8b 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -925,8 +925,11 @@ public class HoodieStreamer implements Serializable {
@Override
    protected boolean requestShutdownIfNeeded(Option<HoodieData<WriteStatus>> lastWriteStatuses) {
-      Option<JavaRDD<WriteStatus>> lastWriteStatusRDD = Option.ofNullable(lastWriteStatuses.isPresent() ? HoodieJavaRDD.getJavaRDD(lastWriteStatuses.get()) : null);
-      return postWriteTerminationStrategy.isPresent() && postWriteTerminationStrategy.get().shouldShutdown(lastWriteStatusRDD);
+      if (postWriteTerminationStrategy.isPresent()) {
+        Option<JavaRDD<WriteStatus>> lastWriteStatusRDD = lastWriteStatuses.map(HoodieJavaRDD::getJavaRDD);
+        return postWriteTerminationStrategy.get().shouldShutdown(lastWriteStatusRDD);
+      }
+      return false;
}
/**
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index c36510a0db7..9766076a94a 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -27,6 +27,7 @@ import org.apache.hudi.HoodieSchemaUtils;
import org.apache.hudi.HoodieSparkSqlWriter;
import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.callback.common.WriteStatusValidator;
import org.apache.hudi.client.HoodieWriteResult;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
@@ -39,6 +40,7 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.model.HoodieTableType;
@@ -65,6 +67,7 @@ import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
+import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetaSyncException;
@@ -128,6 +131,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -799,93 +803,48 @@ public class StreamSync implements Serializable, Closeable {
       // write to hudi and fetch result
       WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, instantTime, useRowWriter);
       Map<String, List<String>> partitionToReplacedFileIds = writeClientWriteResult.getPartitionToReplacedFileIds();
-      // write to error table
-      JavaRDD<WriteStatus> dataTableWriteStatusRDD = writeClientWriteResult.getWriteStatusRDD();
-      JavaRDD<WriteStatus> writeStatusRDD = dataTableWriteStatusRDD;
+      JavaRDD<WriteStatus> writeStatusRDD = writeClientWriteResult.getWriteStatusRDD();
       String errorTableInstantTime = writeClient.createNewInstantTime();
       Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt = Option.empty();
       if (errorTableWriter.isPresent() && isErrorTableWriteUnificationEnabled) {
         errorTableWriteStatusRDDOpt = errorTableWriter.map(w -> w.upsert(errorTableInstantTime, instantTime, getLatestCommittedInstant()));
-        writeStatusRDD = errorTableWriteStatusRDDOpt.map(errorTableWriteStatus -> errorTableWriteStatus.union(dataTableWriteStatusRDD)).orElse(dataTableWriteStatusRDD);
       }
-      // process write status
-      long totalErrorRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
-      long totalRecords = writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
-      long totalSuccessfulRecords = totalRecords - totalErrorRecords;
-      LOG.info("instantTime={}, totalRecords={}, totalErrorRecords={}, totalSuccessfulRecords={}",
-          instantTime, totalRecords, totalErrorRecords, totalSuccessfulRecords);
-      if (totalRecords == 0) {
-        LOG.info("No new data, perform empty commit.");
-      }
-      boolean hasErrors = totalErrorRecords > 0;
-      if (!hasErrors || cfg.commitOnErrors) {
-        Map<String, String> checkpointCommitMetadata = extractCheckpointMetadata(inputBatch, props, writeClient.getConfig().getWriteVersion().versionCode(), cfg);
-        if (hasErrors) {
-          LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
-              + totalErrorRecords + "/" + totalRecords);
-        }
-        String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
-        if (errorTableWriter.isPresent()) {
-          boolean errorTableSuccess = true;
-          // Commit the error events triggered so far to the error table
-          if (isErrorTableWriteUnificationEnabled && errorTableWriteStatusRDDOpt.isPresent()) {
-            errorTableSuccess = errorTableWriter.get().commit(errorTableInstantTime, errorTableWriteStatusRDDOpt.get());
-          } else if (!isErrorTableWriteUnificationEnabled) {
-            errorTableSuccess = errorTableWriter.get().upsertAndCommit(instantTime, getLatestCommittedInstant());
-          }
-          if (!errorTableSuccess) {
-            switch (errorWriteFailureStrategy) {
-              case ROLLBACK_COMMIT:
-                LOG.info("Commit " + instantTime + " failed!");
-                writeClient.rollback(instantTime);
-                throw new HoodieStreamerWriteException("Error table commit failed");
-              case LOG_ERROR:
-                LOG.error("Error Table write failed for instant " + instantTime);
-                break;
-              default:
-                throw new HoodieStreamerWriteException("Write failure strategy not implemented for " + errorWriteFailureStrategy);
-            }
-          }
-        }
+      Map<String, String> checkpointCommitMetadata = extractCheckpointMetadata(inputBatch, props, writeClient.getConfig().getWriteVersion().versionCode(), cfg);
+      AtomicLong totalSuccessfulRecords = new AtomicLong(0);
+      Option<String> latestCommittedInstant = getLatestCommittedInstant();
+      WriteStatusValidator writeStatusValidator = new HoodieStreamerWriteStatusValidator(cfg.commitOnErrors, instantTime,
+          cfg, errorTableWriter, errorTableWriteStatusRDDOpt, errorWriteFailureStrategy, isErrorTableWriteUnificationEnabled, errorTableInstantTime, writeClient, latestCommittedInstant,
+          totalSuccessfulRecords);
+      String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
+
+      boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
+          Option.of(writeStatusValidator));
+      releaseResourcesInvoked = true;
+      if (success) {
+        LOG.info("Commit " + instantTime + " successful!");
+        this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch() != null
+            ? inputBatch.getCheckpointForNextBatch().getCheckpointKey() : null);
+        // Schedule compaction if needed
+        if (cfg.isAsyncCompactionEnabled()) {
+          scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());
}
-      boolean success = writeClient.commit(instantTime, dataTableWriteStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty());
-      releaseResourcesInvoked = true;
-      if (success) {
-        LOG.info("Commit " + instantTime + " successful!");
-        this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch() != null
-            ? inputBatch.getCheckpointForNextBatch().getCheckpointKey() : null);
-        // Schedule compaction if needed
-        if (cfg.isAsyncCompactionEnabled()) {
-          scheduledCompactionInstant = writeClient.scheduleCompaction(Option.empty());
-        }
-        if ((totalSuccessfulRecords > 0) || cfg.forceEmptyMetaSync) {
-          runMetaSync();
-        } else {
-          LOG.info(String.format("Not running metaSync totalSuccessfulRecords=%d", totalSuccessfulRecords));
+ if ((totalSuccessfulRecords.get() > 0) || cfg.forceEmptyMetaSync) {
+ runMetaSync();
} else {
- LOG.info("Commit " + instantTime + " failed!");
- throw new HoodieStreamerWriteException("Commit " + instantTime + "
failed!");
+ LOG.info(String.format("Not running metaSync
totalSuccessfulRecords=%d", totalSuccessfulRecords.get()));
}
} else {
- LOG.error("Delta Sync found errors when writing. Errors/Total=" +
totalErrorRecords + "/" + totalRecords);
- LOG.error("Printing out the top 100 errors");
-
dataTableWriteStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
- LOG.error("Global error :", ws.getGlobalError());
- if (ws.getErrors().size() > 0) {
- ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:"
+ key + " is " + value));
- }
- });
- // Rolling back instant
- writeClient.rollback(instantTime);
- throw new HoodieStreamerWriteException("Commit " + instantTime + "
failed and rolled-back !");
+ LOG.info("Commit " + instantTime + " failed!");
+ throw new HoodieStreamerWriteException("Commit " + instantTime + "
failed!");
}
+
      long overallTimeNanos = overallTimerContext != null ? overallTimerContext.stop() : 0;
// Send DeltaStreamer Metrics
metrics.updateStreamerMetrics(overallTimeNanos);
- return Pair.of(scheduledCompactionInstant, dataTableWriteStatusRDD);
+ return Pair.of(scheduledCompactionInstant, writeStatusRDD);
} finally {
if (!releaseResourcesInvoked) {
releaseResources(instantTime);
@@ -1339,4 +1298,114 @@ public class StreamSync implements Serializable, Closeable {
return writeStatusRDD;
}
}
+
+  /**
+   * WriteStatus validator for commits to the Hoodie Streamer data table.
+   * Writes to the error table are handled here as well.
+   */
+  static class HoodieStreamerWriteStatusValidator implements WriteStatusValidator {
+
+ private final boolean commitOnErrors;
+ private final String instantTime;
+ private final HoodieStreamer.Config cfg;
+ private final Option<BaseErrorTableWriter> errorTableWriter;
+ private final Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt;
+    private final HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy;
+ private final boolean isErrorTableWriteUnificationEnabled;
+ private final String errorTableInstantTime;
+ private final SparkRDDWriteClient writeClient;
+ private final Option<String> latestCommittedInstant;
+ private final AtomicLong totalSuccessfulRecords;
+
+    HoodieStreamerWriteStatusValidator(boolean commitOnErrors,
+                                       String instantTime,
+                                       HoodieStreamer.Config cfg,
+                                       Option<BaseErrorTableWriter> errorTableWriter,
+                                       Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt,
+                                       HoodieErrorTableConfig.ErrorWriteFailureStrategy errorWriteFailureStrategy,
+                                       boolean isErrorTableWriteUnificationEnabled,
+                                       String errorTableInstantTime,
+                                       SparkRDDWriteClient writeClient,
+                                       Option<String> latestCommittedInstant,
+                                       AtomicLong totalSuccessfulRecords) {
+ this.commitOnErrors = commitOnErrors;
+ this.instantTime = instantTime;
+ this.cfg = cfg;
+ this.errorTableWriter = errorTableWriter;
+ this.errorTableWriteStatusRDDOpt = errorTableWriteStatusRDDOpt;
+ this.errorWriteFailureStrategy = errorWriteFailureStrategy;
+      this.isErrorTableWriteUnificationEnabled = isErrorTableWriteUnificationEnabled;
+ this.errorTableInstantTime = errorTableInstantTime;
+ this.writeClient = writeClient;
+ this.latestCommittedInstant = latestCommittedInstant;
+ this.totalSuccessfulRecords = totalSuccessfulRecords;
+ }
+
+    @Override
+    public boolean validate(long tableTotalRecords, long tableTotalErroredRecords, Option<HoodieData<WriteStatus>> writeStatusesOpt) {
+
+      long totalRecords = tableTotalRecords;
+      long totalErroredRecords = tableTotalErroredRecords;
+      if (isErrorTableWriteUnificationEnabled) {
+        totalRecords += errorTableWriteStatusRDDOpt.map(status -> status.mapToDouble(WriteStatus::getTotalRecords).sum().longValue()).orElse(0L);
+        totalErroredRecords += errorTableWriteStatusRDDOpt.map(status -> status.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue()).orElse(0L);
+      }
+      long totalSuccessfulRecords = totalRecords - totalErroredRecords;
+      this.totalSuccessfulRecords.set(totalSuccessfulRecords);
+      LOG.info("instantTime={}, totalRecords={}, totalErrorRecords={}, totalSuccessfulRecords={}",
+          instantTime, totalRecords, totalErroredRecords, totalSuccessfulRecords);
+      if (totalRecords == 0) {
+        LOG.info("No new data, perform empty commit.");
+      }
+      boolean hasErrorRecords = totalErroredRecords > 0;
+      if (hasErrorRecords && commitOnErrors) {
+        LOG.warn("Some records failed to be merged but forcing commit since commitOnErrors set. Errors/Total="
+            + totalErroredRecords + "/" + totalRecords);
+      }
+
+ if (errorTableWriter.isPresent()) {
+ boolean errorTableSuccess = true;
+ // Commit the error events triggered so far to the error table
+        if (isErrorTableWriteUnificationEnabled && errorTableWriteStatusRDDOpt.isPresent()) {
+          errorTableSuccess = errorTableWriter.get().commit(errorTableInstantTime, errorTableWriteStatusRDDOpt.get());
+        } else if (!isErrorTableWriteUnificationEnabled) {
+          errorTableSuccess = errorTableWriter.get().upsertAndCommit(instantTime, latestCommittedInstant);
+ }
+        if (!errorTableSuccess) {
+          switch (errorWriteFailureStrategy) {
+            case ROLLBACK_COMMIT:
+              LOG.info("Commit " + instantTime + " failed!");
+              writeClient.rollback(instantTime);
+              throw new HoodieStreamerWriteException("Error table commit failed");
+            case LOG_ERROR:
+              LOG.error("Error Table write failed for instant " + instantTime);
+              break;
+            default:
+              throw new HoodieStreamerWriteException("Write failure strategy not implemented for " + errorWriteFailureStrategy);
+ }
+ }
+ }
+      boolean canProceed = !hasErrorRecords || commitOnErrors;
+      if (canProceed) {
+        return true;
+      } else {
+        LOG.error("Delta Sync found errors when writing. Errors/Total=" + totalErroredRecords + "/" + totalRecords);
+        LOG.error("Printing out the top 100 errors");
+        ValidationUtils.checkArgument(writeStatusesOpt.isPresent(), "RDD<WriteStatus> is expected to be present when there are errors");
+        HoodieJavaRDD.getJavaRDD(writeStatusesOpt.get()).filter(WriteStatus::hasErrors).take(100).forEach(writeStatus -> {
+          LOG.error("Global error: ", writeStatus.getGlobalError());
+          if (!writeStatus.getErrors().isEmpty()) {
+            writeStatus.getErrors().forEach((k, v) -> LOG.trace("Error for key {}: {}", k, v));
+          }
+        });
+        // Rolling back instant
+        writeClient.rollback(instantTime);
+        throw new HoodieStreamerWriteException("Commit " + instantTime + " failed and rolled back!");
+      }
+ }
+ }
}