This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git

commit 340ac7e72d11ad03c66581d9580eac0a3e08ea71
Author: xiaokangguo <40515802+xiaokang...@users.noreply.github.com>
AuthorDate: Sat Oct 23 18:10:47 2021 +0800

    [Flink]Simplify the use of flink connector  (#6892)
    
    1. Simplify use of the Flink connector: GenericDorisSinkFunction lets it be 
added like any other stream sink.
    2. Add usage examples for the Flink connector.
    
    ## Use case
    ```
    env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", 
\"latitude\": \"39.916927\"}")
         .addSink(
              DorisSink.sink(
                 DorisOptions.builder()
                       .setFenodes("FE_IP:8030")
                       .setTableIdentifier("db.table")
                       .setUsername("root")
                       .setPassword("").build()
                    ));
    ```
---
 .../doris/flink/cfg/DorisExecutionOptions.java     |  20 +-
 .../apache/doris/flink/cfg/DorisReadOptions.java   |   5 +
 .../java/org/apache/doris/flink/cfg/DorisSink.java |  95 +++++++++
 .../doris/flink/cfg/GenericDorisSinkFunction.java  |  53 +++++
 .../flink/table/DorisDynamicOutputFormat.java      |  77 +++++---
 .../doris/flink/DorisOutPutFormatExample.java      |  84 ++++++++
 .../apache/doris/flink/DorisStreamSinkExample.java | 219 +++++++++++++++++++++
 7 files changed, 517 insertions(+), 36 deletions(-)

diff --git 
a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java 
b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
index 3d035ab..587ab07 100644
--- a/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
+++ b/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java
@@ -20,7 +20,6 @@ package org.apache.doris.flink.cfg;
 import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
-import java.time.Duration;
 import java.util.Properties;
 
 /**
@@ -29,6 +28,10 @@ import java.util.Properties;
 public class DorisExecutionOptions implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    public static final Integer DEFAULT_BATCH_SIZE = 1000;
+    public static final Integer DEFAULT_MAX_RETRY_TIMES = 3;
+    private static final Long DEFAULT_INTERVAL_MILLIS = 10000L;
+
     private final Integer batchSize;
     private final Integer maxRetries;
     private final Long batchIntervalMs;
@@ -66,14 +69,21 @@ public class DorisExecutionOptions implements Serializable {
         return new Builder();
     }
 
+    public static DorisExecutionOptions defaults() {
+        Properties pro = new Properties();
+        pro.setProperty("format", "json");
+        pro.setProperty("strip_outer_array", "true");
+        return new Builder().setStreamLoadProp(pro).build();
+    }
+
     /**
      * Builder of {@link DorisExecutionOptions}.
      */
     public static class Builder {
-        private Integer batchSize;
-        private Integer maxRetries;
-        private Long batchIntervalMs;
-        private Properties streamLoadProp;
+        private Integer batchSize = DEFAULT_BATCH_SIZE;
+        private Integer maxRetries = DEFAULT_MAX_RETRY_TIMES;
+        private Long batchIntervalMs = DEFAULT_INTERVAL_MILLIS;
+        private Properties streamLoadProp = new Properties();
 
         public Builder setBatchSize(Integer batchSize) {
             this.batchSize = batchSize;
diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java 
b/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
index 53cefaa..0beb18c 100644
--- a/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
+++ b/src/main/java/org/apache/doris/flink/cfg/DorisReadOptions.java
@@ -103,6 +103,10 @@ public class DorisReadOptions implements Serializable {
         return new Builder();
     }
 
+    public static DorisReadOptions defaults(){
+        return DorisReadOptions.builder().build();
+    }
+
     /**
      * Builder of {@link DorisReadOptions}.
      */
@@ -179,6 +183,7 @@ public class DorisReadOptions implements Serializable {
         public DorisReadOptions build() {
             return new DorisReadOptions(readFields, filterQuery, 
requestTabletSize, requestConnectTimeoutMs, requestReadTimeoutMs, 
requestQueryTimeoutS, requestRetries, requestBatchSize, execMemLimit, 
deserializeQueueSize, deserializeArrowAsync);
         }
+
     }
 
 
diff --git a/src/main/java/org/apache/doris/flink/cfg/DorisSink.java 
b/src/main/java/org/apache/doris/flink/cfg/DorisSink.java
new file mode 100644
index 0000000..f11c587
--- /dev/null
+++ b/src/main/java/org/apache/doris/flink/cfg/DorisSink.java
@@ -0,0 +1,95 @@
+package org.apache.doris.flink.cfg;
+
+import org.apache.doris.flink.table.DorisDynamicOutputFormat;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.table.types.logical.LogicalType;
+
+/** Facade to create Doris {@link SinkFunction sinks}. */
+public class DorisSink {
+
+
+    private DorisSink() {
+    }
+
+
+    /**
+     * Create a Doris DataStream sink with the default {@link DorisReadOptions}.
+     * Stream elements must be JSON strings.
+     *
+     * @see #sink(String[], LogicalType[], DorisReadOptions, 
DorisExecutionOptions, DorisOptions)
+     */
+    public static <T> SinkFunction<T> sink(DorisExecutionOptions 
executionOptions, DorisOptions dorisOptions) {
+
+        return sink(new String[]{}, new LogicalType[]{}, 
DorisReadOptions.defaults(), executionOptions, dorisOptions);
+    }
+
+    /**
+     * Create a Doris DataStream sink with the default {@link DorisReadOptions}.
+     * Stream elements must be RowData.
+     *
+     * @see #sink(String[], LogicalType[], DorisReadOptions, 
DorisExecutionOptions, DorisOptions)
+     */
+    public static <T> SinkFunction<T> sink(String[] fiels, LogicalType[] types,
+                                           DorisExecutionOptions 
executionOptions, DorisOptions dorisOptions) {
+
+        return sink(fiels, types, DorisReadOptions.defaults(), 
executionOptions, dorisOptions);
+    }
+
+    /**
+     * Create a Doris DataStream sink with the default {@link DorisReadOptions} 
and {@link DorisExecutionOptions}.
+     * Stream elements must be JSON strings.
+     *
+     * @see #sink(String[], LogicalType[], DorisReadOptions, 
DorisExecutionOptions, DorisOptions)
+     */
+    public static <T> SinkFunction<T> sink(DorisOptions dorisOptions) {
+
+        return sink(new String[]{}, new LogicalType[]{}, 
DorisReadOptions.defaults(),
+                DorisExecutionOptions.defaults(), dorisOptions);
+    }
+
+    /**
+     * Create a Doris DataStream sink with the default {@link DorisReadOptions} 
and {@link DorisExecutionOptions}.
+     * Stream elements must be RowData.
+     *
+     * @see #sink(String[], LogicalType[], DorisReadOptions, 
DorisExecutionOptions, DorisOptions)
+     */
+    public static <T> SinkFunction<T> sink(String[] fiels, LogicalType[] 
types, DorisOptions dorisOptions) {
+        return sink(fiels, types, DorisReadOptions.defaults(), 
DorisExecutionOptions.defaults(), dorisOptions);
+    }
+
+
+    /**
+     * Create a Doris DataStream sink. Stream elements must be JSON 
strings.
+     *
+     * @see #sink(String[], LogicalType[], DorisReadOptions, 
DorisExecutionOptions, DorisOptions)
+     */
+    public static <T> SinkFunction<T> sink(DorisReadOptions readOptions,
+                                           DorisExecutionOptions 
executionOptions, DorisOptions dorisOptions) {
+
+        return sink(new String[]{}, new LogicalType[]{}, readOptions, 
executionOptions, dorisOptions);
+    }
+
+
+    /**
+     * Create a Doris DataStream sink. Stream elements must be RowData.
+     *
+     * <p>Note: the objects passed to the returned sink can be processed in 
batches and retried.
+     * Therefore, objects can not be {@link 
org.apache.flink.api.common.ExecutionConfig#enableObjectReuse() reused}.
+     * </p>
+     *
+     * @param field            array of field
+     * @param types            types of field
+     * @param readOptions      parameters of read, such as readFields, 
filterQuery
+     * @param executionOptions parameters of execution, such as batch size and 
maximum retries
+     * @param dorisOptions     parameters of options, such as fenodes, 
username, password, tableIdentifier
+     * @param <T>              type of data in {@link 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord
+     * StreamRecord}.
+     */
+    public static <T> SinkFunction<T> sink(String[] field, LogicalType[] 
types, DorisReadOptions readOptions,
+                                           DorisExecutionOptions 
executionOptions, DorisOptions dorisOptions) {
+
+        return new GenericDorisSinkFunction(new DorisDynamicOutputFormat(
+                dorisOptions, readOptions, executionOptions, types, field));
+    }
+
+}
diff --git 
a/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java 
b/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
new file mode 100644
index 0000000..92dd300
--- /dev/null
+++ b/src/main/java/org/apache/doris/flink/cfg/GenericDorisSinkFunction.java
@@ -0,0 +1,53 @@
+package org.apache.doris.flink.cfg;
+
+import org.apache.doris.flink.table.DorisDynamicOutputFormat;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+public class GenericDorisSinkFunction<T> extends RichSinkFunction<T>
+        implements CheckpointedFunction {
+
+    private final DorisDynamicOutputFormat outputFormat;
+
+    public GenericDorisSinkFunction(@Nonnull DorisDynamicOutputFormat 
outputFormat) {
+        this.outputFormat = Preconditions.checkNotNull(outputFormat);
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        RuntimeContext ctx = getRuntimeContext();
+        outputFormat.setRuntimeContext(ctx);
+        outputFormat.open(ctx.getIndexOfThisSubtask(), 
ctx.getNumberOfParallelSubtasks());
+    }
+
+
+    @Override
+    public void invoke(T value, Context context) throws Exception {
+        outputFormat.writeRecord(value);
+    }
+
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws 
Exception {
+
+    }
+
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
+
+    }
+
+    @Override
+    public void close() throws Exception {
+        outputFormat.close();
+        super.close();
+    }
+
+}
\ No newline at end of file
diff --git 
a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java 
b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
index 73c68b6..0fd154a 100644
--- a/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
+++ b/src/main/java/org/apache/doris/flink/table/DorisDynamicOutputFormat.java
@@ -34,10 +34,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Arrays;
 import java.util.StringJoiner;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -50,7 +50,7 @@ import static 
org.apache.flink.table.data.RowData.createFieldGetter;
 /**
  * DorisDynamicOutputFormat
  **/
-public class DorisDynamicOutputFormat extends RichOutputFormat<RowData> {
+public class DorisDynamicOutputFormat<T> extends RichOutputFormat<T> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisDynamicOutputFormat.class);
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -88,8 +88,10 @@ public class DorisDynamicOutputFormat extends 
RichOutputFormat<RowData> {
         this.options = option;
         this.readOptions = readOptions;
         this.executionOptions = executionOptions;
-        this.fieldDelimiter = 
executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY, 
FIELD_DELIMITER_DEFAULT);
-        this.lineDelimiter = 
executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY, 
LINE_DELIMITER_DEFAULT);
+        this.fieldDelimiter = 
executionOptions.getStreamLoadProp().getProperty(FIELD_DELIMITER_KEY,
+                FIELD_DELIMITER_DEFAULT);
+        this.lineDelimiter = 
executionOptions.getStreamLoadProp().getProperty(LINE_DELIMITER_KEY,
+                LINE_DELIMITER_DEFAULT);
         this.fieldNames = fieldNames;
         this.jsonFormat = 
FORMAT_JSON_VALUE.equals(executionOptions.getStreamLoadProp().getProperty(FORMAT_KEY));
         this.fieldGetters = new RowData.FieldGetter[logicalTypes.length];
@@ -98,6 +100,7 @@ public class DorisDynamicOutputFormat extends 
RichOutputFormat<RowData> {
         }
     }
 
+
     @Override
     public void configure(Configuration configuration) {
     }
@@ -105,16 +108,17 @@ public class DorisDynamicOutputFormat extends 
RichOutputFormat<RowData> {
     @Override
     public void open(int taskNumber, int numTasks) throws IOException {
         dorisStreamLoad = new DorisStreamLoad(
-            getBackend(),
-            options.getTableIdentifier().split("\\.")[0],
-            options.getTableIdentifier().split("\\.")[1],
-            options.getUsername(),
-            options.getPassword(),
-            executionOptions.getStreamLoadProp());
+                getBackend(),
+                options.getTableIdentifier().split("\\.")[0],
+                options.getTableIdentifier().split("\\.")[1],
+                options.getUsername(),
+                options.getPassword(),
+                executionOptions.getStreamLoadProp());
         LOG.info("Streamload BE:{}", dorisStreamLoad.getLoadUrlStr());
 
         if (executionOptions.getBatchIntervalMs() != 0 && 
executionOptions.getBatchSize() != 1) {
-            this.scheduler = Executors.newScheduledThreadPool(1, new 
ExecutorThreadFactory("doris-streamload-output-format"));
+            this.scheduler = Executors.newScheduledThreadPool(1, new 
ExecutorThreadFactory("doris-streamload-output" +
+                    "-format"));
             this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(() -> 
{
                 synchronized (DorisDynamicOutputFormat.this) {
                     if (!closed) {
@@ -136,30 +140,37 @@ public class DorisDynamicOutputFormat extends 
RichOutputFormat<RowData> {
     }
 
     @Override
-    public synchronized void writeRecord(RowData row) throws IOException {
+    public synchronized void writeRecord(T row) throws IOException {
         checkFlushException();
-
         addBatch(row);
         if (executionOptions.getBatchSize() > 0 && batch.size() >= 
executionOptions.getBatchSize()) {
             flush();
         }
     }
 
-    private void addBatch(RowData row) {
-        Map<String, String> valueMap = new HashMap<>();
-        StringJoiner value = new StringJoiner(this.fieldDelimiter);
-        for (int i = 0; i < row.getArity() && i < fieldGetters.length; ++i) {
-            Object field = fieldGetters[i].getFieldOrNull(row);
-            if (jsonFormat) {
-                String data = field != null ? field.toString() : null;
-                valueMap.put(this.fieldNames[i], data);
-            } else {
-                String data = field != null ? field.toString() : NULL_VALUE;
-                value.add(data);
+    private void addBatch(T row) {
+        if (row instanceof RowData) {
+            RowData rowData = (RowData) row;
+            Map<String, String> valueMap = new HashMap<>();
+            StringJoiner value = new StringJoiner(this.fieldDelimiter);
+            for (int i = 0; i < rowData.getArity() && i < fieldGetters.length; 
++i) {
+                Object field = fieldGetters[i].getFieldOrNull(rowData);
+                if (jsonFormat) {
+                    String data = field != null ? field.toString() : null;
+                    valueMap.put(this.fieldNames[i], data);
+                } else {
+                    String data = field != null ? field.toString() : 
NULL_VALUE;
+                    value.add(data);
+                }
             }
+            Object data = jsonFormat ? valueMap : value.toString();
+            batch.add(data);
+
+        } else if (row instanceof String) {
+            batch.add(row);
+        } else {
+            throw new RuntimeException("The type of element should be 
'RowData' or 'String' only.");
         }
-        Object data = jsonFormat ? valueMap : value.toString();
-        batch.add(data);
     }
 
     @Override
@@ -189,7 +200,11 @@ public class DorisDynamicOutputFormat extends 
RichOutputFormat<RowData> {
         }
         String result;
         if (jsonFormat) {
-            result = OBJECT_MAPPER.writeValueAsString(batch);
+            if (batch.get(0) instanceof String) {
+                result = batch.toString();
+            } else {
+                result = OBJECT_MAPPER.writeValueAsString(batch);
+            }
         } else {
             result = String.join(this.lineDelimiter, batch);
         }
@@ -292,11 +307,11 @@ public class DorisDynamicOutputFormat extends 
RichOutputFormat<RowData> {
 
         public DorisDynamicOutputFormat build() {
             final LogicalType[] logicalTypes =
-                Arrays.stream(fieldDataTypes)
-                    .map(DataType::getLogicalType)
-                    .toArray(LogicalType[]::new);
+                    Arrays.stream(fieldDataTypes)
+                            .map(DataType::getLogicalType)
+                            .toArray(LogicalType[]::new);
             return new DorisDynamicOutputFormat(
-                optionsBuilder.build(), readOptions, executionOptions, 
logicalTypes, fieldNames
+                    optionsBuilder.build(), readOptions, executionOptions, 
logicalTypes, fieldNames
             );
         }
     }
diff --git a/src/test/java/org/apache/doris/flink/DorisOutPutFormatExample.java 
b/src/test/java/org/apache/doris/flink/DorisOutPutFormatExample.java
new file mode 100644
index 0000000..a64e3d9
--- /dev/null
+++ b/src/test/java/org/apache/doris/flink/DorisOutPutFormatExample.java
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.table.DorisDynamicOutputFormat;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.io.IOException;
+
+/**
+ * example using {@link DorisDynamicOutputFormat} for batching.
+ */
+public class DorisOutPutFormatExample {
+
+    public static void main(String[] args) throws Exception {
+
+        ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        MapOperator<String, RowData> data = env.fromElements("")
+                .map(new MapFunction<String, RowData>() {
+                    @Override
+                    public RowData map(String value) throws Exception {
+                        GenericRowData genericRowData = new GenericRowData(3);
+                        genericRowData.setField(0, 
StringData.fromString("北京"));
+                        genericRowData.setField(1, 116.405419);
+                        genericRowData.setField(2, 39.916927);
+                        return genericRowData;
+                    }
+                });
+
+        DorisOptions dorisOptions = DorisOptions.builder()
+                .setFenodes("FE_IP:8030")
+                .setTableIdentifier("db.table")
+                .setUsername("root")
+                .setPassword("").build();
+        DorisReadOptions readOptions = DorisReadOptions.defaults();
+        DorisExecutionOptions executionOptions = 
DorisExecutionOptions.defaults();
+
+        LogicalType[] types = {new VarCharType(), new DoubleType(), new 
DoubleType()};
+        String[] fiels = {"city", "longitude", "latitude"};
+
+        DorisDynamicOutputFormat outputFormat =
+                new DorisDynamicOutputFormat(dorisOptions, readOptions, 
executionOptions, types, fiels);
+
+        try {
+            outputFormat.open(0, 1);
+            data.output(outputFormat);
+            outputFormat.close();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        env.execute("doris batch sink example");
+
+
+    }
+
+}
diff --git a/src/test/java/org/apache/doris/flink/DorisStreamSinkExample.java 
b/src/test/java/org/apache/doris/flink/DorisStreamSinkExample.java
new file mode 100644
index 0000000..d37fd0d
--- /dev/null
+++ b/src/test/java/org/apache/doris/flink/DorisStreamSinkExample.java
@@ -0,0 +1,219 @@
+package org.apache.doris.flink;
+
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.cfg.DorisSink;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.Properties;
+
+/**
+ * example using {@link DorisSink} for streaming.
+ */
+public class DorisStreamSinkExample {
+
+
+    public void testJsonString() throws Exception {
+        /*
+         * Example for JsonString element
+         */
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        Properties pro = new Properties();
+        pro.setProperty("format", "json");
+        pro.setProperty("strip_outer_array", "true");
+        env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", 
\"latitude\": \"39.916927\"}")
+                .addSink(
+                        DorisSink.sink(
+                                DorisReadOptions.builder().build(),
+                                DorisExecutionOptions.builder()
+                                        .setBatchSize(3)
+                                        .setBatchIntervalMs(0l)
+                                        .setMaxRetries(3)
+                                        .setStreamLoadProp(pro).build(),
+                                DorisOptions.builder()
+                                        .setFenodes("FE_IP:8030")
+                                        .setTableIdentifier("db.table")
+                                        .setUsername("root")
+                                        .setPassword("").build()
+                        ));
+        env.execute("doris stream sink example");
+    }
+
+
+    public void testJsonStringWithDefaultReadOptions() throws Exception {
+        /*
+         * Example for JsonString element with default ReadOptions
+         */
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        Properties pro = new Properties();
+        pro.setProperty("format", "json");
+        pro.setProperty("strip_outer_array", "true");
+        env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", 
\"latitude\": \"39.916927\"}")
+                .addSink(
+                        DorisSink.sink(
+                                DorisExecutionOptions.builder()
+                                        .setBatchSize(3)
+                                        .setBatchIntervalMs(0l)
+                                        .setMaxRetries(3)
+                                        .setStreamLoadProp(pro).build(),
+                                DorisOptions.builder()
+                                        .setFenodes("FE_IP:8030")
+                                        .setTableIdentifier("db.table")
+                                        .setUsername("root")
+                                        .setPassword("").build()
+                        ));
+        env.execute("doris stream sink example");
+    }
+
+
+    public void testJsonStringWithDefaultReadOptionsAndExecutionOptions() 
throws Exception {
+        /*
+         * Example for JsonString element with default ReadOptions and 
ExecutionOptions
+         */
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        env.fromElements("{\"longitude\": \"116.405419\", \"city\": \"北京\", 
\"latitude\": \"39.916927\"}")
+                .addSink(
+                        DorisSink.sink(
+                                DorisOptions.builder()
+                                        .setFenodes("192.168.52.101:8030")
+                                        
.setTableIdentifier("smarttrip_db.doris_output_format")
+                                        .setUsername("root")
+                                        .setPassword("").build()
+                        ));
+        env.execute("doris stream sink example");
+    }
+
+
+    public void testRowData() throws Exception {
+        /*
+         * Example for RowData element
+         */
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<RowData> source = env.fromElements("")
+                .map(new MapFunction<String, RowData>() {
+                    @Override
+                    public RowData map(String value) throws Exception {
+                        GenericRowData genericRowData = new GenericRowData(3);
+                        genericRowData.setField(0, 
StringData.fromString("北京"));
+                        genericRowData.setField(1, 116.405419);
+                        genericRowData.setField(2, 39.916927);
+                        return genericRowData;
+                    }
+                });
+
+        String[] fields = {"city", "longitude", "latitude"};
+        LogicalType[] types = {new VarCharType(), new DoubleType(), new 
DoubleType()};
+
+        source.addSink(
+                DorisSink.sink(
+                        fields,
+                        types,
+                        DorisReadOptions.builder().build(),
+                        DorisExecutionOptions.builder()
+                                .setBatchSize(3)
+                                .setBatchIntervalMs(0L)
+                                .setMaxRetries(3)
+                                .build(),
+                        DorisOptions.builder()
+                                .setFenodes("FE_IP:8030")
+                                .setTableIdentifier("db.table")
+                                .setUsername("root")
+                                .setPassword("").build()
+                ));
+        env.execute("doris stream sink example");
+    }
+
+
+    public void testRowDataWithDefaultReadOptions() throws Exception {
+        /*
+         * Example for RowData element with default ReadOptions
+         */
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<RowData> source = env.fromElements("")
+                .map(new MapFunction<String, RowData>() {
+                    @Override
+                    public RowData map(String value) throws Exception {
+                        GenericRowData genericRowData = new GenericRowData(3);
+                        genericRowData.setField(0, 
StringData.fromString("北京"));
+                        genericRowData.setField(1, 116.405419);
+                        genericRowData.setField(2, 39.916927);
+                        return genericRowData;
+                    }
+                });
+
+        String[] fields = {"city", "longitude", "latitude"};
+        LogicalType[] types = {new VarCharType(), new DoubleType(), new 
DoubleType()};
+
+        source.addSink(
+                DorisSink.sink(
+                        fields,
+                        types,
+                        DorisExecutionOptions.builder()
+                                .setBatchSize(3)
+                                .setBatchIntervalMs(0L)
+                                .setMaxRetries(3)
+                                .build(),
+                        DorisOptions.builder()
+                                .setFenodes("FE_IP:8030")
+                                .setTableIdentifier("db.table")
+                                .setUsername("root")
+                                .setPassword("").build()
+                ));
+        env.execute("doris stream sink example");
+    }
+
+
+    public void testRowDataWithDefaultReadOptionsAndExecutionOptions() throws 
Exception {
+        /*
+         * Example for RowData element with default ReadOptions and 
ExecutionOptions
+         */
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        DataStream<RowData> source = env.fromElements("")
+                .map(new MapFunction<String, RowData>() {
+                    @Override
+                    public RowData map(String value) throws Exception {
+                        GenericRowData genericRowData = new GenericRowData(3);
+                        genericRowData.setField(0, 
StringData.fromString("北京"));
+                        genericRowData.setField(1, 116.405419);
+                        genericRowData.setField(2, 39.916927);
+                        return genericRowData;
+                    }
+                });
+
+        String[] fields = {"city", "longitude", "latitude"};
+        LogicalType[] types = {new VarCharType(), new DoubleType(), new 
DoubleType()};
+
+        source.addSink(
+                DorisSink.sink(
+                        fields,
+                        types,
+                        DorisOptions.builder()
+                                .setFenodes("FE_IP:8030")
+                                .setTableIdentifier("db.table")
+                                .setUsername("root")
+                                .setPassword("").build()
+                ));
+        env.execute("doris stream sink example");
+    }
+}
\ No newline at end of file

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to