This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d7626f  [improve] Add DorisAbstractWriter, Unify DorisSink and 
DorisBatchSink (#309)
9d7626f is described below

commit 9d7626fda86c4ce17596752d65a3defe1d5a94ad
Author: wudi <[email protected]>
AuthorDate: Mon Jan 29 11:20:02 2024 +0800

    [improve] Add DorisAbstractWriter, Unify DorisSink and DorisBatchSink (#309)
    
    * add DorisAbstractWriter, Unify DorisSink and DorisBatchSink
    * Compatible with previous versions
---
 .../doris/flink/cfg/DorisExecutionOptions.java     | 20 +++++++++-
 .../org/apache/doris/flink/sink/DorisSink.java     | 45 ++++++++++++----------
 .../doris/flink/sink/batch/DorisBatchSink.java     |  1 -
 .../doris/flink/sink/batch/DorisBatchWriter.java   | 26 +++++++++++--
 .../flink/sink/writer/DorisAbstractWriter.java     | 26 +++++++++++++
 .../doris/flink/sink/writer/DorisWriter.java       |  9 ++---
 .../apache/doris/flink/sink/writer/WriteMode.java  | 32 +++++++++++++++
 .../doris/flink/table/DorisConfigOptions.java      |  8 ++++
 .../flink/table/DorisDynamicTableFactory.java      |  9 ++++-
 .../doris/flink/table/DorisDynamicTableSink.java   | 25 ++++--------
 .../apache/doris/flink/DorisSinkBatchExample.java  |  7 +++-
 11 files changed, 155 insertions(+), 53 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index b86bcd4..a890d34 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -19,6 +19,8 @@ package org.apache.doris.flink.cfg;
 
 import org.apache.flink.util.Preconditions;
 
+import org.apache.doris.flink.sink.writer.WriteMode;
+
 import java.io.Serializable;
 import java.util.Properties;
 
@@ -60,6 +62,7 @@ public class DorisExecutionOptions implements Serializable {
     private final long bufferFlushIntervalMs;
     private final boolean enableBatchMode;
     private final boolean ignoreUpdateBefore;
+    private final WriteMode writeMode;
 
     public DorisExecutionOptions(
             int checkInterval,
@@ -77,7 +80,8 @@ public class DorisExecutionOptions implements Serializable {
             int bufferFlushMaxBytes,
             long bufferFlushIntervalMs,
             boolean ignoreUpdateBefore,
-            boolean force2PC) {
+            boolean force2PC,
+            WriteMode writeMode) {
         Preconditions.checkArgument(maxRetries >= 0);
         this.checkInterval = checkInterval;
         this.maxRetries = maxRetries;
@@ -97,6 +101,7 @@ public class DorisExecutionOptions implements Serializable {
         this.bufferFlushIntervalMs = bufferFlushIntervalMs;
 
         this.ignoreUpdateBefore = ignoreUpdateBefore;
+        this.writeMode = writeMode;
     }
 
     public static Builder builder() {
@@ -196,6 +201,10 @@ public class DorisExecutionOptions implements Serializable 
{
         return force2PC;
     }
 
+    public WriteMode getWriteMode() {
+        return writeMode;
+    }
+
     /** Builder of {@link DorisExecutionOptions}. */
     public static class Builder {
         private int checkInterval = DEFAULT_CHECK_INTERVAL;
@@ -219,6 +228,7 @@ public class DorisExecutionOptions implements Serializable {
         private boolean enableBatchMode = false;
 
         private boolean ignoreUpdateBefore = true;
+        private WriteMode writeMode = WriteMode.STREAM_LOAD;
 
         public Builder setCheckInterval(Integer checkInterval) {
             this.checkInterval = checkInterval;
@@ -305,6 +315,11 @@ public class DorisExecutionOptions implements Serializable 
{
             return this;
         }
 
+        public Builder setWriteMode(WriteMode writeMode) {
+            this.writeMode = writeMode;
+            return this;
+        }
+
         public DorisExecutionOptions build() {
             // If format=json is set but read_json_by_line is not set, record 
may not be written.
             if (streamLoadProp != null
@@ -328,7 +343,8 @@ public class DorisExecutionOptions implements Serializable {
                     bufferFlushMaxBytes,
                     bufferFlushIntervalMs,
                     ignoreUpdateBefore,
-                    force2PC);
+                    force2PC,
+                    writeMode);
         }
     }
 }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index 3da50b2..6a34776 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -27,10 +27,13 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.sink.batch.DorisBatchWriter;
 import org.apache.doris.flink.sink.committer.DorisCommitter;
+import org.apache.doris.flink.sink.writer.DorisAbstractWriter;
 import org.apache.doris.flink.sink.writer.DorisWriter;
 import org.apache.doris.flink.sink.writer.DorisWriterState;
 import org.apache.doris.flink.sink.writer.DorisWriterStateSerializer;
+import org.apache.doris.flink.sink.writer.WriteMode;
 import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -76,16 +79,8 @@ public class DorisSink<IN>
     }
 
     @Override
-    public DorisWriter<IN> createWriter(InitContext initContext) throws 
IOException {
-        DorisWriter<IN> dorisWriter =
-                new DorisWriter<>(
-                        initContext,
-                        Collections.emptyList(),
-                        serializer,
-                        dorisOptions,
-                        dorisReadOptions,
-                        dorisExecutionOptions);
-        return dorisWriter;
+    public DorisAbstractWriter createWriter(InitContext initContext) throws 
IOException {
+        return getDorisAbstractWriter(initContext, Collections.emptyList());
     }
 
     @Override
@@ -95,18 +90,28 @@ public class DorisSink<IN>
     }
 
     @Override
-    public DorisWriter<IN> restoreWriter(
+    public DorisAbstractWriter restoreWriter(
             InitContext initContext, Collection<DorisWriterState> 
recoveredState)
             throws IOException {
-        DorisWriter<IN> dorisWriter =
-                new DorisWriter<>(
-                        initContext,
-                        recoveredState,
-                        serializer,
-                        dorisOptions,
-                        dorisReadOptions,
-                        dorisExecutionOptions);
-        return dorisWriter;
+        return getDorisAbstractWriter(initContext, recoveredState);
+    }
+
+    public DorisAbstractWriter getDorisAbstractWriter(
+            InitContext initContext, Collection<DorisWriterState> states) {
+        if 
(WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode())) {
+            return new DorisWriter<>(
+                    initContext,
+                    states,
+                    serializer,
+                    dorisOptions,
+                    dorisReadOptions,
+                    dorisExecutionOptions);
+        } else if 
(WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
+            return new DorisBatchWriter<>(
+                    initContext, serializer, dorisOptions, dorisReadOptions, 
dorisExecutionOptions);
+        }
+        throw new IllegalArgumentException(
+                "Unsupported write mode " + 
dorisExecutionOptions.getWriteMode());
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
index d9a424b..2a94820 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
@@ -54,7 +54,6 @@ public class DorisBatchSink<IN> implements Sink<IN> {
                         dorisOptions,
                         dorisReadOptions,
                         dorisExecutionOptions);
-        dorisBatchWriter.initializeLoad();
         return dorisBatchWriter;
     }
 
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 6a6576c..4b48436 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -18,7 +18,6 @@
 package org.apache.doris.flink.sink.batch;
 
 import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -26,6 +25,9 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisCommittable;
+import org.apache.doris.flink.sink.writer.DorisAbstractWriter;
+import org.apache.doris.flink.sink.writer.DorisWriterState;
 import org.apache.doris.flink.sink.writer.LabelGenerator;
 import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
 import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
@@ -33,11 +35,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
-public class DorisBatchWriter<IN> implements SinkWriter<IN> {
+/** Doris Batch StreamLoad. */
+public class DorisBatchWriter<IN>
+        implements DorisAbstractWriter<IN, DorisWriterState, DorisCommittable> 
{
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisBatchWriter.class);
     private DorisBatchStreamLoad batchStreamLoad;
     private final DorisOptions dorisOptions;
@@ -77,10 +85,11 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> 
{
         this.dorisReadOptions = dorisReadOptions;
         this.executionOptions = executionOptions;
         this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs();
+        initializeLoad();
         serializer.initial();
     }
 
-    public void initializeLoad() throws IOException {
+    public void initializeLoad() {
         this.batchStreamLoad =
                 new DorisBatchStreamLoad(
                         dorisOptions, dorisReadOptions, executionOptions, 
labelGenerator);
@@ -113,6 +122,17 @@ public class DorisBatchWriter<IN> implements 
SinkWriter<IN> {
         batchStreamLoad.flush(null, true);
     }
 
+    @Override
+    public Collection<DorisCommittable> prepareCommit() throws IOException, 
InterruptedException {
+        // nothing to commit
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<DorisWriterState> snapshotState(long checkpointId) throws 
IOException {
+        return new ArrayList<>();
+    }
+
     public void writeOneDorisRecord(DorisRecord record) throws 
InterruptedException {
         if (record == null || record.getRow() == null) {
             // ddl or value is null
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisAbstractWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisAbstractWriter.java
new file mode 100644
index 0000000..2e307d6
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisAbstractWriter.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+/** Abstract for different Doris Writer: Stream Load, Copy ... */
+public interface DorisAbstractWriter<InputT, WriterStateT, CommT>
+        extends StatefulSink.StatefulSinkWriter<InputT, WriterStateT>,
+                TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, CommT> 
{}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 5b8b4fc..c6f8124 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -19,8 +19,6 @@ package org.apache.doris.flink.sink.writer;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -61,8 +59,7 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
  * @param <IN>
  */
 public class DorisWriter<IN>
-        implements StatefulSink.StatefulSinkWriter<IN, DorisWriterState>,
-                TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, 
DorisCommittable> {
+        implements DorisAbstractWriter<IN, DorisWriterState, DorisCommittable> 
{
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisWriter.class);
     private static final List<String> DORIS_SUCCESS_STATUS =
             new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
@@ -169,7 +166,7 @@ public class DorisWriter<IN>
     }
 
     @Override
-    public void write(IN in, Context context) throws IOException {
+    public void write(IN in, Context context) throws IOException, 
InterruptedException {
         checkLoadException();
         writeOneDorisRecord(serializer.serialize(in));
     }
@@ -179,7 +176,7 @@ public class DorisWriter<IN>
         writeOneDorisRecord(serializer.flush());
     }
 
-    public void writeOneDorisRecord(DorisRecord record) throws IOException {
+    public void writeOneDorisRecord(DorisRecord record) throws IOException, 
InterruptedException {
 
         if (record == null || record.getRow() == null) {
             // ddl or value is null
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/WriteMode.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/WriteMode.java
new file mode 100644
index 0000000..ec0dfdb
--- /dev/null
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/WriteMode.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+public enum WriteMode {
+    STREAM_LOAD,
+    STREAM_LOAD_BATCH,
+    COPY;
+
+    public static WriteMode of(String name) {
+        try {
+            return WriteMode.valueOf(name.toUpperCase());
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Unsupported write mode: " + 
name);
+        }
+    }
+}
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index e368e31..2c7c753 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -21,6 +21,8 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.table.factories.FactoryUtil;
 
+import org.apache.doris.flink.sink.writer.WriteMode;
+
 import java.time.Duration;
 import java.util.Map;
 import java.util.Properties;
@@ -227,6 +229,12 @@ public class DorisConfigOptions {
                     .defaultValue(true)
                     .withDescription("whether to enable the delete function");
 
+    public static final ConfigOption<String> SINK_WRITE_MODE =
+            ConfigOptions.key("sink.write-mode")
+                    .stringType()
+                    .defaultValue(WriteMode.STREAM_LOAD.name())
+                    .withDescription("Write mode, supports stream_load, 
stream_load_batch");
+
     public static final ConfigOption<Integer> SINK_PARALLELISM = 
FactoryUtil.SINK_PARALLELISM;
 
     public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE =
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 49ec868..c7d13f6 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -32,6 +32,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisLookupOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.writer.WriteMode;
 
 import java.util.HashSet;
 import java.util.Map;
@@ -77,6 +78,7 @@ import static 
org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
 import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_WRITE_MODE;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
 import static 
org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
 import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -153,6 +155,7 @@ public final class DorisDynamicTableFactory
         options.add(SINK_USE_CACHE);
 
         options.add(SOURCE_USE_OLD_API);
+        options.add(SINK_WRITE_MODE);
         return options;
     }
 
@@ -231,12 +234,16 @@ public final class DorisDynamicTableFactory
             builder.enable2PC();
         }
 
+        
builder.setWriteMode(WriteMode.of(readableConfig.get(SINK_WRITE_MODE)));
         builder.setBatchMode(readableConfig.get(SINK_ENABLE_BATCH_MODE));
+        // Compatible with previous versions
+        if (readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
+            builder.setWriteMode(WriteMode.STREAM_LOAD_BATCH);
+        }
         builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
         
builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
         
builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
         
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
-
         builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
         return builder.build();
     }
diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 83c10a9..de5b32f 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -29,7 +29,6 @@ import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.batch.DorisBatchSink;
 import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -109,23 +108,13 @@ public class DorisDynamicTableSink implements 
DynamicTableSink {
                 .setFieldDelimiter(
                         loadProperties.getProperty(FIELD_DELIMITER_KEY, 
FIELD_DELIMITER_DEFAULT));
 
-        if (!executionOptions.enableBatchMode()) {
-            DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
-            dorisSinkBuilder
-                    .setDorisOptions(options)
-                    .setDorisReadOptions(readOptions)
-                    .setDorisExecutionOptions(executionOptions)
-                    .setSerializer(serializerBuilder.build());
-            return SinkV2Provider.of(dorisSinkBuilder.build(), 
sinkParallelism);
-        } else {
-            DorisBatchSink.Builder<RowData> dorisBatchSinkBuilder = 
DorisBatchSink.builder();
-            dorisBatchSinkBuilder
-                    .setDorisOptions(options)
-                    .setDorisReadOptions(readOptions)
-                    .setDorisExecutionOptions(executionOptions)
-                    .setSerializer(serializerBuilder.build());
-            return SinkV2Provider.of(dorisBatchSinkBuilder.build(), 
sinkParallelism);
-        }
+        DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
+        dorisSinkBuilder
+                .setDorisOptions(options)
+                .setDorisReadOptions(readOptions)
+                .setDorisExecutionOptions(executionOptions)
+                .setSerializer(serializerBuilder.build());
+        return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism);
     }
 
     @Override
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
index 3a9d214..46c3185 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
@@ -24,7 +24,9 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.doris.flink.cfg.DorisExecutionOptions;
 import org.apache.doris.flink.cfg.DorisOptions;
 import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
 import org.apache.doris.flink.sink.batch.DorisBatchSink;
+import org.apache.doris.flink.sink.writer.WriteMode;
 import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
 
 import java.util.Arrays;
@@ -41,7 +43,7 @@ public class DorisSinkBatchExample {
         // 
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
         // env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,
         // Time.milliseconds(30000)));
-        DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();
+        DorisSink.Builder<String> builder = DorisSink.builder();
         final DorisReadOptions.Builder readOptionBuilder = 
DorisReadOptions.builder();
         readOptionBuilder
                 .setDeserializeArrowAsync(false)
@@ -70,7 +72,8 @@ public class DorisSinkBatchExample {
                 .setDeletable(false)
                 .setBufferFlushMaxBytes(8 * 1024)
                 .setBufferFlushMaxRows(900)
-                .setBufferFlushIntervalMs(1000 * 10);
+                .setBufferFlushIntervalMs(1000 * 10)
+                .setWriteMode(WriteMode.STREAM_LOAD_BATCH);
 
         builder.setDorisReadOptions(readOptionBuilder.build())
                 .setDorisExecutionOptions(executionBuilder.build())


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to