This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push: new 7269dd4 [improvement](connector) improve log message (#274) 7269dd4 is described below commit 7269dd493882b736c9af39afc0198e39278011bc Author: gnehil <adamlee...@gmail.com> AuthorDate: Fri Feb 28 14:41:24 2025 +0800 [improvement](connector) improve log message (#274) --- .../doris/spark/client/DorisBackendHttpClient.java | 7 +- .../doris/spark/client/DorisFrontendClient.java | 12 +-- .../client/write/AbstractStreamLoadProcessor.java | 40 +++++---- .../client/entity/StreamLoadResponseTest.java | 97 ++++++++++++++++++++++ 4 files changed, 133 insertions(+), 23 deletions(-) diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java index 9c6cd81..54b5659 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisBackendHttpClient.java @@ -20,6 +20,8 @@ package org.apache.doris.spark.client; import org.apache.doris.spark.client.entity.Backend; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.Serializable; @@ -28,6 +30,7 @@ import java.util.function.BiFunction; public class DorisBackendHttpClient implements Serializable { + private static final Logger log = LoggerFactory.getLogger(DorisBackendHttpClient.class); private final List<Backend> backends; private transient CloseableHttpClient httpClient; @@ -45,7 +48,7 @@ public class DorisBackendHttpClient implements Serializable { try { return reqFunc.apply(backend, httpClient); } catch (Exception e) { - // todo + log.warn("Failed to execute request on backend: {}:{}", backend.getHost(), backend.getHttpPort(), e); ex = e; } } @@ -57,7 +60,7 @@ public class DorisBackendHttpClient implements Serializable { try { httpClient.close(); } catch (IOException e) { - // todo + log.warn("Failed to close http client", e); } } } diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java index 88d9f0b..0a6669e 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/DorisFrontendClient.java @@ -123,7 +123,7 @@ public class DorisFrontendClient implements Serializable { return parseFrontends(columnNames, rows); }); } catch (Exception e) { - LOG.warn("fetch fe request on " + frontendNode + " failed, err: " + e.getMessage()); + LOG.warn("fetch fe request on {} failed, err: {}", frontendNode, e.getMessage()); ex = e; } } @@ -135,8 +135,10 @@ public class DorisFrontendClient implements Serializable { } return frontendList; } else { - int queryPort = config.contains(DorisOptions.DORIS_QUERY_PORT) ? config.getValue(DorisOptions.DORIS_QUERY_PORT) : -1; - int flightSqlPort = config.contains(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) ? config.getValue(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) : -1; + int queryPort = config.contains(DorisOptions.DORIS_QUERY_PORT) ? + config.getValue(DorisOptions.DORIS_QUERY_PORT) : -1; + int flightSqlPort = config.contains(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) ? + config.getValue(DorisOptions.DORIS_READ_FLIGHT_SQL_PORT) : -1; return Arrays.stream(frontendNodeArray) .map(node -> { String[] nodeParts = node.split(":"); @@ -159,7 +161,7 @@ public class DorisFrontendClient implements Serializable { try { return reqFunc.apply(frontEnd, httpClient); } catch (Exception e) { - LOG.warn("fe http request on " + frontEnd.hostHttpPortString() + " failed, err: " + e.getMessage()); + LOG.warn("fe http request on {} failed, err: {}", frontEnd.hostHttpPortString(), e.getMessage()); ex = e; } } @@ -181,7 +183,7 @@ public class DorisFrontendClient implements Serializable { try (Connection conn = DriverManager.getConnection("jdbc:mysql://" + frontEnd.getHost() + ":" + frontEnd.getQueryPort(), username, password)) { return function.apply(conn); } catch (SQLException e) { - LOG.warn("fe jdbc query on " + frontEnd.hostQueryPortString() + " failed, err: " + e.getMessage()); + LOG.warn("fe jdbc query on {} failed, err: {}", frontEnd.hostQueryPortString(), e.getMessage()); ex = e; } } diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java index 484653e..9222d81 100644 --- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java +++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/write/AbstractStreamLoadProcessor.java @@ -19,7 +19,6 @@ package org.apache.doris.spark.client.write; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.json.JsonMapper; -import org.apache.commons.lang3.ArrayUtils; import org.apache.doris.spark.client.DorisBackendHttpClient; import org.apache.doris.spark.client.DorisFrontendClient; import org.apache.doris.spark.client.entity.Backend; @@ -78,8 +77,6 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>, private final String database; private final String table; - private static final String[] STREAM_LOAD_SUCCESS_STATUS = {"Success", "Publish Timeout"}; - private final boolean autoRedirect; private final boolean isHttpsEnabled; @@ -110,12 +107,7 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>, private static final int arrowBufferSize = 1000; - private final static ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> { - Thread thread = new Thread(runnable); - thread.setName("stream-load-worker-" + new AtomicInteger().getAndIncrement()); - thread.setDaemon(true); - return thread; - }); + private transient ExecutorService executor; private Future<CloseableHttpResponse> requestFuture = null; @@ -136,9 +128,11 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>, this.isGzipCompressionEnabled = properties.containsKey("compress_type") && "gzip".equals(properties.get("compress_type")); if (properties.containsKey(GROUP_COMMIT)) { String message = ""; - if (!isTwoPhaseCommitEnabled) message = "1";// todo - if (properties.containsKey(PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS))) message = "2";// todo - if (!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase())) message = "3";// todo + if (!isTwoPhaseCommitEnabled) message = "group commit does not support two-phase commit"; + if (properties.containsKey(PARTIAL_COLUMNS) && "true".equalsIgnoreCase(properties.get(PARTIAL_COLUMNS))) + message = "group commit does not support partial column updates"; + if (!VALID_GROUP_MODE.contains(properties.get(GROUP_COMMIT).toLowerCase())) + message = "Unsupported group commit mode: " + properties.get(GROUP_COMMIT); if (!message.isEmpty()) throw new IllegalArgumentException(message); groupCommit = properties.get(GROUP_COMMIT).toLowerCase(); } @@ -176,12 +170,11 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>, String resEntity = EntityUtils.toString(new BufferedHttpEntity(res.getEntity())); logger.info("stream load response: {}", resEntity); StreamLoadResponse response = MAPPER.readValue(resEntity, StreamLoadResponse.class); - if (ArrayUtils.contains(STREAM_LOAD_SUCCESS_STATUS, response.getStatus())) { + if (response != null && response.isSuccess()) { createNewBatch = true; return isTwoPhaseCommitEnabled ? String.valueOf(response.getTxnId()) : null; } else { - throw new StreamLoadException("stream load execute failed, status: " + response.getStatus() - + ", msg: " + response.getMessage() + ", errUrl: " + response.getErrorURL()); + throw new StreamLoadException("stream load execute failed, response: " + resEntity); } } @@ -348,13 +341,16 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>, entity = new GzipCompressingEntity(entity); } httpPut.setEntity(entity); - return executor.submit(() -> client.execute(httpPut)); + return getExecutors().submit(() -> client.execute(httpPut)); } @Override public void close() throws IOException { createNewBatch = true; frontend.close(); + if (executor != null && !executor.isShutdown()) { + executor.shutdown(); + } } private List<Backend> getBackends() throws Exception { @@ -372,4 +368,16 @@ public abstract class AbstractStreamLoadProcessor<R> implements DorisWriter<R>, protected abstract R copy(R row); + private synchronized ExecutorService getExecutors() { + if (executor == null) { + executor = Executors.newSingleThreadExecutor(runnable -> { + Thread thread = new Thread(runnable); + thread.setName("stream-load-worker-" + new AtomicInteger().getAndIncrement()); + thread.setDaemon(true); + return thread; + }); + } + return executor; + } + } diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/entity/StreamLoadResponseTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/entity/StreamLoadResponseTest.java new file mode 100644 index 0000000..22bba6c --- /dev/null +++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/entity/StreamLoadResponseTest.java @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.spark.client.entity; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.json.JsonMapper; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; + +public class StreamLoadResponseTest { + + private final static ObjectMapper MAPPER = JsonMapper.builder().build(); + + @Test + public void testIsSuccess() throws JsonProcessingException { + + String entity1 = "{" + + "\"TxnId\":1," + + "\"Label\":\"xxx\"," + + "\"Status\":\"Success\"," + + "\"ExistingJobStatus\":\"\"," + + "\"Message\":\"OK\"," + + "\"NumberTotalRows\":1," + + "\"NumberLoadedRows\":1," + + "\"NumberFilteredRows\":1," + + "\"NumberUnselectedRows\":1," + + "\"LoadBytes\":1," + + "\"LoadTimeMs\":1," + + "\"BeginTxnTimeMs\":1," + + "\"StreamLoadPutTimeMs\":1" + + "}"; + StreamLoadResponse slr1 = MAPPER.readValue(entity1, StreamLoadResponse.class); + Assertions.assertTrue(slr1.isSuccess()); + + String entity2 = "{" + + "\"TxnId\":1," + + "\"Label\":\"label\"," + + "\"Status\":\"Publish Timeout\"," + + "\"ExistingJobStatus\":\"\"," + + "\"Message\":\"\"," + + "\"NumberTotalRows\":1," + + "\"NumberLoadedRows\":1," + + "\"NumberFilteredRows\":1," + + "\"NumberUnselectedRows\":1," + + "\"LoadBytes\":1," + + "\"LoadTimeMs\":1," + + "\"BeginTxnTimeMs\":1," + + "\"StreamLoadPutTimeMs\":1" + + "}"; + StreamLoadResponse slr2 = MAPPER.readValue(entity2, StreamLoadResponse.class); + Assertions.assertTrue(slr2.isSuccess()); + + String entity3 = "{" + + "\"TxnId\":1," + + "\"Label\":\"xxx\"," + + "\"Status\":\"Label Already Exists\"," + + "\"ExistingJobStatus\":\"\"," + + "\"Message\":\"\"," + + "\"NumberTotalRows\":1," + + "\"NumberLoadedRows\":1," + + "\"NumberFilteredRows\":1," + + "\"NumberUnselectedRows\":1," + + "\"LoadBytes\":1," + + "\"LoadTimeMs\":1," + + "\"BeginTxnTimeMs\":1," + + "\"StreamLoadPutTimeMs\":1" + + "}"; + StreamLoadResponse slr3 = MAPPER.readValue(entity3, StreamLoadResponse.class); + Assertions.assertFalse(slr3.isSuccess()); + + String entity4 = "{" + + "\"msg\":\"TStatus: errCode = 2, detailMessage = transaction [123] is already aborted, abort reason: User Abort\"," + + "\"status\":\"ANALYSIS_ERROR\"" + + "}"; + StreamLoadResponse slr4 = MAPPER.readValue(entity4, StreamLoadResponse.class); + Assertions.assertFalse(slr4.isSuccess()); + + } + + +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org