This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git
commit f6a2e258dcb31e12fd054b10b4bf4116c0004072
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Fri Oct 15 13:03:04 2021 +0800

    [Dependency] Upgrade thirdparty libs (#6766)

    Upgrade the following dependencies:

    libevent -> 2.1.12
    OpenSSL 1.0.2k -> 1.1.1l
    thrift 0.9.3 -> 0.13.0
    protobuf 3.5.1 -> 3.14.0
    gflags 2.2.0 -> 2.2.2
    glog 0.3.3 -> 0.4.0
    googletest 1.8.0 -> 1.10.0
    snappy 1.1.7 -> 1.1.8
    gperftools 2.7 -> 2.9.1
    lz4 1.7.5 -> 1.9.3
    curl 7.54.1 -> 7.79.0
    re2 2017-05-01 -> 2021-02-02
    zstd 1.3.7 -> 1.5.0
    brotli 1.0.7 -> 1.0.9
    flatbuffers 1.10.0 -> 2.0.0
    apache-arrow 0.15.1 -> 5.0.0
    CRoaring 0.2.60 -> 0.3.4
    orc 1.5.8 -> 1.6.6
    libdivide 4.0.0 -> 5.0
    brpc 0.97 -> 1.0.0-rc02
    librdkafka 1.7.0 -> 1.8.0

    After this PR, Doris should be compiled with build-env:1.4.0.
---
 pom.xml                                            | 11 +++++++--
 .../apache/doris/flink/backend/BackendClient.java  | 28 +++++++++++-----------
 .../doris/flink/datastream/ScalaValueReader.scala  | 20 ++++++++--------
 .../doris/flink/serialization/TestRowBatch.java    |  6 ++---
 4 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/pom.xml b/pom.xml
index a0d10f4..ffe6784 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,8 +11,8 @@
     <properties>
         <scala.version>2.12</scala.version>
         <flink.version>1.11.2</flink.version>
-        <libthrift.version>0.9.3</libthrift.version>
-        <arrow.version>0.15.1</arrow.version>
+        <libthrift.version>0.13.0</libthrift.version>
+        <arrow.version>5.0.0</arrow.version>
         <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
         <maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
         <maven-source-plugin.version>3.2.1</maven-source-plugin.version>
@@ -140,6 +140,12 @@
             <version>${arrow.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-netty</artifactId>
+            <version>${arrow.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <version>1.7.25</version>
@@ -196,6 +202,7 @@
                 <version>0.1.11</version>
                 <configuration>
                     <thriftExecutable>${doris.thirdparty}/installed/bin/thrift</thriftExecutable>
+                    <generator>java:fullcamel</generator>
                 </configuration>
                 <executions>
                     <execution>
diff --git a/src/main/java/org/apache/doris/flink/backend/BackendClient.java b/src/main/java/org/apache/doris/flink/backend/BackendClient.java
index 40bb5c9..9b8d955 100644
--- a/src/main/java/org/apache/doris/flink/backend/BackendClient.java
+++ b/src/main/java/org/apache/doris/flink/backend/BackendClient.java
@@ -126,14 +126,14 @@ public class BackendClient {
         for (int attempt = 0; attempt < retries; ++attempt) {
             logger.debug("Attempt {} to openScanner {}.", attempt, routing);
             try {
-                TScanOpenResult result = client.open_scanner(openParams);
+                TScanOpenResult result = client.openScanner(openParams);
                 if (result == null) {
                     logger.warn("Open scanner result from {} is null.", routing);
                     continue;
                 }
-                if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
+                if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) {
                     logger.warn("The status of open scanner result from {} is '{}', error message is: {}.",
-                            routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs());
+                            routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs());
                     continue;
                 }
                 return result;
@@ -163,14 +163,14 @@ public class BackendClient {
         for (int attempt = 0; attempt < retries; ++attempt) {
             logger.debug("Attempt {} to getNext {}.", attempt, routing);
             try {
-                result = client.get_next(nextBatchParams);
+                result = client.getNext(nextBatchParams);
                 if (result == null) {
                     logger.warn("GetNext result from {} is null.", routing);
                     continue;
                 }
-                if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
+                if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) {
                     logger.warn("The status of get next result from {} is '{}', error message is: {}.",
-                            routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs());
+                            routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs());
                     continue;
                 }
                 return result;
@@ -179,11 +179,11 @@ public class BackendClient {
                 ex = e;
             }
         }
-        if (result != null && (TStatusCode.OK != (result.getStatus().getStatus_code()))) {
-            logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, result.getStatus().getStatus_code(),
-                    result.getStatus().getError_msgs());
-            throw new DorisInternalException(routing.toString(), result.getStatus().getStatus_code(),
-                    result.getStatus().getError_msgs());
+        if (result != null && (TStatusCode.OK != (result.getStatus().getStatusCode()))) {
+            logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, result.getStatus().getStatusCode(),
+                    result.getStatus().getErrorMsgs());
+            throw new DorisInternalException(routing.toString(), result.getStatus().getStatusCode(),
+                    result.getStatus().getErrorMsgs());
         }
         logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
         throw new ConnectedFailedException(routing.toString(), ex);
@@ -199,14 +199,14 @@ public class BackendClient {
         for (int attempt = 0; attempt < retries; ++attempt) {
             logger.debug("Attempt {} to closeScanner {}.", attempt, routing);
             try {
-                TScanCloseResult result = client.close_scanner(closeParams);
+                TScanCloseResult result = client.closeScanner(closeParams);
                 if (result == null) {
                     logger.warn("CloseScanner result from {} is null.", routing);
                     continue;
                 }
-                if (!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
+                if (!TStatusCode.OK.equals(result.getStatus().getStatusCode())) {
                     logger.warn("The status of get next result from {} is '{}', error message is: {}.",
-                            routing, result.getStatus().getStatus_code(), result.getStatus().getError_msgs());
+                            routing, result.getStatus().getStatusCode(), result.getStatus().getErrorMsgs());
                     continue;
                 }
                 break;
diff --git a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
index e69a86f..093390d 100644
--- a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
+++ b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
@@ -102,9 +102,9 @@ class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, re
       DORIS_EXEC_MEM_LIMIT_DEFAULT
     }
 
-    params.setBatch_size(batchSize)
-    params.setQuery_timeout(queryDorisTimeout)
-    params.setMem_limit(execMemLimit)
+    params.setBatchSize(batchSize)
+    params.setQueryTimeout(queryDorisTimeout)
+    params.setMemLimit(execMemLimit)
     params.setUser(options.getUsername)
     params.setPasswd(options.getPassword)
 
@@ -112,25 +112,25 @@ class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, re
       s"cluster: ${params.getCluster}, " +
       s"database: ${params.getDatabase}, " +
       s"table: ${params.getTable}, " +
-      s"tabletId: ${params.getTablet_ids}, " +
+      s"tabletId: ${params.getTabletIds}, " +
       s"batch size: $batchSize, " +
       s"query timeout: $queryDorisTimeout, " +
       s"execution memory limit: $execMemLimit, " +
       s"user: ${params.getUser}, " +
-      s"query plan: ${params.opaqued_query_plan}")
+      s"query plan: ${params.getOpaquedQueryPlan}")
 
     params
   }
 
   protected val openResult: TScanOpenResult = client.openScanner(openParams)
-  protected val contextId: String = openResult.getContext_id
+  protected val contextId: String = openResult.getContextId
   protected val schema: Schema =
-    SchemaUtils.convertToSchema(openResult.getSelected_columns)
+    SchemaUtils.convertToSchema(openResult.getSelectedColumns)
 
   protected val asyncThread: Thread = new Thread {
     override def run {
       val nextBatchParams = new TScanNextBatchParams
-      nextBatchParams.setContext_id(contextId)
+      nextBatchParams.setContextId(contextId)
       while (!eos.get) {
         nextBatchParams.setOffset(offset)
         val nextResult = client.getNext(nextBatchParams)
@@ -189,7 +189,7 @@ class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, re
         rowBatch.close
       }
       val nextBatchParams = new TScanNextBatchParams
-      nextBatchParams.setContext_id(contextId)
+      nextBatchParams.setContextId(contextId)
       nextBatchParams.setOffset(offset)
       val nextResult = client.getNext(nextBatchParams)
       eos.set(nextResult.isEos)
@@ -216,7 +216,7 @@ class ScalaValueReader(partition: PartitionDefinition, options: DorisOptions, re
 
   def close(): Unit = {
     val closeParams = new TScanCloseParams
-    closeParams.context_id = contextId
+    closeParams.setContextId(contextId)
     client.closeScanner(closeParams)
   }
 
diff --git a/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index a9d2492..ac19066 100644
--- a/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -220,7 +220,7 @@ public class TestRowBatch {
         arrowStreamWriter.close();
 
         TStatus status = new TStatus();
-        status.setStatus_code(TStatusCode.OK);
+        status.setStatusCode(TStatusCode.OK);
         TScanBatchResult scanBatchResult = new TScanBatchResult();
         scanBatchResult.setStatus(status);
         scanBatchResult.setEos(false);
@@ -342,7 +342,7 @@ public class TestRowBatch {
         arrowStreamWriter.close();
 
         TStatus status = new TStatus();
-        status.setStatus_code(TStatusCode.OK);
+        status.setStatusCode(TStatusCode.OK);
         TScanBatchResult scanBatchResult = new TScanBatchResult();
         scanBatchResult.setStatus(status);
         scanBatchResult.setEos(false);
@@ -404,7 +404,7 @@ public class TestRowBatch {
         arrowStreamWriter.close();
 
         TStatus status = new TStatus();
-        status.setStatus_code(TStatusCode.OK);
+        status.setStatusCode(TStatusCode.OK);
         TScanBatchResult scanBatchResult = new TScanBatchResult();
         scanBatchResult.setStatus(status);
         scanBatchResult.setEos(false);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org
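
For readers wondering why every Thrift call is renamed in this diff (open_scanner -> openScanner,
getStatus_code -> getStatusCode, and so on): regenerating the IDL with thrift 0.13.0 and the
java:fullcamel generator option added to the maven-thrift plugin produces camelCase accessors for
snake_case Thrift fields. A minimal, illustrative sketch of the resulting call pattern follows; the
import package org.apache.doris.thrift is an assumption, while the accessor names themselves mirror
the calls that appear in the diff above.

    // Sketch only: shows the java:fullcamel naming this commit switches to.
    import org.apache.doris.thrift.TStatus;      // package assumed, not confirmed by this commit
    import org.apache.doris.thrift.TStatusCode;

    public class FullCamelNamingSketch {
        public static void main(String[] args) {
            TStatus status = new TStatus();
            // Thrift field `status_code` now yields camelCase accessors:
            status.setStatusCode(TStatusCode.OK);        // previously setStatus_code(...)
            System.out.println(status.getStatusCode());  // previously getStatus_code()
        }
    }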
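Similarly, the new runtime-scoped arrow-memory-netty dependency in pom.xml reflects that Arrow 5.0.0
separates the allocator API (arrow-memory-core) from its concrete allocation manager, which must be
on the runtime classpath. A minimal sketch of the behavior this dependency enables, using only the
standard Arrow memory API; treat it as an illustration rather than connector code.

    import org.apache.arrow.memory.BufferAllocator;
    import org.apache.arrow.memory.RootAllocator;

    public class AllocatorSmokeSketch {
        public static void main(String[] args) throws Exception {
            // RootAllocator needs a concrete allocation manager at runtime
            // (supplied here by arrow-memory-netty); without one on the
            // classpath, constructing the allocator fails.
            try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
                System.out.println("allocated bytes: " + allocator.getAllocatedMemory());
            }
        }
    }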