This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git

commit 31f359c7971f0794b4ac7671164c11bc6231d8d5
Author: wudi <676366...@qq.com>
AuthorDate: Wed Jun 30 09:27:12 2021 +0800

    [FlinkConnector] Support time interval for flink connector (#5934)
---
 .../doris/flink/cfg/DorisExecutionOptions.java     | 20 ++++++-
 .../flink/table/DorisDynamicOutputFormat.java      | 64 ++++++++++++++++++----
 .../flink/table/DorisDynamicTableFactory.java      | 10 ++++
 .../apache/doris/flink/table/DorisStreamLoad.java  |  2 +-
 4 files changed, 82 insertions(+), 14 deletions(-)

diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index ee8b09e..330cbc9 100644
--- a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -17,7 +17,10 @@
 package org.apache.doris.flink.cfg;
 
 
+import org.apache.flink.util.Preconditions;
+
 import java.io.Serializable;
+import java.time.Duration;
 
 /**
  * JDBC sink batch options.
@@ -27,10 +30,13 @@ public class DorisExecutionOptions  implements Serializable {
 
     private final Integer batchSize;
     private final Integer maxRetries;
+    private final Long batchIntervalMs;
 
-    public DorisExecutionOptions(Integer batchSize, Integer maxRetries) {
+    public DorisExecutionOptions(Integer batchSize, Integer maxRetries,Long 
batchIntervalMs) {
+        Preconditions.checkArgument(maxRetries >= 0);
         this.batchSize = batchSize;
         this.maxRetries = maxRetries;
+        this.batchIntervalMs = batchIntervalMs;
     }
 
     public Integer getBatchSize() {
@@ -41,6 +47,10 @@ public class DorisExecutionOptions  implements Serializable {
         return maxRetries;
     }
 
+    public Long getBatchIntervalMs() {
+        return batchIntervalMs;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -51,6 +61,7 @@ public class DorisExecutionOptions  implements Serializable {
     public static class Builder {
         private Integer batchSize;
         private Integer maxRetries;
+        private Long batchIntervalMs;
 
         public Builder setBatchSize(Integer batchSize) {
             this.batchSize = batchSize;
@@ -62,8 +73,13 @@ public class DorisExecutionOptions  implements Serializable {
             return this;
         }
 
+        public Builder setBatchIntervalMs(Long batchIntervalMs) {
+            this.batchIntervalMs = batchIntervalMs;
+            return this;
+        }
+
         public DorisExecutionOptions build() {
-            return new DorisExecutionOptions(batchSize,maxRetries);
+            return new 
DorisExecutionOptions(batchSize,maxRetries,batchIntervalMs);
         }
     }
 
diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 44880b5..4b2f5fe 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -24,15 +24,21 @@ import org.apache.doris.flink.exception.StreamLoadException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.StringJoiner;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 
 /**
@@ -51,6 +57,10 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
     private final List<String> batch = new ArrayList<>();
     private transient volatile boolean closed = false;
 
+    private transient ScheduledExecutorService scheduler;
+    private transient ScheduledFuture<?> scheduledFuture;
+    private transient volatile Exception flushException;
+
     public DorisDynamicOutputFormat(DorisOptions option,DorisReadOptions 
readOptions,DorisExecutionOptions executionOptions) {
         this.options = option;
         this.readOptions = readOptions;
@@ -71,10 +81,33 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
                 options.getUsername(),
                 options.getPassword());
         LOG.info("Streamload BE:{}",dorisStreamLoad.getLoadUrlStr());
+
+        if (executionOptions.getBatchIntervalMs() != 0 && 
executionOptions.getBatchSize() != 1) {
+            this.scheduler = Executors.newScheduledThreadPool(1, new 
ExecutorThreadFactory("doris-streamload-output-format"));
+            this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> 
{
+                synchronized (DorisDynamicOutputFormat.this) {
+                    if (!closed) {
+                        try {
+                            flush();
+                        } catch (Exception e) {
+                            flushException = e;
+                        }
+                    }
+                }
+            }, executionOptions.getBatchIntervalMs(), 
executionOptions.getBatchIntervalMs(), TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private void checkFlushException() {
+        if (flushException != null) {
+            throw new RuntimeException("Writing records to streamload 
failed.", flushException);
+        }
     }
 
     @Override
-    public  void writeRecord(RowData row) throws IOException {
+    public synchronized void writeRecord(RowData row) throws IOException {
+        checkFlushException();
+
         addBatch(row);
         if (executionOptions.getBatchSize() > 0 && batch.size() >= 
executionOptions.getBatchSize()) {
             flush();
@@ -91,22 +124,30 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
     }
 
     @Override
-    public  void close() throws IOException {
+    public synchronized void close() throws IOException {
         if (!closed) {
             closed = true;
-            if (batch.size() > 0) {
-                try {
-                    flush();
-                } catch (Exception e) {
-                    LOG.warn("Writing records to doris failed.", e);
-                    throw new RuntimeException("Writing records to doris 
failed.", e);
-                }
+
+            if (this.scheduledFuture != null) {
+                scheduledFuture.cancel(false);
+                this.scheduler.shutdown();
+            }
+
+            try {
+                flush();
+            } catch (Exception e) {
+                LOG.warn("Writing records to doris failed.", e);
+                throw new RuntimeException("Writing records to doris failed.", 
e);
             }
         }
+        checkFlushException();
     }
 
-
-    public  void flush() throws IOException {
+    public synchronized void flush() throws IOException {
+        checkFlushException();
+        if(batch.isEmpty()){
+            return;
+        }
         for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
             try {
                 dorisStreamLoad.load(String.join(lineDelimiter,batch));
@@ -129,6 +170,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData>  {
         }
     }
 
+
     private String getBackend() throws IOException{
         try {
             //get be url from fe
diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index aecda37..27b6f97 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.TableSchemaUtils;
 
+import java.time.Duration;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -140,6 +141,13 @@ public final class DorisDynamicTableFactory implements  DynamicTableSourceFactor
                        .defaultValue(3)
                        .withDescription("the max retry times if writing 
records to database failed.");
 
+       private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL 
= ConfigOptions
+                       .key("sink.batch.interval")
+                       .durationType()
+                       .defaultValue(Duration.ofSeconds(1))
+                       .withDescription("the flush interval mills, over this 
time, asynchronous threads will flush data. The " +
+                                       "default value is 1s.");
+
 
        @Override
        public String factoryIdentifier() {
@@ -176,6 +184,7 @@ public final class DorisDynamicTableFactory implements  DynamicTableSourceFactor
 
                options.add(SINK_BUFFER_FLUSH_MAX_ROWS);
                options.add(SINK_MAX_RETRIES);
+               options.add(SINK_BUFFER_FLUSH_INTERVAL);
                return options;
        }
 
@@ -229,6 +238,7 @@ public final class DorisDynamicTableFactory implements  DynamicTableSourceFactor
                final DorisExecutionOptions.Builder builder = 
DorisExecutionOptions.builder();
                
builder.setBatchSize(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
                builder.setMaxRetries(readableConfig.get(SINK_MAX_RETRIES));
+               
builder.setBatchIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
                return builder.build();
        }
 
diff --git a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
index fa001a6..ef16f33 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisStreamLoad.java
@@ -134,7 +134,7 @@ public class DorisStreamLoad implements Serializable{
 
     private LoadResponse loadBatch(String value) {
         Calendar calendar = Calendar.getInstance();
-        String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s",
+        String label = 
String.format("flink_connector_%s%02d%02d_%02d%02d%02d_%s",
                 calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, 
calendar.get(Calendar.DAY_OF_MONTH),
                 calendar.get(Calendar.HOUR_OF_DAY), 
calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                 UUID.randomUUID().toString().replaceAll("-", ""));

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to