This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/incubator-doris-flink-connector.git

commit f6a2e258dcb31e12fd054b10b4bf4116c0004072
Author: Zhengguo Yang <yangz...@gmail.com>
AuthorDate: Fri Oct 15 13:03:04 2021 +0800

    [Dependency] Upgrade thirdparty libs (#6766)
    
    Upgrade the following dependecies:
    
    libevent -> 2.1.12
    OpenSSL 1.0.2k -> 1.1.1l
    thrift 0.9.3 -> 0.13.0
    protobuf 3.5.1 -> 3.14.0
    gflags 2.2.0 -> 2.2.2
    glog 0.3.3 -> 0.4.0
    googletest 1.8.0 -> 1.10.0
    snappy 1.1.7 -> 1.1.8
    gperftools 2.7 -> 2.9.1
    lz4 1.7.5 -> 1.9.3
    curl 7.54.1 -> 7.79.0
    re2 2017-05-01 -> 2021-02-02
    zstd 1.3.7 -> 1.5.0
    brotli 1.0.7 -> 1.0.9
    flatbuffers 1.10.0 -> 2.0.0
    apache-arrow 0.15.1 -> 5.0.0
    CRoaring 0.2.60 -> 0.3.4
    orc 1.5.8 -> 1.6.6
    libdivide 4.0.0 -> 5.0
    brpc 0.97 -> 1.0.0-rc02
    librdkafka 1.7.0 -> 1.8.0
    
    after this pr compile doris should use build-env:1.4.0
---
 pom.xml                                            | 11 +++++++--
 .../apache/doris/flink/backend/BackendClient.java  | 28 +++++++++++-----------
 .../doris/flink/datastream/ScalaValueReader.scala  | 20 ++++++++--------
 .../doris/flink/serialization/TestRowBatch.java    |  6 ++---
 4 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/pom.xml b/pom.xml
index a0d10f4..ffe6784 100644
--- a/pom.xml
+++ b/pom.xml
@@ -11,8 +11,8 @@
     <properties>
         <scala.version>2.12</scala.version>
         <flink.version>1.11.2</flink.version>
-        <libthrift.version>0.9.3</libthrift.version>
-        <arrow.version>0.15.1</arrow.version>
+        <libthrift.version>0.13.0</libthrift.version>
+        <arrow.version>5.0.0</arrow.version>
         <maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
         <maven-javadoc-plugin.version>3.3.0</maven-javadoc-plugin.version>
         <maven-source-plugin.version>3.2.1</maven-source-plugin.version>
@@ -140,6 +140,12 @@
             <version>${arrow.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-netty</artifactId>
+            <version>${arrow.version}</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <version>1.7.25</version>
@@ -196,6 +202,7 @@
                 <version>0.1.11</version>
                 <configuration>
                     
<thriftExecutable>${doris.thirdparty}/installed/bin/thrift</thriftExecutable>
+                    <generator>java:fullcamel</generator>
                 </configuration>
                 <executions>
                     <execution>
diff --git a/src/main/java/org/apache/doris/flink/backend/BackendClient.java 
b/src/main/java/org/apache/doris/flink/backend/BackendClient.java
index 40bb5c9..9b8d955 100644
--- a/src/main/java/org/apache/doris/flink/backend/BackendClient.java
+++ b/src/main/java/org/apache/doris/flink/backend/BackendClient.java
@@ -126,14 +126,14 @@ public class BackendClient {
         for (int attempt = 0; attempt < retries; ++attempt) {
             logger.debug("Attempt {} to openScanner {}.", attempt, routing);
             try {
-                TScanOpenResult result = client.open_scanner(openParams);
+                TScanOpenResult result = client.openScanner(openParams);
                 if (result == null) {
                     logger.warn("Open scanner result from {} is null.", 
routing);
                     continue;
                 }
-                if 
(!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
+                if 
(!TStatusCode.OK.equals(result.getStatus().getStatusCode())) {
                     logger.warn("The status of open scanner result from {} is 
'{}', error message is: {}.",
-                            routing, result.getStatus().getStatus_code(), 
result.getStatus().getError_msgs());
+                            routing, result.getStatus().getStatusCode(), 
result.getStatus().getErrorMsgs());
                     continue;
                 }
                 return result;
@@ -163,14 +163,14 @@ public class BackendClient {
         for (int attempt = 0; attempt < retries; ++attempt) {
             logger.debug("Attempt {} to getNext {}.", attempt, routing);
             try {
-                result = client.get_next(nextBatchParams);
+                result = client.getNext(nextBatchParams);
                 if (result == null) {
                     logger.warn("GetNext result from {} is null.", routing);
                     continue;
                 }
-                if 
(!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
+                if 
(!TStatusCode.OK.equals(result.getStatus().getStatusCode())) {
                     logger.warn("The status of get next result from {} is 
'{}', error message is: {}.",
-                            routing, result.getStatus().getStatus_code(), 
result.getStatus().getError_msgs());
+                            routing, result.getStatus().getStatusCode(), 
result.getStatus().getErrorMsgs());
                     continue;
                 }
                 return result;
@@ -179,11 +179,11 @@ public class BackendClient {
                 ex = e;
             }
         }
-        if (result != null && (TStatusCode.OK != 
(result.getStatus().getStatus_code()))) {
-            logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, 
result.getStatus().getStatus_code(),
-                    result.getStatus().getError_msgs());
-            throw new DorisInternalException(routing.toString(), 
result.getStatus().getStatus_code(),
-                    result.getStatus().getError_msgs());
+        if (result != null && (TStatusCode.OK != 
(result.getStatus().getStatusCode()))) {
+            logger.error(ErrorMessages.DORIS_INTERNAL_FAIL_MESSAGE, routing, 
result.getStatus().getStatusCode(),
+                    result.getStatus().getErrorMsgs());
+            throw new DorisInternalException(routing.toString(), 
result.getStatus().getStatusCode(),
+                    result.getStatus().getErrorMsgs());
         }
         logger.error(ErrorMessages.CONNECT_FAILED_MESSAGE, routing);
         throw new ConnectedFailedException(routing.toString(), ex);
@@ -199,14 +199,14 @@ public class BackendClient {
         for (int attempt = 0; attempt < retries; ++attempt) {
             logger.debug("Attempt {} to closeScanner {}.", attempt, routing);
             try {
-                TScanCloseResult result = client.close_scanner(closeParams);
+                TScanCloseResult result = client.closeScanner(closeParams);
                 if (result == null) {
                     logger.warn("CloseScanner result from {} is null.", 
routing);
                     continue;
                 }
-                if 
(!TStatusCode.OK.equals(result.getStatus().getStatus_code())) {
+                if 
(!TStatusCode.OK.equals(result.getStatus().getStatusCode())) {
                     logger.warn("The status of get next result from {} is 
'{}', error message is: {}.",
-                            routing, result.getStatus().getStatus_code(), 
result.getStatus().getError_msgs());
+                            routing, result.getStatus().getStatusCode(), 
result.getStatus().getErrorMsgs());
                     continue;
                 }
                 break;
diff --git 
a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala 
b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
index e69a86f..093390d 100644
--- a/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
+++ b/src/main/scala/org/apache/doris/flink/datastream/ScalaValueReader.scala
@@ -102,9 +102,9 @@ class ScalaValueReader(partition: PartitionDefinition, 
options: DorisOptions, re
       DORIS_EXEC_MEM_LIMIT_DEFAULT
     }
 
-    params.setBatch_size(batchSize)
-    params.setQuery_timeout(queryDorisTimeout)
-    params.setMem_limit(execMemLimit)
+    params.setBatchSize(batchSize)
+    params.setQueryTimeout(queryDorisTimeout)
+    params.setMemLimit(execMemLimit)
     params.setUser(options.getUsername)
     params.setPasswd(options.getPassword)
 
@@ -112,25 +112,25 @@ class ScalaValueReader(partition: PartitionDefinition, 
options: DorisOptions, re
         s"cluster: ${params.getCluster}, " +
         s"database: ${params.getDatabase}, " +
         s"table: ${params.getTable}, " +
-        s"tabletId: ${params.getTablet_ids}, " +
+        s"tabletId: ${params.getTabletIds}, " +
         s"batch size: $batchSize, " +
         s"query timeout: $queryDorisTimeout, " +
         s"execution memory limit: $execMemLimit, " +
         s"user: ${params.getUser}, " +
-        s"query plan: ${params.opaqued_query_plan}")
+        s"query plan: ${params.getOpaquedQueryPlan}")
 
     params
   }
 
   protected val openResult: TScanOpenResult = client.openScanner(openParams)
-  protected val contextId: String = openResult.getContext_id
+  protected val contextId: String = openResult.getContextId
   protected val schema: Schema =
-    SchemaUtils.convertToSchema(openResult.getSelected_columns)
+    SchemaUtils.convertToSchema(openResult.getSelectedColumns)
 
   protected val asyncThread: Thread = new Thread {
     override def run {
       val nextBatchParams = new TScanNextBatchParams
-      nextBatchParams.setContext_id(contextId)
+      nextBatchParams.setContextId(contextId)
       while (!eos.get) {
         nextBatchParams.setOffset(offset)
         val nextResult = client.getNext(nextBatchParams)
@@ -189,7 +189,7 @@ class ScalaValueReader(partition: PartitionDefinition, 
options: DorisOptions, re
           rowBatch.close
         }
         val nextBatchParams = new TScanNextBatchParams
-        nextBatchParams.setContext_id(contextId)
+        nextBatchParams.setContextId(contextId)
         nextBatchParams.setOffset(offset)
         val nextResult = client.getNext(nextBatchParams)
         eos.set(nextResult.isEos)
@@ -216,7 +216,7 @@ class ScalaValueReader(partition: PartitionDefinition, 
options: DorisOptions, re
 
   def close(): Unit = {
     val closeParams = new TScanCloseParams
-    closeParams.context_id = contextId
+    closeParams.setContextId(contextId)
     client.closeScanner(closeParams)
   }
 
diff --git 
a/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java 
b/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index a9d2492..ac19066 100644
--- a/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -220,7 +220,7 @@ public class TestRowBatch {
         arrowStreamWriter.close();
 
         TStatus status = new TStatus();
-        status.setStatus_code(TStatusCode.OK);
+        status.setStatusCode(TStatusCode.OK);
         TScanBatchResult scanBatchResult = new TScanBatchResult();
         scanBatchResult.setStatus(status);
         scanBatchResult.setEos(false);
@@ -342,7 +342,7 @@ public class TestRowBatch {
         arrowStreamWriter.close();
 
         TStatus status = new TStatus();
-        status.setStatus_code(TStatusCode.OK);
+        status.setStatusCode(TStatusCode.OK);
         TScanBatchResult scanBatchResult = new TScanBatchResult();
         scanBatchResult.setStatus(status);
         scanBatchResult.setEos(false);
@@ -404,7 +404,7 @@ public class TestRowBatch {
         arrowStreamWriter.close();
 
         TStatus status = new TStatus();
-        status.setStatus_code(TStatusCode.OK);
+        status.setStatusCode(TStatusCode.OK);
         TScanBatchResult scanBatchResult = new TScanBatchResult();
         scanBatchResult.setStatus(status);
         scanBatchResult.setEos(false);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to