This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit 31f359c7971f0794b4ac7671164c11bc6231d8d5 Author: wudi <676366...@qq.com> AuthorDate: Wed Jun 30 09:27:12 2021 +0800 [FlinkConnector] Support time interval for flink connector (#5934) --- .../doris/flink/cfg/DorisExecutionOptions.java | 20 ++++++- .../flink/table/DorisDynamicOutputFormat.java | 64 ++++++++++++++++++---- .../flink/table/DorisDynamicTableFactory.java | 10 ++++ .../apache/doris/flink/table/DorisStreamLoad.java | 2 +- 4 files changed, 82 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index ee8b09e..330cbc9 100644 --- a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -17,7 +17,10 @@ package org.apache.doris.flink.cfg; +import org.apache.flink.util.Preconditions; + import java.io.Serializable; +import java.time.Duration; /** * JDBC sink batch options. @@ -27,10 +30,13 @@ public class DorisExecutionOptions implements Serializable { private final Integer batchSize; private final Integer maxRetries; + private final Long batchIntervalMs; - public DorisExecutionOptions(Integer batchSize, Integer maxRetries) { + public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long batchIntervalMs) { + Preconditions.checkArgument(maxRetries >= 0); this.batchSize = batchSize; this.maxRetries = maxRetries; + this.batchIntervalMs = batchIntervalMs; } public Integer getBatchSize() { @@ -41,6 +47,10 @@ public class DorisExecutionOptions implements Serializable { return maxRetries; } + public Long getBatchIntervalMs() { + return batchIntervalMs; + } + public static Builder builder() { return new Builder(); } @@ -51,6 +61,7 @@ public class DorisExecutionOptions implements Serializable { public static class Builder { private Integer batchSize; private Integer maxRetries; + private Long batchIntervalMs; public Builder setBatchSize(Integer batchSize) { this.batchSize = batchSize; @@ -62,8 +73,13 @@ public class DorisExecutionOptions implements Serializable { return this; } + public Builder setBatchIntervalMs(Long batchIntervalMs) { + this.batchIntervalMs = batchIntervalMs; + return this; + } + public DorisExecutionOptions build() { - return new DorisExecutionOptions(batchSize,maxRetries); + return new DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs); } } diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java index 44880b5..4b2f5fe 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java @@ -24,15 +24,21 @@ import org.apache.doris.flink.exception.StreamLoadException; import org.apache.doris.flink.rest.RestService; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.StringJoiner; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; /** @@ -51,6 +57,10 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { private final List<String> batch = new ArrayList<>(); private transient volatile boolean closed = false; + private transient ScheduledExecutorService scheduler; + private transient ScheduledFuture<?> scheduledFuture; + private transient volatile Exception flushException; + public DorisDynamicOutputFormat(DorisOptions option,DorisReadOptions readOptions,DorisExecutionOptions executionOptions) { this.options = option; this.readOptions = readOptions; @@ -71,10 +81,33 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { options.getUsername(), options.getPassword()); LOG.info("Streamload BE:{}",dorisStreamLoad.getLoadUrlStr()); + + if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) { + this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output-format")); + this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> { + synchronized (DorisDynamicOutputFormat.this) { + if (!closed) { + try { + flush(); + } catch (Exception e) { + flushException = e; + } + } + } + }, executionOptions.getBatchIntervalMs(), executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS); + } + } + + private void checkFlushException() { + if (flushException != null) { + throw new RuntimeException("Writing records to streamload failed.", flushException); + } } @Override - public void writeRecord(RowData row) throws IOException { + public synchronized void writeRecord(RowData row) throws IOException { + checkFlushException(); + addBatch(row); if (executionOptions.getBatchSize() > 0 && batch.size() >= executionOptions.getBatchSize()) { flush(); @@ -91,22 +124,30 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { } @Override - public void close() throws IOException { + public synchronized void close() throws IOException { if (!closed) { closed = true; - if (batch.size() > 0) { - try { - flush(); - } catch (Exception e) { - LOG.warn("Writing records to doris failed.", e); - throw new RuntimeException("Writing records to doris failed.", e); - } + + if (this.scheduledFuture != null) { + scheduledFuture.cancel(false); + this.scheduler.shutdown(); + } + + try { + flush(); + } catch (Exception e) { + LOG.warn("Writing records to doris failed.", e); + throw new RuntimeException("Writing records to doris failed.", e); } } + checkFlushException(); } - - public void flush() throws IOException { + public synchronized void flush() throws IOException { + checkFlushException(); + if(batch.isEmpty()){ + return; + } for (int i = 0; i <= executionOptions.getMaxRetries(); i++) { try { dorisStreamLoad.load(String.join(lineDelimiter,batch)); @@ -129,6 +170,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> { } } + private String getBackend() throws IOException{ try { //get be url from fe diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index aecda37..27b6f97 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -32,6 +32,7 @@ import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.TableSchemaUtils; +import java.time.Duration; import java.util.HashSet; import java.util.Set; @@ -140,6 +141,13 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactor .defaultValue(3) .withDescription("the max retry times if writing records to database failed."); + private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions + .key("sink.batch.interval") + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription("the flush interval mills, over this time, asynchronous threads will flush data. The " + + "default value is 1s."); + @Override public String factoryIdentifier() { @@ -176,6 +184,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactor options.add(SINK_BUFFER_FLUSH_MAX_ROWS); options.add(SINK_MAX_RETRIES); + options.add(SINK_BUFFER_FLUSH_INTERVAL); return options; } @@ -229,6 +238,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactor final DorisExecutionOptions.Builder builder = DorisExecutionOptions.builder(); builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS)); builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES)); + builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis()); return builder.build(); } diff --git a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java index fa001a6..ef16f33 100644 --- a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java +++ b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java @@ -134,7 +134,7 @@ public class DorisStreamLoad implements Serializable{ private LoadResponse loadBatch(String value) { Calendar calendar = Calendar.getInstance(); - String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s", + String label = String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND), UUID.randomUUID().toString().replaceAll("-", "")); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org