This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit 340ac7e72d11ad03c66581d9580eac0a3e08ea71
Author: xiaokangguo <40515802+xiaokang...@users.noreply.github.com>
AuthorDate: Sat Oct 23 18:10:47 2021 +0800

    [Flink]Simplify the use of flink connector (#6892)
    
    1. Simplify the use of the Flink connector, so it can be added like any other stream sink, via GenericDorisSinkFunction.
    2. Add use cases for the Flink connector.
    
    ## Use case
    ```
    env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}")
            .addSink(
                    DorisSink.sink(
                            DorisOptions.builder()
                                    .setFenodes("FE_IP:8030")
                                    .setTableIdentifier("db.table")
                                    .setUsername("root")
                                    .setPassword("").build()
                    ));
    ```
---
 .../doris/flink/cfg/DorisExecutionOptions.java     |  20 +-
 .../apache/doris/flink/cfg/DorisReadOptions.java   |   5 +
 .../java/org/apache/doris/flink/cfg/DorisSink.java |  95 +++++++++
 .../doris/flink/cfg/GenericDorisSinkFunction.java  |  53 +++++
 .../flink/table/DorisDynamicOutputFormat.java      |  77 +++++---
 .../doris/flink/DorisOutPutFormatExample.java      |  84 ++++++++
 .../apache/doris/flink/DorisStreamSinkExample.java | 219 +++++++++++++++++++++
 7 files changed, 517 insertions(+), 36 deletions(-)

diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 3d035ab..587ab07 100644
--- a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -20,7 +20,6 @@ package org.apache.doris.flink.cfg;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
-import java.time.Duration;
 import java.util.Properties;
 
 /**
@@ -29,6 +28,10 @@ import java.util.Properties;
 public class DorisExecutionOptions implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    public static final Integer DEFAULT_BATCH_SIZE = 1000;
+    public static final Integer DEFAULT_MAX_RETRY_TIMES = 3;
+    private static final Long DEFAULT_INTERVAL_MILLIS = 10000L;
+
     private final Integer batchSize;
     private final Integer maxRetries;
     private final Long batchIntervalMs;
@@ -66,14 +69,21 @@ public class DorisExecutionOptions implements Serializable {
         return new Builder();
     }
 
+    public static DorisExecutionOptions defaults() {
+        Properties pro = new Properties();
+        pro.setProperty("format", "json");
+        pro.setProperty("strip_outer_array", "true");
+        return new Builder().setStreamLoadProp(pro).build();
+    }
+
     /**
      * Builder of {@link DorisExecutionOptions}.
      */
     public static class Builder {
-        private Integer batchSize;
-        private Integer maxRetries;
-        private Long batchIntervalMs;
-        private Properties streamLoadProp;
+        private Integer batchSize = DEFAULT_BATCH_SIZE;
+        private Integer maxRetries = DEFAULT_MAX_RETRY_TIMES;
+        private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
+        private Properties streamLoadProp = new Properties();
 
         public Builder setBatchSize(Integer batchSize) {
             this.batchSize = batchSize;
diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java b/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 53cefaa..0beb18c 100644
--- a/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -103,6 +103,10 @@ public class DorisReadOptions implements Serializable {
         return new Builder();
     }
 
+    public static DorisReadOptions defaults(){
+        return DorisReadOptions.builder().build();
+    }
+
     /**
      * Builder of {@link DorisReadOptions}.
      */
@@ -179,6 +183,7 @@ public class DorisReadOptions implements Serializable {
         public DorisReadOptions build() {
             return new DorisReadOptions(readFields, filterQuery, requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, deserializeQueueSize, deserializeArrowAsync);
         }
+
     }
 }
diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisSink.java b/src/main/java/org/apache/doris/flink/cfg/DorisSink.java
new file mode 100644
index 0000000..f11c587
--- /dev/null
+++ b/src/main/java/org/apache/doris/flink/cfg/DorisSink.java
@@ -0,0 +1,95 @@
+package org.apache.doris.flink.cfg;
+
+import org.apache.doris.flink.table.DorisDynamicOutputFormat;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** Facade to create Doris {@link SinkFunction sinks}. */
+public class DorisSink {
+
+
+    private DorisSink() {
+    }
+
+
+    /**
+     * Create a Doris DataStream sink with the default {@link DorisReadOptions}
+     * stream elements could only be JsonString.
+     *
+     * @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
+     */
+    public static <T> SinkFunction<T> sink(DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {
+
+        return sink(new String[]{}, new LogicalType[]{}, DorisReadOptions.defaults(), executionOptions, dorisOptions);
+    }
+
+    /**
+     * Create a Doris DataStream sink with the default {@link DorisReadOptions}
+     * stream elements could only be RowData.
+     *
+     * @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
+     */
+    public static <T> SinkFunction<T> sink(String[] fiels, LogicalType[] types,
+                                           DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {
+
+        return sink(fiels, types, DorisReadOptions.defaults(), executionOptions, dorisOptions);
+    }
+
+    /**
+     * Create a Doris DataStream sink with the default {@link DorisExecutionOptions}
+     * stream elements could only be JsonString.
+     *
+     * @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
+     */
+    public static <T> SinkFunction<T> sink(DorisOptions dorisOptions) {
+
+        return sink(new String[]{}, new LogicalType[]{}, DorisReadOptions.defaults(),
+                DorisExecutionOptions.defaults(), dorisOptions);
+    }
+
+    /**
+     * Create a Doris DataStream sink with the default {@link DorisExecutionOptions}
+     * stream elements could only be RowData.
+     *
+     * @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
+     */
+    public static <T> SinkFunction<T> sink(String[] fiels, LogicalType[] types, DorisOptions dorisOptions) {
+        return sink(fiels, types, DorisReadOptions.defaults(), DorisExecutionOptions.defaults(), dorisOptions);
+    }
+
+
+    /**
+     * Create a Doris DataStream sink, stream elements could only be JsonString.
+     *
+     * @see #sink(String[], LogicalType[], DorisReadOptions, DorisExecutionOptions, DorisOptions)
+     */
+    public static <T> SinkFunction<T> sink(DorisReadOptions readOptions,
+                                           DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {
+
+        return sink(new String[]{}, new LogicalType[]{}, readOptions, executionOptions, dorisOptions);
+    }
+
+
+    /**
+     * Create a Doris DataStream sink, stream elements could only be RowData.
+     *
+     * <p>Note: the objects passed to the return sink can be processed in batch and retried.
+     * Therefore, objects can not be {@link org.apache.flink.api.common.ExecutionConfig#enableObjectReuse() reused}.
+     * </p>
+     *
+     * @param field            array of field
+     * @param types            types of field
+     * @param readOptions      parameters of read, such as readFields, filterQuery
+     * @param executionOptions parameters of execution, such as batch size and maximum retries
+     * @param dorisOptions     parameters of options, such as fenodes, username, password, tableIdentifier
+     * @param <T>              type of data in {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+     *                         StreamRecord}.
+     */
+    public static <T> SinkFunction<T> sink(String[] field, LogicalType[] types, DorisReadOptions readOptions,
+                                           DorisExecutionOptions executionOptions, DorisOptions dorisOptions) {
+
+        return new GenericDorisSinkFunction(new DorisDynamicOutputFormat(
+                dorisOptions, readOptions, executionOptions, types, field));
+    }
+
+}
diff --git a/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java b/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
new file mode 100644
index 0000000..92dd300
--- /dev/null
+++ b/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
@@ -0,0 +1,53 @@
+package org.apache.doris.flink.cfg;
+
+import org.apache.doris.flink.table.DorisDynamicOutputFormat;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+public class GenericDorisSinkFunction<T> extends RichSinkFunction<T>
+        implements CheckpointedFunction {
+
+    private final DorisDynamicOutputFormat outputFormat;
+
+    public GenericDorisSinkFunction(@Nonnull DorisDynamicOutputFormat outputFormat) {
+        this.outputFormat = Preconditions.checkNotNull(outputFormat);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        RuntimeContext ctx = getRuntimeContext();
+        outputFormat.setRuntimeContext(ctx);
+        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
+    }
+
+
+    @Override
+    public void invoke(T value, Context context) throws Exception {
+        outputFormat.writeRecord(value);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+        outputFormat.close();
+        super.close();
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 73c68b6..0fd154a 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -34,10 +34,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Arrays;
 import java.util.StringJoiner;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -50,7 +50,7 @@ import static org.apache.flink.table.data.RowData.createFieldGetter;
 
 /**
  * DorisDynamicOutputFormat
  **/
-public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
+public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -88,8 +88,10 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
-        this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, FIELD_DELIMITER_DEFAULT);
-        this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, LINE_DELIMITER_DEFAULT);
+        this.fieldDelimiter = executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY,
+                FIELD_DELIMITER_DEFAULT);
+        this.lineDelimiter = executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY,
+                LINE_DELIMITER_DEFAULT);
         this.fieldNames = fieldNames;
         this.jsonFormat = FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
         this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
@@ -98,6 +100,7 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
         }
     }
 
+
     @Override
     public void configure(Configuration configuration) {
     }
@@ -105,16 +108,17 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
         dorisStreamLoad = new DorisStreamLoad(
-            getBackend(),
-            options.getTableIdentifier().split("\\.")[0],
-            options.getTableIdentifier().split("\\.")[1],
-            options.getUsername(),
-            options.getPassword(),
-            executionOptions.getStreamLoadProp());
+                getBackend(),
+                options.getTableIdentifier().split("\\.")[0],
+                options.getTableIdentifier().split("\\.")[1],
+                options.getUsername(),
+                options.getPassword(),
+                executionOptions.getStreamLoadProp());
         LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
         if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
-            this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output-format"));
+            this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("doris-streamload-output"
+                    + "-format"));
             this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> {
                 synchronized (DorisDynamicOutputFormat.this) {
                     if (!closed) {
@@ -136,30 +140,37 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
     }
 
     @Override
-    public synchronized void writeRecord(RowData row) throws IOException {
+    public synchronized void writeRecord(T row) throws IOException {
         checkFlushException();
-        addBatch(row);
         if (executionOptions.getBatchSize() > 0 && batch.size() >= executionOptions.getBatchSize()) {
             flush();
         }
     }
 
-    private void addBatch(RowData row) {
-        Map<String, String> valueMap = new HashMap<>();
-        StringJoiner value = new StringJoiner(this.fieldDelimiter);
-        for (int i = 0; i < row.getArity() && i < fieldGetters.length; ++i) {
-            Object field = fieldGetters[i].getFieldOrNull(row);
-            if (jsonFormat) {
-                String data = field != null ? field.toString() : null;
-                valueMap.put(this.fieldNames[i], data);
-            } else {
-                String data = field != null ? field.toString() : NULL_VALUE;
-                value.add(data);
+    private void addBatch(T row) {
+        if (row instanceof RowData) {
+            RowData rowData = (RowData) row;
+            Map<String, String> valueMap = new HashMap<>();
+            StringJoiner value = new StringJoiner(this.fieldDelimiter);
+            for (int i = 0; i < rowData.getArity() && i < fieldGetters.length; ++i) {
+                Object field = fieldGetters[i].getFieldOrNull(rowData);
+                if (jsonFormat) {
+                    String data = field != null ? field.toString() : null;
+                    valueMap.put(this.fieldNames[i], data);
+                } else {
+                    String data = field != null ? field.toString() : NULL_VALUE;
+                    value.add(data);
+                }
             }
+            Object data = jsonFormat ? valueMap : value.toString();
+            batch.add(data);
+
+        } else if (row instanceof String) {
+            batch.add(row);
+        } else {
+            throw new RuntimeException("The type of element should be 'RowData' or 'String' only.");
         }
-        Object data = jsonFormat ? valueMap : value.toString();
-        batch.add(data);
     }
 
     @Override
@@ -189,7 +200,11 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
         }
         String result;
         if (jsonFormat) {
-            result = OBJECT_MAPPER.writeValueAsString(batch);
+            if (batch.get(0) instanceof String) {
+                result = batch.toString();
+            } else {
+                result = OBJECT_MAPPER.writeValueAsString(batch);
+            }
         } else {
             result = String.join(this.lineDelimiter, batch);
         }
@@ -292,11 +307,11 @@ public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
 
         public DorisDynamicOutputFormat build() {
             final LogicalType[] logicalTypes =
-                Arrays.stream(fieldDataTypes)
-                    .map(DataType::getLogicalType)
-                    .toArray(LogicalType[]::new);
+                    Arrays.stream(fieldDataTypes)
+                            .map(DataType::getLogicalType)
+                            .toArray(LogicalType[]::new);
             return new DorisDynamicOutputFormat(
-                optionsBuilder.build(), readOptions, executionOptions, logicalTypes, fieldNames
+                    optionsBuilder.build(), readOptions, executionOptions, logicalTypes, fieldNames
             );
         }
     }
 }
diff --git a/src/test/java/org/apache/doris/flink/DorisOutPutFormatExample.java b/src/test/java/org/apache/doris/flink/DorisOutPutFormatExample.java
new file mode 100644
index 0000000..a64e3d9
--- /dev/null
+++ b/src/test/java/org/apache/doris/flink/DorisOutPutFormatExample.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.table.DorisDynamicOutputFormat;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.IOException;
+
+/**
+ * example using {@link DorisDynamicOutputFormat} for batching.
+ */
+public class DorisOutPutFormatExample {
+
+    public static void main(String[] args) throws Exception {
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        MapOperator<String, RowData> data = env.fromElements("")
+                .map(new MapFunction<String, RowData>() {
+                    @Override
+                    public RowData map(String value) throws Exception {
+                        GenericRowData genericRowData = new GenericRowData(3);
+                        genericRowData.setField(0, StringData.fromString("北京"));
+                        genericRowData.setField(1, 116.405419);
+                        genericRowData.setField(2, 39.916927);
+                        return genericRowData;
+                    }
+                });
+
+        DorisOptions dorisOptions = DorisOptions.builder()
+                .setFenodes("FE_IP:8030")
+                .setTableIdentifier("db.table")
+                .setUsername("root")
+                .setPassword("").build();
+        DorisReadOptions readOptions = DorisReadOptions.defaults();
+        DorisExecutionOptions executionOptions = DorisExecutionOptions.defaults();
+
+        LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
+        String[] fiels = {"city", "longitude", "latitude"};
+
+        DorisDynamicOutputFormat outputFormat =
+                new DorisDynamicOutputFormat(dorisOptions, readOptions, executionOptions, types, fiels);
+
+        try {
+            outputFormat.open(0, 1);
+            data.output(outputFormat);
+            outputFormat.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        env.execute("doris batch sink example");
+
+
+    }
+
+}
diff --git a/src/test/java/org/apache/doris/flink/DorisStreamSinkExample.java b/src/test/java/org/apache/doris/flink/DorisStreamSinkExample.java
new file mode 100644
index 0000000..d37fd0d
--- /dev/null
+++ b/src/test/java/org/apache/doris/flink/DorisStreamSinkExample.java
@@ -0,0 +1,219 @@
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.cfg.DorisSink;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.Properties;
+
+/**
+ * example using {@link DorisSink} for streaming.
+ */
+public class DorisStreamSinkExample {
+
+
+    public void testJsonString() throws Exception {
+        /*
+         * Example for JsonString element
+         */
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        Properties pro = new Properties();
+        pro.setProperty("format", "json");
+        pro.setProperty("strip_outer_array", "true");
+        env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}")
+                .addSink(
+                        DorisSink.sink(
+                                DorisReadOptions.builder().build(),
+                                DorisExecutionOptions.builder()
+                                        .setBatchSize(3)
+                                        .setBatchIntervalMs(0l)
+                                        .setMaxRetries(3)
+                                        .setStreamLoadProp(pro).build(),
+                                DorisOptions.builder()
+                                        .setFenodes("FE_IP:8030")
+                                        .setTableIdentifier("db.table")
+                                        .setUsername("root")
+                                        .setPassword("").build()
+                        ));
+        env.execute("doris stream sink example");
+    }
+
+
+    public void testJsonStringWithDefaultReadOptions() throws Exception {
+        /*
+         * Example for JsonString element with default ReadOptions
+         */
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        Properties pro = new Properties();
+        pro.setProperty("format", "json");
+        pro.setProperty("strip_outer_array", "true");
+        env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}")
+                .addSink(
+                        DorisSink.sink(
+                                DorisExecutionOptions.builder()
+                                        .setBatchSize(3)
+                                        .setBatchIntervalMs(0l)
+                                        .setMaxRetries(3)
+                                        .setStreamLoadProp(pro).build(),
+                                DorisOptions.builder()
+                                        .setFenodes("FE_IP:8030")
+                                        .setTableIdentifier("db.table")
+                                        .setUsername("root")
+                                        .setPassword("").build()
+                        ));
+        env.execute("doris stream sink example");
+    }
+
+
+    public void testJsonStringWithDefaultReadOptionsAndExecutionOptions() throws Exception {
+        /*
+         * Example for JsonString element with default ReadOptions and ExecutionOptions
+         */
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", \"latitude\": \"39.916927\"}")
+                .addSink(
+                        DorisSink.sink(
+                                DorisOptions.builder()
+                                        .setFenodes("192.168.52.101:8030")
+                                        .setTableIdentifier("smarttrip_db.doris_output_format")
+                                        .setUsername("root")
+                                        .setPassword("").build()
+                        ));
+        env.execute("doris stream sink example");
+    }
+
+
+    public void testRowData() throws Exception {
+        /*
+         * Example for RowData element
+         */
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<RowData> source = env.fromElements("")
+                .map(new MapFunction<String, RowData>() {
+                    @Override
+                    public RowData map(String value) throws Exception {
+                        GenericRowData genericRowData = new GenericRowData(3);
+                        genericRowData.setField(0, StringData.fromString("北京"));
+                        genericRowData.setField(1, 116.405419);
+                        genericRowData.setField(2, 39.916927);
+                        return genericRowData;
+                    }
+                });
+
+        String[] fields = {"city", "longitude", "latitude"};
+        LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
+
+        source.addSink(
+                DorisSink.sink(
+                        fields,
+                        types,
+                        DorisReadOptions.builder().build(),
+                        DorisExecutionOptions.builder()
+                                .setBatchSize(3)
+                                .setBatchIntervalMs(0L)
+                                .setMaxRetries(3)
+                                .build(),
+                        DorisOptions.builder()
+                                .setFenodes("FE_IP:8030")
+                                .setTableIdentifier("db.table")
+                                .setUsername("root")
+                                .setPassword("").build()
+                ));
+        env.execute("doris stream sink example");
+    }
+
+
+    public void testRowDataWithDefaultReadOptions() throws Exception {
+        /*
+         * Example for RowData element with default ReadOptions
+         */
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<RowData> source = env.fromElements("")
+                .map(new MapFunction<String, RowData>() {
+                    @Override
+                    public RowData map(String value) throws Exception {
+                        GenericRowData genericRowData = new GenericRowData(3);
+                        genericRowData.setField(0, StringData.fromString("北京"));
+                        genericRowData.setField(1, 116.405419);
+                        genericRowData.setField(2, 39.916927);
+                        return genericRowData;
+                    }
+                });
+
+        String[] fields = {"city", "longitude", "latitude"};
+        LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
+
+        source.addSink(
+                DorisSink.sink(
+                        fields,
+                        types,
+                        DorisExecutionOptions.builder()
+                                .setBatchSize(3)
+                                .setBatchIntervalMs(0L)
+                                .setMaxRetries(3)
+                                .build(),
+                        DorisOptions.builder()
+                                .setFenodes("FE_IP:8030")
+                                .setTableIdentifier("db.table")
+                                .setUsername("root")
+                                .setPassword("").build()
+                ));
+        env.execute("doris stream sink example");
+    }
+
+
+    public void testRowDataWithDefaultReadOptionsAndExecutionOptions() throws Exception {
+        /*
+         * Example for RowData element with default ReadOptions and ExecutionOptions
+         */
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<RowData> source = env.fromElements("")
+                .map(new MapFunction<String, RowData>() {
+                    @Override
+                    public RowData map(String value) throws Exception {
+                        GenericRowData genericRowData = new GenericRowData(3);
+                        genericRowData.setField(0, StringData.fromString("北京"));
+                        genericRowData.setField(1, 116.405419);
+                        genericRowData.setField(2, 39.916927);
+                        return genericRowData;
+                    }
+                });
+
+        String[] fields = {"city", "longitude", "latitude"};
+        LogicalType[] types = {new VarCharType(), new DoubleType(), new DoubleType()};
+
+        source.addSink(
+                DorisSink.sink(
+                        fields,
+                        types,
+                        DorisOptions.builder()
+                                .setFenodes("FE_IP:8030")
+                                .setTableIdentifier("db.table")
+                                .setUsername("root")
+                                .setPassword("").build()
+                ));
+        env.execute("doris stream sink example");
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org