This is an automated email from the ASF dual-hosted git repository.
zhangyue19921010 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 77cfb3a51da [HUDI-7147] Fix CDC write flush bug (#10186)
77cfb3a51da is described below
commit 77cfb3a51dabffdbfc67ba0ac85cfa129277c166
Author: YueZhang <[email protected]>
AuthorDate: Wed Nov 29 09:46:53 2023 +0800
[HUDI-7147] Fix CDC write flush bug (#10186)
* Using iterator instead of values to avoid UnsupportedOperationException
* check style
---
.../java/org/apache/hudi/io/HoodieCDCLogger.java | 23 +++++++++++-----------
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index cab978164d8..1e2fa7c59e4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -53,10 +53,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
import static
org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.DATA_BEFORE;
import static
org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.DATA_BEFORE_AFTER;
@@ -84,7 +84,7 @@ public class HoodieCDCLogger implements Closeable {
private final Schema cdcSchema;
// the cdc data
- private final Map<String, HoodieAvroPayload> cdcData;
+ private final ExternalSpillableMap<String, HoodieAvroPayload> cdcData;
private final Map<HoodieLogBlock.HeaderMetadataType, String>
cdcDataBlockHeader;
@@ -183,15 +183,16 @@ public class HoodieCDCLogger implements Closeable {
private void flushIfNeeded(Boolean force) {
if (force || numOfCDCRecordsInMemory.get() * averageCDCRecordSize >=
maxBlockSize) {
try {
- List<HoodieRecord> records = cdcData.values().stream()
- .map(record -> {
- try {
- return new
HoodieAvroIndexedRecord(record.getInsertValue(cdcSchema).get());
- } catch (IOException e) {
- throw new HoodieIOException("Failed to get cdc record", e);
- }
- }).collect(Collectors.toList());
-
+ ArrayList<HoodieRecord> records = new ArrayList<>();
+ Iterator<HoodieAvroPayload> recordIter = cdcData.iterator();
+ while (recordIter.hasNext()) {
+ HoodieAvroPayload record = recordIter.next();
+ try {
+ records.add(new
HoodieAvroIndexedRecord(record.getInsertValue(cdcSchema).get()));
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to get cdc record", e);
+ }
+ }
HoodieLogBlock block = new HoodieCDCDataBlock(records,
cdcDataBlockHeader, keyField);
AppendResult result =
cdcWriter.appendBlocks(Collections.singletonList(block));