This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 9d7626f [improve] Add DorisAbstractWriter, Unify DorisSink and DorisBatchSink (#309)
9d7626f is described below
commit 9d7626fda86c4ce17596752d65a3defe1d5a94ad
Author: wudi <[email protected]>
AuthorDate: Mon Jan 29 11:20:02 2024 +0800
[improve] Add DorisAbstractWriter, Unify DorisSink and DorisBatchSink (#309)
* add DorisAbstractWriter, Unify DorisSink and DorisBatchSink
* Compatible with previous versions
---
.../doris/flink/cfg/DorisExecutionOptions.java | 20 +++++++++-
.../org/apache/doris/flink/sink/DorisSink.java | 45 ++++++++++++----------
.../doris/flink/sink/batch/DorisBatchSink.java | 1 -
.../doris/flink/sink/batch/DorisBatchWriter.java | 26 +++++++++++--
.../flink/sink/writer/DorisAbstractWriter.java | 26 +++++++++++++
.../doris/flink/sink/writer/DorisWriter.java | 9 ++---
.../apache/doris/flink/sink/writer/WriteMode.java | 32 +++++++++++++++
.../doris/flink/table/DorisConfigOptions.java | 8 ++++
.../flink/table/DorisDynamicTableFactory.java | 9 ++++-
.../doris/flink/table/DorisDynamicTableSink.java | 25 ++++--------
.../apache/doris/flink/DorisSinkBatchExample.java | 7 +++-
11 files changed, 155 insertions(+), 53 deletions(-)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index b86bcd4..a890d34 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -19,6 +19,8 @@ package org.apache.doris.flink.cfg;
import org.apache.flink.util.Preconditions;
+import org.apache.doris.flink.sink.writer.WriteMode;
+
import java.io.Serializable;
import java.util.Properties;
@@ -60,6 +62,7 @@ public class DorisExecutionOptions implements Serializable {
private final long bufferFlushIntervalMs;
private final boolean enableBatchMode;
private final boolean ignoreUpdateBefore;
+ private final WriteMode writeMode;
public DorisExecutionOptions(
int checkInterval,
@@ -77,7 +80,8 @@ public class DorisExecutionOptions implements Serializable {
int bufferFlushMaxBytes,
long bufferFlushIntervalMs,
boolean ignoreUpdateBefore,
- boolean force2PC) {
+ boolean force2PC,
+ WriteMode writeMode) {
Preconditions.checkArgument(maxRetries >= 0);
this.checkInterval = checkInterval;
this.maxRetries = maxRetries;
@@ -97,6 +101,7 @@ public class DorisExecutionOptions implements Serializable {
this.bufferFlushIntervalMs = bufferFlushIntervalMs;
this.ignoreUpdateBefore = ignoreUpdateBefore;
+ this.writeMode = writeMode;
}
public static Builder builder() {
@@ -196,6 +201,10 @@ public class DorisExecutionOptions implements Serializable {
return force2PC;
}
+ public WriteMode getWriteMode() {
+ return writeMode;
+ }
+
/** Builder of {@link DorisExecutionOptions}. */
public static class Builder {
private int checkInterval = DEFAULT_CHECK_INTERVAL;
@@ -219,6 +228,7 @@ public class DorisExecutionOptions implements Serializable {
private boolean enableBatchMode = false;
private boolean ignoreUpdateBefore = true;
+ private WriteMode writeMode = WriteMode.STREAM_LOAD;
public Builder setCheckInterval(Integer checkInterval) {
this.checkInterval = checkInterval;
@@ -305,6 +315,11 @@ public class DorisExecutionOptions implements Serializable {
return this;
}
+ public Builder setWriteMode(WriteMode writeMode) {
+ this.writeMode = writeMode;
+ return this;
+ }
+
public DorisExecutionOptions build() {
// If format=json is set but read_json_by_line is not set, record may not be written.
if (streamLoadProp != null
@@ -328,7 +343,8 @@ public class DorisExecutionOptions implements Serializable {
bufferFlushMaxBytes,
bufferFlushIntervalMs,
ignoreUpdateBefore,
- force2PC);
+ force2PC,
+ writeMode);
}
}
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
index 3da50b2..6a34776 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisSink.java
@@ -27,10 +27,13 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.rest.RestService;
+import org.apache.doris.flink.sink.batch.DorisBatchWriter;
import org.apache.doris.flink.sink.committer.DorisCommitter;
+import org.apache.doris.flink.sink.writer.DorisAbstractWriter;
import org.apache.doris.flink.sink.writer.DorisWriter;
import org.apache.doris.flink.sink.writer.DorisWriterState;
import org.apache.doris.flink.sink.writer.DorisWriterStateSerializer;
+import org.apache.doris.flink.sink.writer.WriteMode;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,16 +79,8 @@ public class DorisSink<IN>
}
@Override
- public DorisWriter<IN> createWriter(InitContext initContext) throws IOException {
- DorisWriter<IN> dorisWriter =
- new DorisWriter<>(
- initContext,
- Collections.emptyList(),
- serializer,
- dorisOptions,
- dorisReadOptions,
- dorisExecutionOptions);
- return dorisWriter;
+ public DorisAbstractWriter createWriter(InitContext initContext) throws IOException {
+ return getDorisAbstractWriter(initContext, Collections.emptyList());
}
@Override
@@ -95,18 +90,28 @@ public class DorisSink<IN>
}
@Override
- public DorisWriter<IN> restoreWriter(
+ public DorisAbstractWriter restoreWriter(
InitContext initContext, Collection<DorisWriterState>
recoveredState)
throws IOException {
- DorisWriter<IN> dorisWriter =
- new DorisWriter<>(
- initContext,
- recoveredState,
- serializer,
- dorisOptions,
- dorisReadOptions,
- dorisExecutionOptions);
- return dorisWriter;
+ return getDorisAbstractWriter(initContext, recoveredState);
+ }
+
+ public DorisAbstractWriter getDorisAbstractWriter(
+ InitContext initContext, Collection<DorisWriterState> states) {
+ if (WriteMode.STREAM_LOAD.equals(dorisExecutionOptions.getWriteMode())) {
+ return new DorisWriter<>(
+ initContext,
+ states,
+ serializer,
+ dorisOptions,
+ dorisReadOptions,
+ dorisExecutionOptions);
+ } else if (WriteMode.STREAM_LOAD_BATCH.equals(dorisExecutionOptions.getWriteMode())) {
+ return new DorisBatchWriter<>(
+ initContext, serializer, dorisOptions, dorisReadOptions, dorisExecutionOptions);
+ }
+ throw new IllegalArgumentException(
+ "Unsupported write mode " + dorisExecutionOptions.getWriteMode());
}
@Override
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
index d9a424b..2a94820 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchSink.java
@@ -54,7 +54,6 @@ public class DorisBatchSink<IN> implements Sink<IN> {
dorisOptions,
dorisReadOptions,
dorisExecutionOptions);
- dorisBatchWriter.initializeLoad();
return dorisBatchWriter;
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
index 6a6576c..4b48436 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchWriter.java
@@ -18,7 +18,6 @@
package org.apache.doris.flink.sink.batch;
import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -26,6 +25,9 @@ import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisCommittable;
+import org.apache.doris.flink.sink.writer.DorisAbstractWriter;
+import org.apache.doris.flink.sink.writer.DorisWriterState;
import org.apache.doris.flink.sink.writer.LabelGenerator;
import org.apache.doris.flink.sink.writer.serializer.DorisRecord;
import org.apache.doris.flink.sink.writer.serializer.DorisRecordSerializer;
@@ -33,11 +35,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-public class DorisBatchWriter<IN> implements SinkWriter<IN> {
+/** Doris Batch StreamLoad. */
+public class DorisBatchWriter<IN>
+ implements DorisAbstractWriter<IN, DorisWriterState, DorisCommittable> {
private static final Logger LOG =
LoggerFactory.getLogger(DorisBatchWriter.class);
private DorisBatchStreamLoad batchStreamLoad;
private final DorisOptions dorisOptions;
@@ -77,10 +85,11 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
this.dorisReadOptions = dorisReadOptions;
this.executionOptions = executionOptions;
this.flushIntervalMs = executionOptions.getBufferFlushIntervalMs();
+ initializeLoad();
serializer.initial();
}
- public void initializeLoad() throws IOException {
+ public void initializeLoad() {
this.batchStreamLoad =
new DorisBatchStreamLoad(
dorisOptions, dorisReadOptions, executionOptions,
labelGenerator);
@@ -113,6 +122,17 @@ public class DorisBatchWriter<IN> implements SinkWriter<IN> {
batchStreamLoad.flush(null, true);
}
+ @Override
+ public Collection<DorisCommittable> prepareCommit() throws IOException, InterruptedException {
+ // nothing to commit
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<DorisWriterState> snapshotState(long checkpointId) throws IOException {
+ return new ArrayList<>();
+ }
+
public void writeOneDorisRecord(DorisRecord record) throws
InterruptedException {
if (record == null || record.getRow() == null) {
// ddl or value is null
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisAbstractWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisAbstractWriter.java
new file mode 100644
index 0000000..2e307d6
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisAbstractWriter.java
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+import org.apache.flink.api.connector.sink2.StatefulSink;
+import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
+
+/** Abstract for different Doris Writer: Stream Load, Copy ... */
+public interface DorisAbstractWriter<InputT, WriterStateT, CommT>
+ extends StatefulSink.StatefulSinkWriter<InputT, WriterStateT>,
+ TwoPhaseCommittingSink.PrecommittingSinkWriter<InputT, CommT> {}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
index 5b8b4fc..c6f8124 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java
@@ -19,8 +19,6 @@ package org.apache.doris.flink.sink.writer;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
-import org.apache.flink.api.connector.sink2.StatefulSink;
-import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
@@ -61,8 +59,7 @@ import static org.apache.doris.flink.sink.LoadStatus.SUCCESS;
* @param <IN>
*/
public class DorisWriter<IN>
- implements StatefulSink.StatefulSinkWriter<IN, DorisWriterState>,
- TwoPhaseCommittingSink.PrecommittingSinkWriter<IN, DorisCommittable> {
+ implements DorisAbstractWriter<IN, DorisWriterState, DorisCommittable> {
private static final Logger LOG =
LoggerFactory.getLogger(DorisWriter.class);
private static final List<String> DORIS_SUCCESS_STATUS =
new ArrayList<>(Arrays.asList(SUCCESS, PUBLISH_TIMEOUT));
@@ -169,7 +166,7 @@ public class DorisWriter<IN>
}
@Override
- public void write(IN in, Context context) throws IOException {
+ public void write(IN in, Context context) throws IOException, InterruptedException {
checkLoadException();
writeOneDorisRecord(serializer.serialize(in));
}
@@ -179,7 +176,7 @@ public class DorisWriter<IN>
writeOneDorisRecord(serializer.flush());
}
- public void writeOneDorisRecord(DorisRecord record) throws IOException {
+ public void writeOneDorisRecord(DorisRecord record) throws IOException, InterruptedException {
if (record == null || record.getRow() == null) {
// ddl or value is null
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/WriteMode.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/WriteMode.java
new file mode 100644
index 0000000..ec0dfdb
--- /dev/null
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/WriteMode.java
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.sink.writer;
+
+public enum WriteMode {
+ STREAM_LOAD,
+ STREAM_LOAD_BATCH,
+ COPY;
+
+ public static WriteMode of(String name) {
+ try {
+ return WriteMode.valueOf(name.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Unsupported write mode: " + name);
+ }
+ }
+}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
index e368e31..2c7c753 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisConfigOptions.java
@@ -21,6 +21,8 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.doris.flink.sink.writer.WriteMode;
+
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
@@ -227,6 +229,12 @@ public class DorisConfigOptions {
.defaultValue(true)
.withDescription("whether to enable the delete function");
+ public static final ConfigOption<String> SINK_WRITE_MODE =
+ ConfigOptions.key("sink.write-mode")
+ .stringType()
+ .defaultValue(WriteMode.STREAM_LOAD.name())
+ .withDescription("Write mode, supports stream_load, stream_load_batch");
+
public static final ConfigOption<Integer> SINK_PARALLELISM =
FactoryUtil.SINK_PARALLELISM;
public static final ConfigOption<Boolean> SINK_ENABLE_BATCH_MODE =
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
index 49ec868..c7d13f6 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java
@@ -32,6 +32,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.writer.WriteMode;
import java.util.HashSet;
import java.util.Map;
@@ -77,6 +78,7 @@ import static org.apache.doris.flink.table.DorisConfigOptions.SINK_LABEL_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_MAX_RETRIES;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_PARALLELISM;
import static org.apache.doris.flink.table.DorisConfigOptions.SINK_USE_CACHE;
+import static org.apache.doris.flink.table.DorisConfigOptions.SINK_WRITE_MODE;
import static
org.apache.doris.flink.table.DorisConfigOptions.SOURCE_USE_OLD_API;
import static
org.apache.doris.flink.table.DorisConfigOptions.STREAM_LOAD_PROP_PREFIX;
import static org.apache.doris.flink.table.DorisConfigOptions.TABLE_IDENTIFIER;
@@ -153,6 +155,7 @@ public final class DorisDynamicTableFactory
options.add(SINK_USE_CACHE);
options.add(SOURCE_USE_OLD_API);
+ options.add(SINK_WRITE_MODE);
return options;
}
@@ -231,12 +234,16 @@ public final class DorisDynamicTableFactory
builder.enable2PC();
}
+ builder.setWriteMode(WriteMode.of(readableConfig.get(SINK_WRITE_MODE)));
builder.setBatchMode(readableConfig.get(SINK_ENABLE_BATCH_MODE));
+ // Compatible with previous versions
+ if (readableConfig.get(SINK_ENABLE_BATCH_MODE)) {
+ builder.setWriteMode(WriteMode.STREAM_LOAD_BATCH);
+ }
builder.setFlushQueueSize(readableConfig.get(SINK_FLUSH_QUEUE_SIZE));
builder.setBufferFlushMaxRows(readableConfig.get(SINK_BUFFER_FLUSH_MAX_ROWS));
builder.setBufferFlushMaxBytes(readableConfig.get(SINK_BUFFER_FLUSH_MAX_BYTES));
builder.setBufferFlushIntervalMs(readableConfig.get(SINK_BUFFER_FLUSH_INTERVAL).toMillis());
-
builder.setUseCache(readableConfig.get(SINK_USE_CACHE));
return builder.build();
}
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
index 83c10a9..de5b32f 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableSink.java
@@ -29,7 +29,6 @@ import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.DorisSink;
-import org.apache.doris.flink.sink.batch.DorisBatchSink;
import org.apache.doris.flink.sink.writer.serializer.RowDataSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,23 +108,13 @@ public class DorisDynamicTableSink implements DynamicTableSink {
.setFieldDelimiter(
loadProperties.getProperty(FIELD_DELIMITER_KEY,
FIELD_DELIMITER_DEFAULT));
- if (!executionOptions.enableBatchMode()) {
- DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
- dorisSinkBuilder
- .setDorisOptions(options)
- .setDorisReadOptions(readOptions)
- .setDorisExecutionOptions(executionOptions)
- .setSerializer(serializerBuilder.build());
- return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism);
- } else {
- DorisBatchSink.Builder<RowData> dorisBatchSinkBuilder = DorisBatchSink.builder();
- dorisBatchSinkBuilder
- .setDorisOptions(options)
- .setDorisReadOptions(readOptions)
- .setDorisExecutionOptions(executionOptions)
- .setSerializer(serializerBuilder.build());
- return SinkV2Provider.of(dorisBatchSinkBuilder.build(), sinkParallelism);
- }
+ DorisSink.Builder<RowData> dorisSinkBuilder = DorisSink.builder();
+ dorisSinkBuilder
+ .setDorisOptions(options)
+ .setDorisReadOptions(readOptions)
+ .setDorisExecutionOptions(executionOptions)
+ .setSerializer(serializerBuilder.build());
+ return SinkV2Provider.of(dorisSinkBuilder.build(), sinkParallelism);
}
@Override
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
index 3a9d214..46c3185 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSinkBatchExample.java
@@ -24,7 +24,9 @@ import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.sink.DorisSink;
import org.apache.doris.flink.sink.batch.DorisBatchSink;
+import org.apache.doris.flink.sink.writer.WriteMode;
import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer;
import java.util.Arrays;
@@ -41,7 +43,7 @@ public class DorisSinkBatchExample {
// env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5,
// Time.milliseconds(30000)));
- DorisBatchSink.Builder<String> builder = DorisBatchSink.builder();
+ DorisSink.Builder<String> builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder =
DorisReadOptions.builder();
readOptionBuilder
.setDeserializeArrowAsync(false)
@@ -70,7 +72,8 @@ public class DorisSinkBatchExample {
.setDeletable(false)
.setBufferFlushMaxBytes(8 * 1024)
.setBufferFlushMaxRows(900)
- .setBufferFlushIntervalMs(1000 * 10);
+ .setBufferFlushIntervalMs(1000 * 10)
+ .setWriteMode(WriteMode.STREAM_LOAD_BATCH);
builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]