This is an automated email from the ASF dual-hosted git repository.

yunqing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git

The following commit(s) were added to refs/heads/master by this push:
     new ca24d2ede [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)
ca24d2ede is described below

commit ca24d2edea046611f8c77d70b1465b550f146aaa
Author: yunqingmoswu <44659300+yunqingmo...@users.noreply.github.com>
AuthorDate: Mon Oct 31 17:19:57 2022 +0800

    [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario (#6323)

    * [INLONG-6322][Sort] Fix write data incorrect for doris connector with sink multiple scenario

    * [INLONG-6322][Sort] Fix flushing update error

    * [INLONG-6322][Sort] Optimize flush handle

    * [INLONG-6322][Sort] Fix comment error
---
 .../table/DorisDynamicSchemaOutputFormat.java | 68 +++++++++++++++++-----
 1 file changed, 53 insertions(+), 15 deletions(-)

diff --git a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
index c2db85789..9e8142ae7 100644
--- a/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
+++ b/inlong-sort/sort-connectors/doris/src/main/java/org/apache/inlong/sort/doris/table/DorisDynamicSchemaOutputFormat.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * DorisDynamicSchemaOutputFormat, copy from {@link org.apache.doris.flink.table.DorisDynamicOutputFormat}
@@ -76,11 +77,16 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     private final String tablePattern;
     private final String dynamicSchemaFormat;
     private final boolean ignoreSingleTableErrors;
-    private final transient Map<String, Exception> flushExceptionMap = new HashMap<>();
+    private final Map<String, Exception> flushExceptionMap = new HashMap<>();
+    private final AtomicLong readInNum = new AtomicLong(0);
+    private final AtomicLong writeOutNum = new AtomicLong(0);
+    private final AtomicLong errorNum = new AtomicLong(0);
+    private final AtomicLong ddlNum = new AtomicLong(0);
     private long batchBytes = 0L;
     private int size;
     private DorisStreamLoad dorisStreamLoad;
     private transient volatile boolean closed = false;
+    private transient volatile boolean flushing = false;
     private transient ScheduledExecutorService scheduler;
     private transient ScheduledFuture<?> scheduledFuture;
     private transient JsonDynamicSchemaFormat jsonDynamicSchemaFormat;
@@ -131,10 +137,8 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
             this.scheduler = new ScheduledThreadPoolExecutor(1,
                     new ExecutorThreadFactory("doris-streamload-output-format"));
             this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
-                synchronized (DorisDynamicSchemaOutputFormat.this) {
-                    if (!closed) {
-                        flush();
-                    }
+                if (!closed && !flushing) {
+                    flush();
                 }
             }, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
         }
@@ -158,7 +162,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         addBatch(row);
         boolean valid = (executionOptions.getBatchSize() > 0 && size >= executionOptions.getBatchSize())
                 || batchBytes >= executionOptions.getMaxBatchBytes();
-        if (valid) {
+        if (valid && !flushing) {
             flush();
         }
     }
@@ -168,8 +172,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         if (row instanceof RowData) {
             RowData rowData = (RowData) row;
             JsonNode rootNode = jsonDynamicSchemaFormat.deserialize(rowData.getBinary(0));
+            readInNum.incrementAndGet();
             boolean isDDL = jsonDynamicSchemaFormat.extractDDLFlag(rootNode);
             if (isDDL) {
+                ddlNum.incrementAndGet();
                 // Ignore ddl change for now
                 return;
             }
@@ -226,6 +232,7 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
                         }
                         break;
                     default:
+                        errorNum.incrementAndGet();
                         throw new RuntimeException("Unrecognized row kind:" + rowKind.toString());
                 }
             }
@@ -239,12 +246,10 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
     public synchronized void close() throws IOException {
         if (!closed) {
             closed = true;
-
             if (this.scheduledFuture != null) {
                 scheduledFuture.cancel(false);
                 this.scheduler.shutdown();
             }
-
             try {
                 flush();
             } catch (Exception e) {
@@ -258,27 +263,61 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
 
     @SuppressWarnings({"rawtypes"})
     public synchronized void flush() {
-        if (batchMap.isEmpty()) {
+        flushing = true;
+        if (!hasRecords()) {
+            flushing = false;
             return;
         }
+        List<String> errorTables = new ArrayList<>();
         for (Entry<String, List> kvs : batchMap.entrySet()) {
-            if (checkFlushException(kvs.getKey())) {
+            if (checkFlushException(kvs.getKey()) || kvs.getValue().isEmpty()) {
                 continue;
             }
+            String loadValue = null;
             try {
-                load(kvs.getKey(), OBJECT_MAPPER.writeValueAsString(kvs.getValue()));
+                loadValue = OBJECT_MAPPER.writeValueAsString(kvs.getValue());
+                load(kvs.getKey(), loadValue);
+                LOG.info("load {} records to tableIdentifier: {}", kvs.getValue().size(), kvs.getKey());
+                writeOutNum.addAndGet(kvs.getValue().size());
+                // Clean the data that has been loaded.
+                kvs.getValue().clear();
             } catch (Exception e) {
                 flushExceptionMap.put(kvs.getKey(), e);
+                errorNum.getAndAdd(kvs.getValue().size());
                 if (!ignoreSingleTableErrors) {
                     throw new RuntimeException(
-                            String.format("Writing records to streamload of tableIdentifier:%s failed.", kvs.getKey()),
-                            e);
+                            String.format("Writing records to streamload of tableIdentifier:%s failed, the value: %s.",
+                                    kvs.getKey(), loadValue), e);
                 }
-                batchMap.remove(kvs.getKey());
+                errorTables.add(kvs.getKey());
+                LOG.warn("The tableIdentifier: {} load failed and the data will be throw away in the future"
+                        + " because the option 'sink.multiple.ignore-single-table-errors' is 'true'", kvs.getKey());
             }
         }
+        if (!errorTables.isEmpty()) {
+            // Clean the key that has errors
+            errorTables.forEach(batchMap::remove);
+        }
         batchBytes = 0;
         size = 0;
+        LOG.info("Doris sink statistics: readInNum: {}, writeOutNum: {}, errorNum: {}, ddlNum: {}",
+                readInNum.get(), writeOutNum.get(), errorNum.get(), ddlNum.get());
+        flushing = false;
+    }
+
+    @SuppressWarnings("rawtypes")
+    private boolean hasRecords() {
+        if (batchMap.isEmpty()) {
+            return false;
+        }
+        boolean hasRecords = false;
+        for (List value : batchMap.values()) {
+            if (!value.isEmpty()) {
+                hasRecords = true;
+                break;
+            }
+        }
+        return hasRecords;
     }
 
     private void load(String tableIdentifier, String result) throws IOException {
@@ -286,7 +325,6 @@ public class DorisDynamicSchemaOutputFormat<T> extends RichOutputFormat<T> {
         for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
             try {
                 dorisStreamLoad.load(tableWithDb[0], tableWithDb[1], result);
-                batchMap.remove(tableIdentifier);
                 break;
             } catch (StreamLoadException e) {
                 LOG.error("doris sink error, retry times = {}", i, e);