This is an automated email from the ASF dual-hosted git repository.

jiafengzheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 7171929 [Feature] Support Flink 1.15 (#49) 7171929 is described below commit 7171929fb4ab64d96e75ad3f58752e64a1a75f11 Author: wudi <676366...@qq.com> AuthorDate: Thu Jul 21 19:54:36 2022 +0800 [Feature] Support Flink 1.15 (#49) * flink 1.15 support --- .github/workflows/build-extension.yml | 4 +- flink-doris-connector/pom.xml | 134 ++---------- .../apache/doris/flink/backend/BackendClient.java | 9 +- .../doris/flink/cfg/DorisExecutionOptions.java | 2 +- .../flink/datastream/DorisSourceFunction.java | 14 +- .../flink/exception/ConnectedFailedException.java | 2 +- .../flink/exception/DorisInternalException.java | 2 +- .../exception/ShouldNeverHappenException.java | 2 +- .../org/apache/doris/flink/rest/RestService.java | 12 +- .../apache/doris/flink/serialization/RowBatch.java | 7 +- .../source/reader/DorisSourceSplitReader.java | 25 ++- .../flink/source/reader/DorisValueReader.java | 229 +++++++++++++++++++++ .../flink/source/split/DorisSplitRecords.java | 18 +- .../flink/table/DorisDynamicTableFactory.java | 2 +- .../doris/flink/table/DorisRowDataInputFormat.java | 33 +-- .../doris/flink/datastream/ScalaValueReader.scala | 222 -------------------- .../apache/doris/flink/DorisSourceSinkExample.java | 1 - .../doris/flink/serialization/TestRowBatch.java | 17 +- 18 files changed, 310 insertions(+), 425 deletions(-) diff --git a/.github/workflows/build-extension.yml b/.github/workflows/build-extension.yml index b2ad6af..bc0c47c 100644 --- a/.github/workflows/build-extension.yml +++ b/.github/workflows/build-extension.yml @@ -46,7 +46,7 @@ jobs: touch custom_env.sh echo 'export THRIFT_BIN=/usr/bin/thrift' >> custom_env.sh - - name: Build flink connector 1.14 + - name: Build flink connector 1.15 run: | - cd flink-doris-connector/ && /bin/bash build.sh --flink 1.14.3 --scala 2.12 + cd flink-doris-connector/ && /bin/bash build.sh --flink 1.15.0 diff --git a/flink-doris-connector/pom.xml b/flink-doris-connector/pom.xml index ddcd3de..27b6df1 100644 --- a/flink-doris-connector/pom.xml +++ b/flink-doris-connector/pom.xml @@ -26,7 +26,7 @@ under the License. <version>23</version> </parent> <groupId>org.apache.doris</groupId> - <artifactId>flink-doris-connector-${flink.minor.version}_${scala.version}</artifactId> + <artifactId>flink-doris-connector-${flink.minor.version}</artifactId> <version>1.0.0-SNAPSHOT</version> <name>Flink Doris Connector</name> <url>https://doris.apache.org/</url> @@ -62,7 +62,6 @@ under the License. </mailingList> </mailingLists> <properties> - <scala.version>${env.scala.version}</scala.version> <flink.version>${env.flink.version}</flink.version> <flink.minor.version>${env.flink.minor.version}</flink.minor.version> <libthrift.version>0.13.0</libthrift.version> @@ -111,16 +110,7 @@ under the License. <profile> <id>flink.version</id> <properties> - <env.flink.version>1.14.3</env.flink.version> - </properties> - <activation> - <activeByDefault>true</activeByDefault> - </activation> - </profile> - <profile> - <id>scala.version</id> - <properties> - <env.scala.version>2.12</env.scala.version> + <env.flink.version>1.15.0</env.flink.version> </properties> <activation> <activeByDefault>true</activeByDefault> @@ -147,26 +137,31 @@ under the License. 
<dependencies> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-java</artifactId> + <artifactId>flink-clients</artifactId> + <version>${flink.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-streaming-java_${scala.version}</artifactId> + <artifactId>flink-connector-base</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-clients_${scala.version}</artifactId> + <artifactId>flink-table-api-java-bridge</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> - <!-- flink table --> <dependency> <groupId>org.apache.flink</groupId> - <artifactId>flink-table-planner_${scala.version}</artifactId> + <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> @@ -235,6 +230,7 @@ under the License. <artifactId>netty-common</artifactId> <version>4.1.77.Final</version> </dependency> + <!-- jackson --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> @@ -250,35 +246,6 @@ under the License. <artifactId>jackson-databind</artifactId> <version>2.13.3</version> </dependency> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-web</artifactId> - <version>${log4j2.version}</version> - </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api --> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-api</artifactId> - <version>${log4j2.version}</version> - </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core --> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-core</artifactId> - <version>${log4j2.version}</version> - </dependency> - <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-slf4j-impl --> - <dependency> - <groupId>org.apache.logging.log4j</groupId> - <artifactId>log4j-slf4j-impl</artifactId> - <version>${log4j2.version}</version> - </dependency> - <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 --> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>1.7.9</version> - </dependency> <!--Test--> <dependency> <groupId>org.hamcrest</groupId> @@ -288,26 +255,14 @@ under the License. </dependency> <dependency> <groupId>org.mockito</groupId> - <artifactId>mockito-scala_${scala.version}</artifactId> - <version>1.4.7</version> - <exclusions> - <exclusion> - <artifactId>hamcrest-core</artifactId> - <groupId>org.hamcrest</groupId> - </exclusion> - </exclusions> + <artifactId>mockito-core</artifactId> + <version>2.27.0</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> - <exclusions> - <exclusion> - <artifactId>hamcrest-core</artifactId> - <groupId>org.hamcrest</groupId> - </exclusion> - </exclusions> <scope>test</scope> </dependency> </dependencies> @@ -344,32 +299,6 @@ under the License. 
</execution> </executions> </plugin> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - <version>3.2.1</version> - <executions> - <execution> - <id>scala-compile-first</id> - <phase>process-resources</phase> - <goals> - <goal>compile</goal> - </goals> - </execution> - <execution> - <id>scala-test-compile</id> - <phase>process-test-resources</phase> - <goals> - <goal>testCompile</goal> - </goals> - </execution> - </executions> - <configuration> - <args> - <arg>-feature</arg> - </args> - </configuration> - </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> @@ -417,39 +346,6 @@ under the License. </execution> </executions> </plugin> - <!-- - <plugin> - <groupId>org.jacoco</groupId> - <artifactId>jacoco-maven-plugin</artifactId> - <version>0.7.8</version> - <configuration> - <excludes> - <exclude>**/thrift/**</exclude> - </excludes> - </configuration> - <executions> - <execution> - <id>prepare-agent</id> - <goals> - <goal>prepare-agent</goal> - </goals> - </execution> - <execution> - <id>check</id> - <goals> - <goal>check</goal> - </goals> - </execution> - <execution> - <id>report</id> - <phase>test</phase> - <goals> - <goal>report</goal> - </goals> - </execution> - </executions> - </plugin> - --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java index 9b8d955..f55d148 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/backend/BackendClient.java @@ -20,7 +20,6 @@ package org.apache.doris.flink.backend; import org.apache.doris.flink.cfg.ConfigurationOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.ConnectedFailedException; -import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.DorisInternalException; import org.apache.doris.flink.serialization.Routing; import org.apache.doris.flink.util.ErrorMessages; @@ -57,7 +56,7 @@ public class BackendClient { private final int socketTimeout; private final int connectTimeout; - public BackendClient(Routing routing, DorisReadOptions readOptions) throws ConnectedFailedException { + public BackendClient(Routing routing, DorisReadOptions readOptions) { this.routing = routing; this.connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT : readOptions.getRequestConnectTimeoutMs(); this.socketTimeout = readOptions.getRequestReadTimeoutMs() == null ? 
ConfigurationOptions.DORIS_REQUEST_READ_TIMEOUT_MS_DEFAULT : readOptions.getRequestReadTimeoutMs(); @@ -67,7 +66,7 @@ public class BackendClient { open(); } - private void open() throws ConnectedFailedException { + private void open() { logger.debug("Open client to Doris BE '{}'.", routing); TException ex = null; for (int attempt = 0; !isConnected && attempt < retries; ++attempt) { @@ -117,7 +116,7 @@ public class BackendClient { * @return scan open result * @throws ConnectedFailedException throw if cannot connect to Doris BE */ - public TScanOpenResult openScanner(TScanOpenParams openParams) throws ConnectedFailedException { + public TScanOpenResult openScanner(TScanOpenParams openParams) { logger.debug("OpenScanner to '{}', parameter is '{}'.", routing, openParams); if (!isConnected) { open(); @@ -153,7 +152,7 @@ public class BackendClient { * @return scan batch result * @throws ConnectedFailedException throw if cannot connect to Doris BE */ - public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) throws DorisException { + public TScanBatchResult getNext(TScanNextBatchParams nextBatchParams) { logger.debug("GetNext to '{}', parameter is '{}'.", routing, nextBatchParams); if (!isConnected) { open(); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java index 2daf5e1..102a7ee 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/cfg/DorisExecutionOptions.java @@ -73,7 +73,7 @@ public class DorisExecutionOptions implements Serializable { public static DorisExecutionOptions defaults() { Properties properties = new Properties(); properties.setProperty("format", "json"); - properties.setProperty("strip_outer_array", "true"); + properties.setProperty("read_json_by_line", "true"); return new Builder().setStreamLoadProp(properties).build(); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java index 1957139..fc5fd14 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/datastream/DorisSourceFunction.java @@ -23,18 +23,18 @@ import org.apache.doris.flink.deserialization.DorisDeserializationSchema; import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.rest.PartitionDefinition; import org.apache.doris.flink.rest.RestService; +import org.apache.doris.flink.source.reader.DorisValueReader; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; -import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; - /** * DorisSource **/ @@ -48,7 +48,7 @@ public class DorisSourceFunction extends RichParallelSourceFunction<List<?>> imp private final DorisReadOptions readOptions; private transient volatile boolean isRunning; private List<PartitionDefinition> dorisPartitions; - private List<PartitionDefinition> taskDorisPartitions = 
Lists.newArrayList(); + private List<PartitionDefinition> taskDorisPartitions = new ArrayList<>(); public DorisSourceFunction(DorisStreamOptions streamOptions, DorisDeserializationSchema<List<?>> deserializer) { this.deserializer = deserializer; @@ -87,11 +87,13 @@ public class DorisSourceFunction extends RichParallelSourceFunction<List<?>> imp @Override public void run(SourceContext<List<?>> sourceContext) { for (PartitionDefinition partitions : taskDorisPartitions) { - try (ScalaValueReader scalaValueReader = new ScalaValueReader(partitions, options, readOptions)) { - while (isRunning && scalaValueReader.hasNext()) { - List<?> next = scalaValueReader.next(); + try (DorisValueReader valueReader = new DorisValueReader(partitions, options, readOptions)) { + while (isRunning && valueReader.hasNext()) { + List<?> next = valueReader.next(); sourceContext.collect(next); } + } catch (Exception e) { + logger.error("close reader resource failed,", e); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java index e25d1a5..6f755b7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ConnectedFailedException.java @@ -17,7 +17,7 @@ package org.apache.doris.flink.exception; -public class ConnectedFailedException extends DorisException { +public class ConnectedFailedException extends DorisRuntimeException { public ConnectedFailedException(String server, Throwable cause) { super("Connect to " + server + "failed.", cause); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java index eadd860..e6756a4 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/DorisInternalException.java @@ -21,7 +21,7 @@ import org.apache.doris.thrift.TStatusCode; import java.util.List; -public class DorisInternalException extends DorisException { +public class DorisInternalException extends DorisRuntimeException { public DorisInternalException(String server, TStatusCode statusCode, List<String> errorMsgs) { super("Doris server " + server + " internal failed, status code [" + statusCode + "] error message is " + errorMsgs); } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java index a26718d..81af673 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/exception/ShouldNeverHappenException.java @@ -17,5 +17,5 @@ package org.apache.doris.flink.exception; -public class ShouldNeverHappenException extends DorisException { +public class ShouldNeverHappenException extends DorisRuntimeException { } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index 734bfdb..ff03e01 100644 --- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -20,16 +20,15 @@ package org.apache.doris.flink.rest; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; - import org.apache.commons.io.IOUtils; -import org.apache.doris.flink.cfg.DorisOptions; -import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.exception.DorisRuntimeException; -import org.apache.doris.flink.exception.IllegalArgumentException; import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.ConfigurationOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.ConnectedFailedException; import org.apache.doris.flink.exception.DorisException; +import org.apache.doris.flink.exception.DorisRuntimeException; +import org.apache.doris.flink.exception.IllegalArgumentException; import org.apache.doris.flink.exception.ShouldNeverHappenException; import org.apache.doris.flink.rest.models.Backend; import org.apache.doris.flink.rest.models.BackendRow; @@ -37,14 +36,13 @@ import org.apache.doris.flink.rest.models.BackendV2; import org.apache.doris.flink.rest.models.QueryPlan; import org.apache.doris.flink.rest.models.Schema; import org.apache.doris.flink.rest.models.Tablet; -import org.apache.flink.calcite.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.http.HttpStatus; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.client.methods.HttpRequestBase; import org.apache.http.entity.StringEntity; - import org.slf4j.Logger; import java.io.BufferedReader; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index 4dd6732..e83300a 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -33,6 +33,7 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types; import org.apache.doris.flink.exception.DorisException; +import org.apache.doris.flink.exception.DorisRuntimeException; import org.apache.doris.flink.rest.models.Schema; import org.apache.doris.thrift.TScanBatchResult; import org.apache.flink.util.Preconditions; @@ -99,7 +100,7 @@ public class RowBatch { this.offsetInRowBatch = 0; } - public RowBatch readArrow() throws DorisException { + public RowBatch readArrow() { try { this.root = arrowStreamReader.getVectorSchemaRoot(); while (arrowStreamReader.loadNextBatch()) { @@ -124,7 +125,7 @@ public class RowBatch { return this; } catch (Exception e) { logger.error("Read Doris Data failed because: ", e); - throw new DorisException(e.getMessage()); + throw new DorisRuntimeException(e.getMessage()); } finally { close(); } @@ -304,7 +305,7 @@ public class RowBatch { } } - public List<Object> next() throws DorisException { + public List<Object> next() { if (!hasNext()) { String errMsg = "Get row offset:" + offsetInRowBatch + 
" larger than row size: " + readRowCount; logger.error(errMsg); diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java index aadf6fd..c5d33f7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisSourceSplitReader.java @@ -18,7 +18,6 @@ package org.apache.doris.flink.source.reader; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.datastream.ScalaValueReader; import org.apache.doris.flink.source.split.DorisSourceSplit; import org.apache.doris.flink.source.split.DorisSplitRecords; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; @@ -43,7 +42,7 @@ public class DorisSourceSplitReader private final Queue<DorisSourceSplit> splits; private final DorisOptions options; private final DorisReadOptions readOptions; - private ScalaValueReader scalaValueReader; + private DorisValueReader valueReader; private String currentSplitId; public DorisSourceSplitReader(DorisOptions options, DorisReadOptions readOptions) { @@ -56,14 +55,14 @@ public class DorisSourceSplitReader public RecordsWithSplitIds<List> fetch() throws IOException { checkSplitOrStartNext(); - if (!scalaValueReader.hasNext()) { + if (!valueReader.hasNext()) { return finishSplit(); } - return DorisSplitRecords.forRecords(currentSplitId, scalaValueReader); + return DorisSplitRecords.forRecords(currentSplitId, valueReader); } private void checkSplitOrStartNext() throws IOException { - if (scalaValueReader != null) { + if (valueReader != null) { return; } final DorisSourceSplit nextSplit = splits.poll(); @@ -71,13 +70,17 @@ public class DorisSourceSplitReader throw new IOException("Cannot fetch from another split - no split remaining"); } currentSplitId = nextSplit.splitId(); - scalaValueReader = new ScalaValueReader(nextSplit.getPartitionDefinition(), options, readOptions); + valueReader = new DorisValueReader(nextSplit.getPartitionDefinition(), options, readOptions); } private DorisSplitRecords finishSplit() { - if (scalaValueReader != null) { - scalaValueReader.close(); - scalaValueReader = null; + if (valueReader != null) { + try { + valueReader.close(); + } catch (Exception e) { + LOG.error("close resource reader failed,", e); + } + valueReader = null; } final DorisSplitRecords finishRecords = DorisSplitRecords.finishedSplit(currentSplitId); currentSplitId = null; @@ -96,8 +99,8 @@ public class DorisSourceSplitReader @Override public void close() throws Exception { - if (scalaValueReader != null) { - scalaValueReader.close(); + if (valueReader != null) { + valueReader.close(); } } } diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java new file mode 100644 index 0000000..173ea90 --- /dev/null +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/reader/DorisValueReader.java @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.source.reader; + +import org.apache.doris.flink.backend.BackendClient; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.exception.DorisRuntimeException; +import org.apache.doris.flink.exception.IllegalArgumentException; +import org.apache.doris.flink.exception.ShouldNeverHappenException; +import org.apache.doris.flink.rest.PartitionDefinition; +import org.apache.doris.flink.rest.SchemaUtils; +import org.apache.doris.flink.rest.models.Schema; +import org.apache.doris.flink.serialization.Routing; +import org.apache.doris.flink.serialization.RowBatch; +import org.apache.doris.thrift.TScanBatchResult; +import org.apache.doris.thrift.TScanCloseParams; +import org.apache.doris.thrift.TScanNextBatchParams; +import org.apache.doris.thrift.TScanOpenParams; +import org.apache.doris.thrift.TScanOpenResult; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_BATCH_SIZE_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DEFAULT_CLUSTER; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_EXEC_MEM_LIMIT_DEFAULT; +import static org.apache.doris.flink.cfg.ConfigurationOptions.DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT; +import static org.apache.doris.flink.util.ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE; + +public class DorisValueReader implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(DorisValueReader.class); + protected BackendClient client; + private PartitionDefinition partition; + private DorisOptions options; + private DorisReadOptions readOptions; + + protected int offset = 0; + protected AtomicBoolean eos = new AtomicBoolean(false); + protected RowBatch rowBatch; + + // flag indicate if support deserialize Arrow to RowBatch asynchronously + protected Boolean deserializeArrowToRowBatchAsync; + + protected BlockingQueue<RowBatch> rowBatchBlockingQueue; + private TScanOpenParams openParams; + protected String contextId; + protected Schema schema; + protected boolean asyncThreadStarted; + + public DorisValueReader(PartitionDefinition partition, DorisOptions options, DorisReadOptions readOptions) { + this.partition = partition; + this.options = options; + this.readOptions = readOptions; + this.client = backendClient(); + this.deserializeArrowToRowBatchAsync = readOptions.getDeserializeArrowAsync() == null ? 
DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT : readOptions.getDeserializeArrowAsync(); + + Integer blockingQueueSize = readOptions.getDeserializeQueueSize() == null ? DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT : readOptions.getDeserializeQueueSize(); + if (this.deserializeArrowToRowBatchAsync) { + this.rowBatchBlockingQueue = new ArrayBlockingQueue(blockingQueueSize); + } + init(); + } + + private void init() { + this.openParams = openParams(); + TScanOpenResult openResult = this.client.openScanner(this.openParams); + this.contextId = openResult.getContextId(); + this.schema = SchemaUtils.convertToSchema(openResult.getSelectedColumns()); + this.asyncThreadStarted = asyncThreadStarted(); + LOG.debug("Open scan result is, contextId: {}, schema: {}.", contextId, schema); + } + + private BackendClient backendClient() { + try { + return new BackendClient(new Routing(partition.getBeAddress()), readOptions); + } catch (IllegalArgumentException e) { + LOG.error("init backend:{} client failed,", partition.getBeAddress(), e); + throw new DorisRuntimeException(e); + } + } + + private TScanOpenParams openParams() { + TScanOpenParams params = new TScanOpenParams(); + params.cluster = DORIS_DEFAULT_CLUSTER; + params.database = partition.getDatabase(); + params.table = partition.getTable(); + + params.tablet_ids = Arrays.asList(partition.getTabletIds().toArray(new Long[]{})); + params.opaqued_query_plan = partition.getQueryPlan(); + // max row number of one read batch + Integer batchSize = readOptions.getRequestBatchSize() == null ? DORIS_BATCH_SIZE_DEFAULT : readOptions.getRequestBatchSize(); + Integer queryDorisTimeout = readOptions.getRequestQueryTimeoutS() == null ? DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT : readOptions.getRequestQueryTimeoutS(); + Long execMemLimit = readOptions.getExecMemLimit() == null ? DORIS_EXEC_MEM_LIMIT_DEFAULT : readOptions.getExecMemLimit(); + params.setBatchSize(batchSize); + params.setQueryTimeout(queryDorisTimeout); + params.setMemLimit(execMemLimit); + params.setUser(options.getUsername()); + params.setPasswd(options.getPassword()); + LOG.debug("Open scan params is,cluster:{},database:{},table:{},tabletId:{},batch size:{},query timeout:{},execution memory limit:{},user:{},query plan: {}", + params.getCluster(), params.getDatabase(), params.getTable(), params.getTabletIds(), params.getBatchSize(), params.getQueryTimeout(), params.getMemLimit(), params.getUser(), params.getOpaquedQueryPlan()); + return params; + } + + protected Thread asyncThread = new Thread(new Runnable() { + @Override + public void run() { + TScanNextBatchParams nextBatchParams = new TScanNextBatchParams(); + nextBatchParams.setContextId(contextId); + while (!eos.get()) { + nextBatchParams.setOffset(offset); + TScanBatchResult nextResult = client.getNext(nextBatchParams); + eos.set(nextResult.isEos()); + if (!eos.get()) { + RowBatch rowBatch = new RowBatch(nextResult, schema).readArrow(); + offset += rowBatch.getReadRowCount(); + rowBatch.close(); + try { + rowBatchBlockingQueue.put(rowBatch); + } catch (InterruptedException e) { + throw new DorisRuntimeException(e); + } + } + } + } + }); + + protected boolean asyncThreadStarted() { + boolean started = false; + if (deserializeArrowToRowBatchAsync) { + asyncThread.start(); + started = true; + } + return started; + } + + /** + * read data and cached in rowBatch. 
+ * + * @return true if hax next value + */ + public boolean hasNext() { + boolean hasNext = false; + if (deserializeArrowToRowBatchAsync && asyncThreadStarted) { + // support deserialize Arrow to RowBatch asynchronously + if (rowBatch == null || !rowBatch.hasNext()) { + while (!eos.get() || !rowBatchBlockingQueue.isEmpty()) { + if (!rowBatchBlockingQueue.isEmpty()) { + try { + rowBatch = rowBatchBlockingQueue.take(); + } catch (InterruptedException e) { + throw new DorisRuntimeException(e); + } + hasNext = true; + break; + } else { + // wait for rowBatch put in queue or eos change + try { + Thread.sleep(5); + } catch (InterruptedException e) { + } + } + } + } else { + hasNext = true; + } + } else { + // Arrow data was acquired synchronously during the iterative process + if (!eos.get() && (rowBatch == null || !rowBatch.hasNext())) { + if (rowBatch != null) { + offset += rowBatch.getReadRowCount(); + rowBatch.close(); + } + TScanNextBatchParams nextBatchParams = new TScanNextBatchParams(); + nextBatchParams.setContextId(contextId); + nextBatchParams.setOffset(offset); + TScanBatchResult nextResult = client.getNext(nextBatchParams); + eos.set(nextResult.isEos()); + if (!eos.get()) { + rowBatch = new RowBatch(nextResult, schema).readArrow(); + } + } + hasNext = !eos.get(); + } + return hasNext; + } + + /** + * get next value. + * + * @return next value + */ + public List next() { + if (!hasNext()) { + LOG.error(SHOULD_NOT_HAPPEN_MESSAGE); + throw new ShouldNeverHappenException(); + } + return rowBatch.next(); + } + + @Override + public void close() throws Exception { + TScanCloseParams closeParams = new TScanCloseParams(); + closeParams.setContextId(contextId); + client.closeScanner(closeParams); + } +} diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java index 6f02446..9d1bbd7 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/source/split/DorisSplitRecords.java @@ -16,7 +16,7 @@ // under the License. package org.apache.doris.flink.source.split; -import org.apache.doris.flink.datastream.ScalaValueReader; +import org.apache.doris.flink.source.reader.DorisValueReader; import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds; import javax.annotation.Nullable; @@ -26,25 +26,25 @@ import java.util.Set; /** * An implementation of {@link RecordsWithSplitIds}. 
- * This is essentially a slim wrapper around the {@link ScalaValueReader} that only adds + * This is essentially a slim wrapper around the {@link DorisValueReader} that only adds * information about the current split, or finished splits */ public class DorisSplitRecords implements RecordsWithSplitIds<List> { private final Set<String> finishedSplits; - private final ScalaValueReader scalaValueReader; + private final DorisValueReader valueReader; private String splitId; public DorisSplitRecords(String splitId, - ScalaValueReader scalaValueReader, + DorisValueReader valueReader, Set<String> finishedSplits) { this.splitId = splitId; - this.scalaValueReader = scalaValueReader; + this.valueReader = valueReader; this.finishedSplits = finishedSplits; } public static DorisSplitRecords forRecords( - final String splitId, final ScalaValueReader valueReader) { + final String splitId, final DorisValueReader valueReader) { return new DorisSplitRecords(splitId, valueReader, Collections.emptySet()); } @@ -58,7 +58,7 @@ public class DorisSplitRecords implements RecordsWithSplitIds<List> { // move the split one (from current value to null) final String nextSplit = this.splitId; this.splitId = null; - if (scalaValueReader == null || !scalaValueReader.hasNext()) { + if (valueReader == null || !valueReader.hasNext()) { return null; } return nextSplit; @@ -67,8 +67,8 @@ public class DorisSplitRecords implements RecordsWithSplitIds<List> { @Nullable @Override public List nextRecordFromSplit() { - if (scalaValueReader != null && scalaValueReader.hasNext()) { - List next = scalaValueReader.next(); + if (valueReader != null && valueReader.hasNext()) { + List next = valueReader.next(); return next; } return null; diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java index be00cff..fb44359 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisDynamicTableFactory.java @@ -147,7 +147,7 @@ public final class DorisDynamicTableFactory implements DynamicTableSourceFactory private static final ConfigOption<String> SINK_LABEL_PREFIX = ConfigOptions .key("sink.label-prefix") .stringType() - .noDefaultValue() + .defaultValue("") .withDescription("the unique label prefix."); private static final ConfigOption<Duration> SINK_BUFFER_FLUSH_INTERVAL = ConfigOptions .key("sink.batch.interval") diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java index 16cf2ee..7181ce6 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/table/DorisRowDataInputFormat.java @@ -18,9 +18,9 @@ package org.apache.doris.flink.table; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; -import org.apache.doris.flink.datastream.ScalaValueReader; import org.apache.doris.flink.deserialization.converter.DorisRowConverter; import org.apache.doris.flink.rest.PartitionDefinition; +import org.apache.doris.flink.source.reader.DorisValueReader; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.io.DefaultInputSplitAssigner; import 
org.apache.flink.api.common.io.InputFormat; @@ -30,17 +30,12 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.RowData; -import org.apache.flink.table.data.StringData; -import org.apache.flink.table.types.logical.DecimalType; -import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.math.BigDecimal; import java.sql.PreparedStatement; import java.util.ArrayList; import java.util.List; @@ -59,7 +54,7 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable private List<PartitionDefinition> dorisPartitions; private TypeInformation<RowData> rowDataTypeInfo; - private ScalaValueReader scalaValueReader; + private DorisValueReader valueReader; private transient boolean hasNext; private final DorisRowConverter rowConverter; @@ -105,8 +100,8 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable */ @Override public void open(DorisTableInputSplit inputSplit) throws IOException { - scalaValueReader = new ScalaValueReader(inputSplit.partition, options, readOptions); - hasNext = scalaValueReader.hasNext(); + valueReader = new DorisValueReader(inputSplit.partition, options, readOptions); + hasNext = valueReader.hasNext(); } /** @@ -147,27 +142,13 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable if (!hasNext) { return null; } - List next = (List) scalaValueReader.next(); + List next = valueReader.next(); RowData genericRowData = rowConverter.convertInternal(next); //update hasNext after we've read the record - hasNext = scalaValueReader.hasNext(); + hasNext = valueReader.hasNext(); return genericRowData; } - private Object deserialize(LogicalType type, Object val) { - switch (type.getTypeRoot()) { - case DECIMAL: - final DecimalType decimalType = ((DecimalType) type); - final int precision = decimalType.getPrecision(); - final int scala = decimalType.getScale(); - return DecimalData.fromBigDecimal((BigDecimal) val, precision, scala); - case VARCHAR: - return StringData.fromString((String) val); - default: - return val; - } - } - @Override public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { return cachedStatistics; @@ -249,7 +230,7 @@ public class DorisRowDataInputFormat extends RichInputFormat<RowData, DorisTable public DorisRowDataInputFormat build() { return new DorisRowDataInputFormat( - optionsBuilder.build(), partitions, readOptions, rowType + optionsBuilder.build(), partitions, readOptions, rowType ); } } diff --git a/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala b/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala deleted file mode 100644 index 06df2ef..0000000 --- a/flink-doris-connector/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala +++ /dev/null @@ -1,222 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.flink.datastream - -import java.util.concurrent._ -import java.util.concurrent.atomic.AtomicBoolean -import org.apache.doris.flink.backend.BackendClient -import org.apache.doris.flink.cfg.ConfigurationOptions._ -import org.apache.doris.flink.cfg.{DorisOptions, DorisReadOptions} -import org.apache.doris.flink.exception.ShouldNeverHappenException -import org.apache.doris.flink.rest.{PartitionDefinition, SchemaUtils} -import org.apache.doris.flink.rest.models.Schema -import org.apache.doris.flink.serialization.{Routing, RowBatch} -import org.apache.doris.flink.util.ErrorMessages -import org.apache.doris.flink.util.ErrorMessages._ -import org.apache.doris.thrift.{TScanCloseParams, TScanNextBatchParams, TScanOpenParams, TScanOpenResult} -import org.apache.log4j.Logger - -import scala.collection.JavaConversions._ -import scala.util.Try -import scala.util.control.Breaks - -/** - * read data from Doris BE to array. - * @param partition Doris RDD partition - * @param options request configuration - */ -class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, readOptions: DorisReadOptions) extends AutoCloseable { - protected val logger = Logger.getLogger(classOf[ScalaValueReader]) - - protected val client = new BackendClient(new Routing(partition.getBeAddress), readOptions) - protected var offset = 0 - protected var eos: AtomicBoolean = new AtomicBoolean(false) - protected var rowBatch: RowBatch = _ - // flag indicate if support deserialize Arrow to RowBatch asynchronously - protected var deserializeArrowToRowBatchAsync: java.lang.Boolean = Try { - if(readOptions.getDeserializeArrowAsync == null ) DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT else readOptions.getDeserializeArrowAsync - } getOrElse { - logger.warn(ErrorMessages.PARSE_BOOL_FAILED_MESSAGE, DORIS_DESERIALIZE_ARROW_ASYNC, readOptions.getDeserializeArrowAsync) - DORIS_DESERIALIZE_ARROW_ASYNC_DEFAULT - } - - protected var rowBatchBlockingQueue: BlockingQueue[RowBatch] = { - val blockingQueueSize = Try { - if(readOptions.getDeserializeQueueSize == null) DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT else readOptions.getDeserializeQueueSize - } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_DESERIALIZE_QUEUE_SIZE, readOptions.getDeserializeQueueSize) - DORIS_DESERIALIZE_QUEUE_SIZE_DEFAULT - } - - var queue: BlockingQueue[RowBatch] = null - if (deserializeArrowToRowBatchAsync) { - queue = new ArrayBlockingQueue(blockingQueueSize) - } - queue - } - - private val openParams: TScanOpenParams = { - val params = new TScanOpenParams - params.cluster = DORIS_DEFAULT_CLUSTER - params.database = partition.getDatabase - params.table = partition.getTable - - params.tablet_ids = partition.getTabletIds.toList - params.opaqued_query_plan = partition.getQueryPlan - - // max row number of one read batch - val batchSize = Try { - if(readOptions.getRequestBatchSize == null) DORIS_BATCH_SIZE_DEFAULT else 
readOptions.getRequestBatchSize; - } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_BATCH_SIZE, readOptions.getRequestBatchSize) - DORIS_BATCH_SIZE_DEFAULT - } - - val queryDorisTimeout = Try { - if(readOptions.getRequestQueryTimeoutS == null) DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT else readOptions.getRequestQueryTimeoutS - } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_REQUEST_QUERY_TIMEOUT_S, readOptions.getRequestQueryTimeoutS) - DORIS_REQUEST_QUERY_TIMEOUT_S_DEFAULT - } - - val execMemLimit = Try { - if(readOptions.getExecMemLimit == null) DORIS_EXEC_MEM_LIMIT_DEFAULT else readOptions.getExecMemLimit - } getOrElse { - logger.warn(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, DORIS_EXEC_MEM_LIMIT, readOptions.getExecMemLimit) - DORIS_EXEC_MEM_LIMIT_DEFAULT - } - - params.setBatchSize(batchSize) - params.setQueryTimeout(queryDorisTimeout) - params.setMemLimit(execMemLimit) - params.setUser(options.getUsername) - params.setPasswd(options.getPassword) - - logger.debug(s"Open scan params is, " + - s"cluster: ${params.getCluster}, " + - s"database: ${params.getDatabase}, " + - s"table: ${params.getTable}, " + - s"tabletId: ${params.getTabletIds}, " + - s"batch size: $batchSize, " + - s"query timeout: $queryDorisTimeout, " + - s"execution memory limit: $execMemLimit, " + - s"user: ${params.getUser}, " + - s"query plan: ${params.getOpaquedQueryPlan}") - - params - } - - protected val openResult: TScanOpenResult = client.openScanner(openParams) - protected val contextId: String = openResult.getContextId - protected val schema: Schema = - SchemaUtils.convertToSchema(openResult.getSelectedColumns) - - protected val asyncThread: Thread = new Thread { - override def run { - val nextBatchParams = new TScanNextBatchParams - nextBatchParams.setContextId(contextId) - while (!eos.get) { - nextBatchParams.setOffset(offset) - val nextResult = client.getNext(nextBatchParams) - eos.set(nextResult.isEos) - if (!eos.get) { - val rowBatch = new RowBatch(nextResult, schema).readArrow() - offset += rowBatch.getReadRowCount - rowBatch.close - rowBatchBlockingQueue.put(rowBatch) - } - } - } - } - - protected val asyncThreadStarted: Boolean = { - var started = false - if (deserializeArrowToRowBatchAsync) { - asyncThread.start - started = true - } - started - } - - logger.debug(s"Open scan result is, contextId: $contextId, schema: $schema.") - - /** - * read data and cached in rowBatch. 
- * @return true if hax next value - */ - def hasNext: Boolean = { - var hasNext = false - if (deserializeArrowToRowBatchAsync && asyncThreadStarted) { - // support deserialize Arrow to RowBatch asynchronously - if (rowBatch == null || !rowBatch.hasNext) { - val loop = new Breaks - loop.breakable { - while (!eos.get || !rowBatchBlockingQueue.isEmpty) { - if (!rowBatchBlockingQueue.isEmpty) { - rowBatch = rowBatchBlockingQueue.take - hasNext = true - loop.break - } else { - // wait for rowBatch put in queue or eos change - Thread.sleep(5) - } - } - } - } else { - hasNext = true - } - } else { - // Arrow data was acquired synchronously during the iterative process - if (!eos.get && (rowBatch == null || !rowBatch.hasNext)) { - if (rowBatch != null) { - offset += rowBatch.getReadRowCount - rowBatch.close - } - val nextBatchParams = new TScanNextBatchParams - nextBatchParams.setContextId(contextId) - nextBatchParams.setOffset(offset) - val nextResult = client.getNext(nextBatchParams) - eos.set(nextResult.isEos) - if (!eos.get) { - rowBatch = new RowBatch(nextResult, schema).readArrow() - } - } - hasNext = !eos.get - } - hasNext - } - - /** - * get next value. - * @return next value - */ - def next: java.util.List[_] = { - if (!hasNext) { - logger.error(SHOULD_NOT_HAPPEN_MESSAGE) - throw new ShouldNeverHappenException - } - rowBatch.next - } - - def close(): Unit = { - val closeParams = new TScanCloseParams - closeParams.setContextId(contextId) - client.closeScanner(closeParams) - } - -} diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java index 60524c8..dae75f0 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/DorisSourceSinkExample.java @@ -24,7 +24,6 @@ public class DorisSourceSinkExample { public static void main(String[] args) { EnvironmentSettings settings = EnvironmentSettings.newInstance() - .useBlinkPlanner() .inStreamingMode() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java index 261acbe..8e7368c 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java @@ -41,8 +41,6 @@ import org.apache.doris.flink.rest.models.Schema; import org.apache.doris.thrift.TScanBatchResult; import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; -import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList; -import org.apache.flink.calcite.shaded.com.google.common.collect.Lists; import org.apache.flink.table.data.DecimalData; import org.junit.Assert; import org.junit.Rule; @@ -55,6 +53,7 @@ import java.io.ByteArrayOutputStream; import java.math.BigDecimal; import java.time.LocalDate; import java.time.LocalDateTime; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.NoSuchElementException; @@ -70,7 +69,7 @@ public class TestRowBatch { @Test public void testRowBatch() throws Exception { // schema - ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); + List<Field> childrenBuilder = new ArrayList<>(); 
childrenBuilder.add(new Field("k0", FieldType.nullable(new ArrowType.Bool()), null)); childrenBuilder.add(new Field("k1", FieldType.nullable(new ArrowType.Int(8, true)), null)); childrenBuilder.add(new Field("k2", FieldType.nullable(new ArrowType.Int(16, true)), null)); @@ -84,7 +83,7 @@ public class TestRowBatch { childrenBuilder.add(new Field("k6", FieldType.nullable(new ArrowType.Utf8()), null)); VectorSchemaRoot root = VectorSchemaRoot.create( - new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null), + new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null), new RootAllocator(Integer.MAX_VALUE)); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter( @@ -241,7 +240,7 @@ public class TestRowBatch { RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow(); - List<Object> expectedRow1 = Lists.newArrayList( + List<Object> expectedRow1 = Arrays.asList( Boolean.TRUE, (byte) 1, (short) 1, @@ -310,11 +309,11 @@ public class TestRowBatch { byte[] binaryRow1 = {'d', 'e', 'f'}; byte[] binaryRow2 = {'g', 'h', 'i'}; - ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); + List <Field> childrenBuilder = new ArrayList<>(); childrenBuilder.add(new Field("k7", FieldType.nullable(new ArrowType.Binary()), null)); VectorSchemaRoot root = VectorSchemaRoot.create( - new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null), + new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null), new RootAllocator(Integer.MAX_VALUE)); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter( @@ -378,11 +377,11 @@ public class TestRowBatch { @Test public void testDecimalV2() throws Exception { - ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder(); + List<Field> childrenBuilder = new ArrayList<>(); childrenBuilder.add(new Field("k7", FieldType.nullable(new ArrowType.Decimal(27, 9)), null)); VectorSchemaRoot root = VectorSchemaRoot.create( - new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null), + new org.apache.arrow.vector.types.pojo.Schema(childrenBuilder, null), new RootAllocator(Integer.MAX_VALUE)); ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org