This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new cb7eb725fe Optimize flink oracle cdc, add flink read es to doris sample code (#11274)
cb7eb725fe is described below

commit cb7eb725fe2fb197200a1bcc19fd180f566af907
Author: caoliang-web <71004656+caoliang-...@users.noreply.github.com>
AuthorDate: Fri Jul 29 18:11:18 2022 +0800

    Optimize flink oracle cdc, add flink read es to doris sample code (#11274)
---
 samples/doris-demo/flink-demo/pom.xml              |  16 ++
 .../doris/demo/flink/cdc/FlinkOracleCdcDemo.java   |   4 -
 .../flink/elasticsearch/ElasticsearchInput.java    | 248 +++++++++++++++++++++
 .../flink/elasticsearch/FlinkReadEs2Doris.java     | 100 +++++++++
 4 files changed, 364 insertions(+), 4 deletions(-)

diff --git a/samples/doris-demo/flink-demo/pom.xml b/samples/doris-demo/flink-demo/pom.xml
index 0f662acc41..586a86c68d 100644
--- a/samples/doris-demo/flink-demo/pom.xml
+++ b/samples/doris-demo/flink-demo/pom.xml
@@ -118,6 +118,22 @@ under the License.
             <artifactId>flink-connector-oracle-cdc</artifactId>
             <version>2.1.1</version>
         </dependency>
+        <!-- elasticsearch -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.13</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpcore</artifactId>
+            <version>4.4.12</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/cdc/FlinkOracleCdcDemo.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/cdc/FlinkOracleCdcDemo.java
index 5a1045a029..92350a14f3 100644
--- a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/cdc/FlinkOracleCdcDemo.java
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/cdc/FlinkOracleCdcDemo.java
@@ -104,10 +104,6 @@ public class FlinkOracleCdcDemo {
 
         LogicalType[] types={new IntType(),new VarCharType(),new VarCharType(), new DoubleType()};
 
-        Properties pro = new Properties();
-        pro.setProperty("format", "json");
-        pro.setProperty("strip_outer_array", "false");
-
         map.addSink(
                 DorisSink.sink(
                         fields,
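The ElasticsearchInput class added next is a custom RichInputFormat that reads an
index through the Elasticsearch scroll API and emits Flink Rows. As orientation
before the full file, a minimal usage sketch of its builder API (host, port, index
name and field layout here are illustrative placeholders; the complete demo is the
FlinkReadEs2Doris.java file further down):

    // Sketch only: assumes a local ES node and a made-up two-field index.
    RowTypeInfo rowTypeInfo = new RowTypeInfo(
            new TypeInformation[]{Types.INT, Types.STRING},
            new String[]{"id", "name"});
    ElasticsearchInput es = ElasticsearchInput.builder(
            Lists.newArrayList(new HttpHost("127.0.0.1", 9200)), "my_index")
            .setRowTypeInfo(rowTypeInfo)
            .build();
    // env is a StreamExecutionEnvironment; the format becomes a Row source
    DataStreamSource<Row> source = env.createInput(es);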
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/ElasticsearchInput.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/ElasticsearchInput.java
new file mode 100644
index 0000000000..33c9567b53
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/ElasticsearchInput.java
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.demo.flink.elasticsearch;
+
+import org.apache.commons.collections.map.CaseInsensitiveMap;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.RichInputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.search.ClearScrollRequest;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.Scroll;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * custom elasticsearch source
+ */
+public class ElasticsearchInput extends RichInputFormat<Row, InputSplit> implements ResultTypeQueryable<Row> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchInput.class);
+
+    private final List<HttpHost> httpHosts;
+    private final RestClientFactory restClientFactory;
+    private final String index;
+
+    private transient RestHighLevelClient client;
+
+    private String scrollId;
+    private SearchRequest searchRequest;
+
+    private RowTypeInfo rowTypeInfo;
+
+    private boolean hasNext;
+
+    private Iterator<Map<String, Object>> iterator;
+    private Map<String, Integer> position;
+
+    public ElasticsearchInput(List<HttpHost> httpHosts,
+                              RestClientFactory restClientFactory, String index) {
+        this.httpHosts = httpHosts;
+        this.restClientFactory = restClientFactory;
+        this.index = index;
+    }
+
+    @Override
+    public void configure(Configuration parameters) {
+
+    }
+
+    @Override
+    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
+        return cachedStatistics;
+    }
+
+    @Override
+    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
+        return new GenericInputSplit[]{new GenericInputSplit(0, 1)};
+    }
+
+    @Override
+    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
+        return new DefaultInputSplitAssigner(inputSplits);
+    }
+
+    @Override
+    public void open(InputSplit split) throws IOException {
+        search();
+    }
+
+    // get data from es
+    protected void search() throws IOException {
+        SearchResponse searchResponse;
+        if (scrollId == null) {
+            searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);
+            scrollId = searchResponse.getScrollId();
+        } else {
+            searchResponse = client.searchScroll(new SearchScrollRequest(scrollId), RequestOptions.DEFAULT);
+        }
+
+        if (searchResponse == null || searchResponse.getHits().getTotalHits().value < 1) {
+            hasNext = false;
+            return;
+        }
+
+        hasNext = true;
+        iterator = Arrays.stream(searchResponse.getHits().getHits())
+                .map(t -> t.getSourceAsMap())
+                .collect(Collectors.toList()).iterator();
+    }
+
+    @Override
+    public void openInputFormat() throws IOException {
+        RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[httpHosts.size()]));
+        restClientFactory.configureRestClientBuilder(builder);
+        client = new RestHighLevelClient(builder);
+
+        position = new CaseInsensitiveMap();
+        int i = 0;
+        for (String name : rowTypeInfo.getFieldNames()) {
+            position.put(name, i++);
+        }
+
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
+        // The test data set is small, so a small scroll batch size is used here
+        searchSourceBuilder.size(50);
+
+        searchSourceBuilder.fetchSource(rowTypeInfo.getFieldNames(), null);
+
+        // Get data using the scroll api
+        Scroll scroll = new Scroll(TimeValue.timeValueMinutes(3L));
+
+        searchRequest = new SearchRequest();
+        searchRequest.indices(index);
+        searchRequest.scroll(scroll);
+        searchRequest.source(searchSourceBuilder);
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+        return !hasNext;
+    }
+
+    @Override
+    public Row nextRecord(Row reuse) throws IOException {
+        if (!hasNext) return null;
+
+        if (!iterator.hasNext()) {
+            this.search();
+            if (!hasNext || !iterator.hasNext()) {
+                hasNext = false;
+                return null;
+            }
+        }
+
+        for (Map.Entry<String, Object> entry : iterator.next().entrySet()) {
+            Integer p = position.get(entry.getKey());
+            if (p == null) throw new IOException("unknown field " + entry.getKey());
+
+            reuse.setField(p, entry.getValue());
+        }
+
+        return reuse;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (client == null)
+            return;
+
+        iterator = null;
+        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
+        clearScrollRequest.addScrollId(scrollId);
+        try {
+            client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT).isSucceeded();
+            client.close();
+            client = null;
+        } catch (Exception exception) {
+            if (!(exception.getMessage()).contains("Unable to parse response body")) {
+                throw exception;
+            }
+        }
+    }
+
+    @Override
+    public TypeInformation<Row> getProducedType() {
+        return rowTypeInfo;
+    }
+
+    public static Builder builder(List<HttpHost> httpHosts, String index) {
+        return new Builder(httpHosts, index);
+    }
+
+    @PublicEvolving
+    public static class Builder {
+        private final List<HttpHost> httpHosts;
+        private String index;
+        private RowTypeInfo rowTypeInfo;
+        private RestClientFactory restClientFactory = restClientBuilder -> {
+        };
+
+        public Builder(List<HttpHost> httpHosts, String index) {
+            this.httpHosts = Preconditions.checkNotNull(httpHosts);
+            this.index = index;
+        }
+
+        public Builder setRowTypeInfo(RowTypeInfo rowTypeInfo) {
+            this.rowTypeInfo = rowTypeInfo;
+            return this;
+        }
+
+        public ElasticsearchInput build() {
+            Preconditions.checkNotNull(this.rowTypeInfo);
+            ElasticsearchInput input = new ElasticsearchInput(httpHosts, restClientFactory, index);
+            input.rowTypeInfo = this.rowTypeInfo;
+            return input;
+        }
+    }
+}
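The InputFormat above wraps the standard Elasticsearch scroll loop: the initial
search() opens the scroll, searchScroll() pages through it in nextRecord(), and
clearScroll() releases it in close(). Distilled into a standalone sketch against
the 7.x RestHighLevelClient (index name, page size, and scroll timeout mirror the
values used above; client construction and error handling are omitted):

    SearchRequest request = new SearchRequest("commodity");
    request.scroll(new Scroll(TimeValue.timeValueMinutes(3L)));
    request.source(new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())
            .size(50));

    // Open the scroll with an initial search, then page until exhausted.
    SearchResponse response = client.search(request, RequestOptions.DEFAULT);
    String scrollId = response.getScrollId();
    while (response.getHits().getHits().length > 0) {
        // consume response.getHits() ...
        response = client.searchScroll(new SearchScrollRequest(scrollId), RequestOptions.DEFAULT);
        scrollId = response.getScrollId();
    }

    // Release server-side scroll resources when done.
    ClearScrollRequest clear = new ClearScrollRequest();
    clear.addScrollId(scrollId);
    client.clearScroll(clear, RequestOptions.DEFAULT);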
diff --git a/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/FlinkReadEs2Doris.java b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/FlinkReadEs2Doris.java
new file mode 100644
index 0000000000..4d47fd9b19
--- /dev/null
+++ b/samples/doris-demo/flink-demo/src/main/java/org/apache/doris/demo/flink/elasticsearch/FlinkReadEs2Doris.java
@@ -0,0 +1,100 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.demo.flink.elasticsearch;
+
+import com.alibaba.fastjson.JSONObject;
+import org.apache.doris.flink.cfg.DorisExecutionOptions;
+import org.apache.doris.flink.cfg.DorisOptions;
+import org.apache.doris.flink.cfg.DorisReadOptions;
+import org.apache.doris.flink.cfg.DorisSink;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.calcite.shaded.com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.http.HttpHost;
+
+import java.util.Properties;
+
+/**
+ * Flink reads Elasticsearch data and synchronizes it to Doris (flink-doris-connector method)
+ */
+public class FlinkReadEs2Doris {
+
+    private static final String host = "127.0.0.1";
+    private static final String index = "commodity";
+
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        RowTypeInfo rowTypeInfo = new RowTypeInfo(
+                new TypeInformation[]{Types.INT, Types.STRING, Types.STRING, Types.STRING, Types.DOUBLE},
+                new String[]{"commodity_id", "commodity_name", "create_time", "picture_url", "price"});
+
+        ElasticsearchInput es = ElasticsearchInput.builder(
+                Lists.newArrayList(new HttpHost(host, 9200)),
+                index)
+                .setRowTypeInfo(rowTypeInfo)
+                .build();
+
+        DataStreamSource<Row> source = env.createInput(es);
+
+        SingleOutputStreamOperator<String> map = source.map(new MapFunction<Row, String>() {
+            @Override
+            public String map(Row row) throws Exception {
+                JSONObject jsonObject = new JSONObject();
+                jsonObject.put("commodity_id", row.getField(0));
+                jsonObject.put("commodity_name", row.getField(1));
+                jsonObject.put("create_time", row.getField(2));
+                jsonObject.put("picture_url", row.getField(3));
+                jsonObject.put("price", row.getField(4));
+                return jsonObject.toJSONString();
+            }
+        });
+
+        Properties pro = new Properties();
+        pro.setProperty("format", "json");
+        pro.setProperty("strip_outer_array", "true");
+
+        map.addSink(
+                DorisSink.sink(
+                        DorisReadOptions.builder().build(),
+                        DorisExecutionOptions.builder()
+                                .setMaxRetries(3)
+                                .setStreamLoadProp(pro).build(),
+                        DorisOptions.builder()
+                                .setFenodes("127.0.0.1:8030")
+                                .setTableIdentifier("test.test_es")
+                                .setUsername("root")
+                                .setPassword("").build()
+                ));
+
+        env.execute("test_es");
+    }
+}
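The map() step serializes each Row to a JSON object with fastjson, and the sink's
stream load properties (format=json, strip_outer_array=true) tell Doris to parse
each loaded batch as a JSON array of such objects. A self-contained sketch of the
record shape produced per row, with illustrative field values:

    import com.alibaba.fastjson.JSONObject;

    public class RowJsonSketch {
        public static void main(String[] args) {
            // Mirrors the fields written by FlinkReadEs2Doris; values are made up.
            JSONObject row = new JSONObject();
            row.put("commodity_id", 1);
            row.put("commodity_name", "apple");
            row.put("create_time", "2022-07-29 18:11:18");
            row.put("picture_url", "http://example.com/1.png");
            row.put("price", 3.5);
            // One element of the JSON array that stream load receives,
            // e.g. {"commodity_id":1,"commodity_name":"apple",...}
            System.out.println(row.toJSONString());
        }
    }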
pro.setProperty("format", "json"); + pro.setProperty("strip_outer_array", "true"); + + map.addSink( + DorisSink.sink( + DorisReadOptions.builder().build(), + DorisExecutionOptions.builder() + .setMaxRetries(3) + .setStreamLoadProp(pro).build(), + DorisOptions.builder() + .setFenodes("127.0.0.1:8030") + .setTableIdentifier("test.test_es") + .setUsername("root") + .setPassword("").build() + )); + + env.execute("test_es"); + } + + + + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org