This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7269dd4  [improvement](connector) improve log message (#274)
7269dd4 is described below

commit 7269dd493882b736c9af39afc0198e39278011bc
Author: gnehil <adamlee...@gmail.com>
AuthorDate: Fri Feb 28 14:41:24 2025 +0800

    [improvement](connector) improve log message (#274)
---
 .../doris/spark/client/DorisBackendHttpClient.java |  7 +-
 .../doris/spark/client/DorisFrontendClient.java    | 12 +--
 .../client/write/AbstractStreamLoadProcessor.java  | 40 +++++----
 .../client/entity/StreamLoadResponseTest.java      | 97 ++++++++++++++++++++++
 4 files changed, 133 insertions(+), 23 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
index 9c6cd81..54b5659 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java
@@ -20,6 +20,8 @@ package org.apache.doris.spark.client;
 import org.apache.doris.spark.client.entity.Backend;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -28,6 +30,7 @@ import java.util.function.BiFunction;
 
 public class DorisBackendHttpClient implements Serializable {
 
+    private static final Logger log = LoggerFactory.getLogger(DorisBackendHttpClient.class);
     private final List<Backend> backends;
 
     private transient CloseableHttpClient httpClient;
@@ -45,7 +48,7 @@ public class DorisBackendHttpClient implements Serializable {
             try {
                 return reqFunc.apply(backend, httpClient);
             } catch (Exception e) {
-                // todo
+                log.warn("Failed to execute request on backend: {}:{}", backend.getHost(), backend.getHttpPort(), e);
                 ex = e;
             }
         }
@@ -57,7 +60,7 @@ public class DorisBackendHttpClient implements Serializable {
             try {
                 httpClient.close();
             } catch (IOException e) {
-                // todo
+                log.warn("Failed to close http client", e);
             }
         }
     }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
index 88d9f0b..0a6669e 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java
@@ -123,7 +123,7 @@ public class DorisFrontendClient implements Serializable {
                         return parseFrontends(columnNames, rows);
                     });
                 } catch (Exception e) {
-                    LOG.warn("fetch fe request on " + frontendNode + " failed, err: " + e.getMessage());
+                    LOG.warn("fetch fe request on {} failed, err: {}", frontendNode, e.getMessage());
                     ex = e;
                 }
             }
@@ -135,8 +135,10 @@ public class DorisFrontendClient implements Serializable {
             }
             return frontendList;
         } else {
-            int queryPort = config.contains(DorisOptions.DORIS_QUERY_PORT) ? config.getValue(DorisOptions.DORIS_QUERY_PORT) : -1;
-            int flightSqlPort = config.contains(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) ? config.getValue(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) : -1;
+            int queryPort = config.contains(DorisOptions.DORIS_QUERY_PORT) ?
+                    config.getValue(DorisOptions.DORIS_QUERY_PORT) : -1;
+            int flightSqlPort = config.contains(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) ?
+                    config.getValue(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) : -1;
             return Arrays.stream(frontendNodeArray)
                     .map(node -> {
                         String[] nodeParts = node.split(":");
@@ -159,7 +161,7 @@ public class DorisFrontendClient implements Serializable {
             try {
                 return reqFunc.apply(frontEnd, httpClient);
             } catch (Exception e) {
-                LOG.warn("fe http request on " + frontEnd.hostHttpPortString() + " failed, err: " + e.getMessage());
+                LOG.warn("fe http request on {} failed, err: {}", frontEnd.hostHttpPortString(), e.getMessage());
                 ex = e;
             }
         }
@@ -181,7 +183,7 @@ public class DorisFrontendClient implements Serializable {
             try (Connection conn = DriverManager.getConnection("jdbc:mysql://" + frontEnd.getHost() + ":" + frontEnd.getQueryPort(), username, password)) {
                 return function.apply(conn);
             } catch (SQLException e) {
-                LOG.warn("fe jdbc query on " + frontEnd.hostQueryPortString() + " failed, err: " + e.getMessage());
+                LOG.warn("fe jdbc query on {} failed, err: {}", frontEnd.hostQueryPortString(), e.getMessage());
                 ex = e;
             }
         }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
index 484653e..9222d81 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java
@@ -19,7 +19,6 @@ package org.apache.doris.spark.client.write;
 
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.json.JsonMapper;
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.doris.spark.client.DorisBackendHttpClient;
 import org.apache.doris.spark.client.DorisFrontendClient;
 import org.apache.doris.spark.client.entity.Backend;
@@ -78,8 +77,6 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
     private final String database;
     private final String table;
 
-    private static final String[] STREAM_LOAD_SUCCESS_STATUS = {"Success", "Publish Timeout"};
-
     private final boolean autoRedirect;
 
     private final boolean isHttpsEnabled;
@@ -110,12 +107,7 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
 
     private static final int arrowBufferSize = 1000;
 
-    private final static ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
-        Thread thread = new Thread(runnable);
-        thread.setName("stream-load-worker-" + new AtomicInteger().getAndIncrement());
-        thread.setDaemon(true);
-        return thread;
-    });
+    private transient ExecutorService executor;
 
     private Future<CloseableHttpResponse> requestFuture = null;
 
@@ -136,9 +128,11 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
         this.isGzipCompressionEnabled = properties.containsKey("compress_type") && "gzip".equals(properties.get("compress_type"));
         if (properties.containsKey(GROUP_COMMIT)) {
             String message = "";
-            if (!isTwoPhaseCommitEnabled) message = "1";// todo
-            if (properties.containsKey(PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS))) message = "2";// todo
-            if (!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase())) message = "3";// todo
+            if (!isTwoPhaseCommitEnabled) message = "group commit does not support two-phase commit";
+            if (properties.containsKey(PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS)))
+                message = "group commit does not support partial column updates";
+            if (!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase()))
+                message = "Unsupported group commit mode: " + properties.get(GROUP_COMMIT);
             if (!message.isEmpty()) throw new IllegalArgumentException(message);
             groupCommit = properties.get(GROUP_COMMIT).toLowerCase();
         }
@@ -176,12 +170,11 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
         String resEntity = EntityUtils.toString(new BufferedHttpEntity(res.getEntity()));
         logger.info("stream load response: {}", resEntity);
         StreamLoadResponse response = MAPPER.readValue(resEntity, StreamLoadResponse.class);
-        if (ArrayUtils.contains(STREAM_LOAD_SUCCESS_STATUS, response.getStatus())) {
+        if (response != null && response.isSuccess()) {
             createNewBatch = true;
             return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null;
         } else {
-            throw new StreamLoadException("stream load execute failed, status: " + response.getStatus()
-                    + ", msg: " + response.getMessage() + ", errUrl: " + response.getErrorURL());
+            throw new StreamLoadException("stream load execute failed, response: " + resEntity);
         }
     }
 
@@ -348,13 +341,16 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
             entity = new GzipCompressingEntity(entity);
         }
         httpPut.setEntity(entity);
-        return executor.submit(() -> client.execute(httpPut));
+        return getExecutors().submit(() -> client.execute(httpPut));
     }
 
     @Override
     public void close() throws IOException {
         createNewBatch = true;
         frontend.close();
+        if (executor != null && !executor.isShutdown()) {
+            executor.shutdown();
+        }
     }
 
     private List<Backend> getBackends() throws Exception {
@@ -372,4 +368,16 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>,
 
     protected abstract R copy(R row);
 
+    private synchronized ExecutorService getExecutors() {
+        if (executor == null) {
+            executor = Executors.newSingleThreadExecutor(runnable -> {
+                Thread thread = new Thread(runnable);
+                thread.setName("stream-load-worker-" + new AtomicInteger().getAndIncrement());
+                thread.setDaemon(true);
+                return thread;
+            });
+        }
+        return executor;
+    }
+
 }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/entity/StreamLoadResponseTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/entity/StreamLoadResponseTest.java
new file mode 100644
index 0000000..22bba6c
--- /dev/null
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/entity/StreamLoadResponseTest.java
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.client.entity;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+import org.junit.Test;
+import org.junit.jupiter.api.Assertions;
+
+public class StreamLoadResponseTest {
+
+    private final static ObjectMapper MAPPER = JsonMapper.builder().build();
+
+    @Test
+    public void testIsSuccess() throws JsonProcessingException {
+
+        String entity1 = "{" +
+                "\"TxnId\":1," +
+                "\"Label\":\"xxx\"," +
+                "\"Status\":\"Success\"," +
+                "\"ExistingJobStatus\":\"\"," +
+                "\"Message\":\"OK\"," +
+                "\"NumberTotalRows\":1," +
+                "\"NumberLoadedRows\":1," +
+                "\"NumberFilteredRows\":1," +
+                "\"NumberUnselectedRows\":1," +
+                "\"LoadBytes\":1," +
+                "\"LoadTimeMs\":1," +
+                "\"BeginTxnTimeMs\":1," +
+                "\"StreamLoadPutTimeMs\":1" +
+                "}";
+        StreamLoadResponse slr1 = MAPPER.readValue(entity1, StreamLoadResponse.class);
+        Assertions.assertTrue(slr1.isSuccess());
+
+        String entity2 = "{" +
+                "\"TxnId\":1," +
+                "\"Label\":\"label\"," +
+                "\"Status\":\"Publish Timeout\"," +
+                "\"ExistingJobStatus\":\"\"," +
+                "\"Message\":\"\"," +
+                "\"NumberTotalRows\":1," +
+                "\"NumberLoadedRows\":1," +
+                "\"NumberFilteredRows\":1," +
+                "\"NumberUnselectedRows\":1," +
+                "\"LoadBytes\":1," +
+                "\"LoadTimeMs\":1," +
+                "\"BeginTxnTimeMs\":1," +
+                "\"StreamLoadPutTimeMs\":1" +
+                "}";
+        StreamLoadResponse slr2 = MAPPER.readValue(entity2, StreamLoadResponse.class);
+        Assertions.assertTrue(slr2.isSuccess());
+
+        String entity3 = "{" +
+                "\"TxnId\":1," +
+                "\"Label\":\"xxx\"," +
+                "\"Status\":\"Label Already Exists\"," +
+                "\"ExistingJobStatus\":\"\"," +
+                "\"Message\":\"\"," +
+                "\"NumberTotalRows\":1," +
+                "\"NumberLoadedRows\":1," +
+                "\"NumberFilteredRows\":1," +
+                "\"NumberUnselectedRows\":1," +
+                "\"LoadBytes\":1," +
+                "\"LoadTimeMs\":1," +
+                "\"BeginTxnTimeMs\":1," +
+                "\"StreamLoadPutTimeMs\":1" +
+                "}";
+        StreamLoadResponse slr3 = MAPPER.readValue(entity3, StreamLoadResponse.class);
+        Assertions.assertFalse(slr3.isSuccess());
+
+        String entity4 = "{" +
+                "\"msg\":\"TStatus: errCode = 2, detailMessage = transaction [123] is already aborted, abort reason: User Abort\"," +
+                "\"status\":\"ANALYSIS_ERROR\"" +
+                "}";
+        StreamLoadResponse slr4 = MAPPER.readValue(entity4, StreamLoadResponse.class);
+        Assertions.assertFalse(slr4.isSuccess());
+
+    }
+
+
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to