This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new f5479fa80e [Sample] Add Flink Connector Sample Code (#11120) f5479fa80e is described below commit f5479fa80eb341e15728605143ac321618df7ab0 Author: wudi <676366...@qq.com> AuthorDate: Tue Jul 26 18:00:51 2022 +0800 [Sample] Add Flink Connector Sample Code (#11120) Co-authored-by: wudi <> --- samples/doris-demo/flink-demo-v1.1/pom.xml | 1 + .../apache/doris/demo/flink/Cdc2DorisSQLDemo.java | 68 ++++++++++++++++++++++ .../demo/flink/DorisFlinkConnectorDemoV1.java | 2 +- ...rDemoV1.java => Kafka2DorisDataStreamDemo.java} | 62 ++++++++++---------- samples/doris-demo/spark-demo/pom.xml | 4 +- 5 files changed, 103 insertions(+), 34 deletions(-) diff --git a/samples/doris-demo/flink-demo-v1.1/pom.xml b/samples/doris-demo/flink-demo-v1.1/pom.xml index 5e907fb34b..803a75d760 100644 --- a/samples/doris-demo/flink-demo-v1.1/pom.xml +++ b/samples/doris-demo/flink-demo-v1.1/pom.xml @@ -34,6 +34,7 @@ under the License. <fastjson.version>1.2.62</fastjson.version> <hadoop.version>2.8.3</hadoop.version> <scope.mode>compile</scope.mode> + <slf4j.version>1.7.30</slf4j.version> </properties> <dependencies> <dependency> diff --git a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/Cdc2DorisSQLDemo.java b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/Cdc2DorisSQLDemo.java new file mode 100644 index 0000000000..0d1f61fa3e --- /dev/null +++ b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/Cdc2DorisSQLDemo.java @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.demo.flink; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; + +public class Cdc2DorisSQLDemo { + + public static void main(String[] args) throws Exception { + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(10000); + env.setParallelism(1); + final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); + // register a table in the catalog + tEnv.executeSql( + "CREATE TABLE cdc_test_source (\n" + + " id INT,\n" + + " name STRING\n" + + ") WITH (\n" + + " 'connector' = 'mysql-cdc',\n" + + " 'hostname' = '127.0.0.1',\n" + + " 'port' = '3306',\n" + + " 'username' = 'root',\n" + + " 'password' = '',\n" + + " 'database-name' = 'db',\n" + + " 'table-name' = 'test_source'\n" + + ")"); + //doris table + tEnv.executeSql( + "CREATE TABLE doris_test_sink (" + + "id INT," + + "name STRING" + + ") " + + "WITH (\n" + + " 'connector' = 'doris',\n" + + " 'fenodes' = '127.0.0.1:8030',\n" + + " 'table.identifier' = 'db.test_sink',\n" + + " 'username' = 'root',\n" + + " 'password' = '',\n" + + /* doris stream load label, In the exactly-once scenario, + the label is globally unique and must be restarted from the latest checkpoint when restarting. + Exactly-once semantics can be turned off via sink.enable-2pc. */ + " 'sink.label-prefix' = 'doris_label',\n" + + " 'sink.properties.format' = 'json',\n" + //json data format + " 'sink.properties.read_json_by_line' = 'true'\n" + + ")"); + + //insert into mysql table to doris table + tEnv.executeSql("INSERT INTO doris_test_sink select id,name from cdc_test_source"); + env.execute(); + } +} diff --git a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/DorisFlinkConnectorDemoV1.java b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/DorisFlinkConnectorDemoV1.java index 1fd14bf6c9..cebe77f5e2 100644 --- a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/DorisFlinkConnectorDemoV1.java +++ b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/DorisFlinkConnectorDemoV1.java @@ -61,7 +61,7 @@ public class DorisFlinkConnectorDemoV1 { DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); executionBuilder .setStreamLoadProp(pro) - .setLabelPrefix("doris_test") + .setLabelPrefix("doris_test"); builder.setDorisReadOptions(readOptionBuilder.build()) diff --git a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/DorisFlinkConnectorDemoV1.java b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/Kafka2DorisDataStreamDemo.java similarity index 54% copy from samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/DorisFlinkConnectorDemoV1.java copy to samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/Kafka2DorisDataStreamDemo.java index 1fd14bf6c9..d631ff43e7 100644 --- a/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/DorisFlinkConnectorDemoV1.java +++ b/samples/doris-demo/flink-demo-v1.1/src/main/java/org/apache/doris/demo/flink/Kafka2DorisDataStreamDemo.java @@ -16,62 +16,62 @@ // under the License. package org.apache.doris.demo.flink; - import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.sink.DorisSink; import org.apache.doris.flink.sink.writer.SimpleStringSerializer; -import org.apache.flink.api.common.RuntimeExecutionMode; -import org.apache.flink.api.common.restartstrategy.RestartStrategies; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.streaming.api.TimeCharacteristic; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; -/** - * Flink doris connector 1.1 demo sample - */ -public class DorisFlinkConnectorDemoV1 { +public class Kafka2DorisDataStreamDemo { public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + Properties props = new Properties(); + props.put("bootstrap.servers", "127.0.0.1:9092"); + props.put("group.id", "group"); + props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); + props.put("auto.offset.reset", "earliest"); - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - env.setRuntimeMode(RuntimeExecutionMode.BATCH); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(10000); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); - env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000))); + //source config + FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>("test-topic",new SimpleStringSchema(),props); + //sink config DorisSink.Builder<String> builder = DorisSink.builder(); - final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder(); - Properties pro = new Properties(); - pro.setProperty("format", "json"); - pro.setProperty("read_json_by_line", "true"); - pro.setProperty("line_delimiter", "\n"); DorisOptions.Builder dorisBuilder = DorisOptions.builder(); - dorisBuilder.setFenodes("FE_IP:8030") - .setTableIdentifier("test.test_flink") + dorisBuilder.setFenodes("127.0.0.1:8030") + .setTableIdentifier("db.table") .setUsername("root") - .setPassword(""); - DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder(); - executionBuilder - .setStreamLoadProp(pro) - .setLabelPrefix("doris_test") + .setPassword("password"); + Properties pro = new Properties(); + //json data format + pro.setProperty("format", "json"); + pro.setProperty("read_json_by_line", "true"); + DorisExecutionOptions executionOptions = DorisExecutionOptions.builder() + .setLabelPrefix("label-doris") //streamload label prefix, + .setStreamLoadProp(pro).build(); - builder.setDorisReadOptions(readOptionBuilder.build()) - .setDorisExecutionOptions(executionBuilder.build()) - .setSerializer(new SimpleStringSerializer()) + builder.setDorisReadOptions(DorisReadOptions.builder().build()) + .setDorisExecutionOptions(executionOptions) + .setSerializer(new SimpleStringSerializer()) //serialize according to string .setDorisOptions(dorisBuilder.build()); - env.fromElements("{\"id\": \"1\",\"name\": \"wangwu1\", \"age\": \"30\"}\n{\"id\": \"2\",\"name\": \"wangwu4\", \"age\": \"30\"}\n{\"id\": \"3\",\"name\": \"wangwu2\", \"age\": \"30\"}\n{\"id\": \"4\",\"name\": \"doris\", \"age\": \"30\"}\n{\"id\": \"5\",\"name\": \"doris1\", \"age\": \"30\"}\n{\"id\": \"6\",\"name\": \"doris2\", \"age\": \"30\"}").sinkTo(builder.build()); - - env.execute("flink demo"); + //build stream + DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer); + dataStreamSource.sinkTo(builder.build()); + env.execute("flink kafka to doris by datastream"); } } diff --git a/samples/doris-demo/spark-demo/pom.xml b/samples/doris-demo/spark-demo/pom.xml index 43cfe9da8a..f10211d327 100644 --- a/samples/doris-demo/spark-demo/pom.xml +++ b/samples/doris-demo/spark-demo/pom.xml @@ -37,8 +37,8 @@ under the License. <!-- doris spark --> <dependency> <groupId>org.apache.doris</groupId> - <artifactId>doris-spark</artifactId> - <version>1.0.0-SNAPSHOT</version> + <artifactId>spark-doris-connector-2.3_2.11</artifactId> + <version>1.0.1</version> </dependency> <!-- spark --> <dependency> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org