This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 0a7acdce95 [Improve][Connector-V2] Doris stream load use FE instead of
BE (#6235)
0a7acdce95 is described below
commit 0a7acdce95acd795a43c99e799cbced3dacbdf73
Author: Jia Fan <[email protected]>
AuthorDate: Fri Jan 19 11:07:19 2024 +0800
[Improve][Connector-V2] Doris stream load use FE instead of BE (#6235)
---
.../connectors/doris/rest/RestService.java | 177 +--------------------
.../connectors/doris/rest/models/BackendV2.java | 78 ---------
.../doris/sink/committer/DorisCommitter.java | 7 +-
.../doris/sink/writer/DorisSinkWriter.java | 46 +-----
.../doris/sink/writer/DorisStreamLoad.java | 12 +-
.../doris/source/serialization/Routing.java | 6 +-
.../connectors/doris/util/ErrorMessages.java | 6 +-
.../e2e/connector/doris/AbstractDorisIT.java | 2 -
.../seatunnel/e2e/connector/doris/DorisIT.java | 9 ++
.../resources/doris_source_and_sink_2pc_false.conf | 50 ++++++
10 files changed, 81 insertions(+), 312 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
index 8729862516..315f36cfa2 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java
@@ -22,9 +22,7 @@ import
org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.rest.models.BackendV2;
import org.apache.seatunnel.connectors.doris.rest.models.QueryPlan;
-import org.apache.seatunnel.connectors.doris.rest.models.Schema;
import org.apache.seatunnel.connectors.doris.rest.models.Tablet;
import org.apache.seatunnel.connectors.doris.util.ErrorMessages;
@@ -63,21 +61,13 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
@Slf4j
public class RestService implements Serializable {
public static final int REST_RESPONSE_STATUS_OK = 200;
- public static final int REST_RESPONSE_CODE_OK = 0;
- private static final String REST_RESPONSE_BE_ROWS_KEY = "rows";
private static final String API_PREFIX = "/api";
- private static final String SCHEMA = "_schema";
private static final String QUERY_PLAN = "_query_plan";
- private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
- @Deprecated private static final String BACKENDS =
"/rest/v1/system?path=//backends";
- private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
- private static final String FE_LOGIN = "/rest/v1/login";
- private static final String BASE_URL = "http://%s%s";
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static String send(DorisConfig dorisConfig, HttpRequestBase
request, Logger logger)
throws DorisConnectorException {
@@ -135,11 +125,10 @@ public class RestService implements Serializable {
request.getURI(),
response);
// Handle the problem of inconsistent data format returned by
http v1 and v2
- ObjectMapper mapper = new ObjectMapper();
- Map map = mapper.readValue(response, Map.class);
+ Map map = OBJECT_MAPPER.readValue(response, Map.class);
if (map.containsKey("code") && map.containsKey("msg")) {
Object data = map.get("data");
- return mapper.writeValueAsString(data);
+ return OBJECT_MAPPER.writeValueAsString(data);
} else {
return response;
}
@@ -170,7 +159,7 @@ public class RestService implements Serializable {
.getBytes(StandardCharsets.UTF_8));
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
InputStream content = ((HttpPost) request).getEntity().getContent();
- String res = IOUtils.toString(content);
+ String res = IOUtils.toString(content, StandardCharsets.UTF_8);
conn.setDoOutput(true);
conn.setDoInput(true);
PrintWriter out = new PrintWriter(conn.getOutputStream());
@@ -260,103 +249,6 @@ public class RestService implements Serializable {
return nodes.get(0).trim();
}
- @VisibleForTesting
- static List<String> allEndpoints(String feNodes, Logger logger) throws
DorisConnectorException {
- logger.trace("Parse fenodes '{}'.", feNodes);
- if (StringUtils.isEmpty(feNodes)) {
- String errMsg =
- String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE,
"fenodes", feNodes);
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
- }
- List<String> nodes =
-
Arrays.stream(feNodes.split(",")).map(String::trim).collect(Collectors.toList());
- Collections.shuffle(nodes);
- return nodes;
- }
-
- @VisibleForTesting
- public static String randomBackend(DorisConfig dorisConfig, Logger logger)
- throws DorisConnectorException {
- List<BackendV2.BackendRowV2> backends = getBackendsV2(dorisConfig,
logger);
- logger.trace("Parse beNodes '{}'.", backends);
- if (backends == null || backends.isEmpty()) {
- logger.error(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "beNodes",
backends);
- String errMsg =
- String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE,
"beNodes", backends);
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
- }
- Collections.shuffle(backends);
- BackendV2.BackendRowV2 backend = backends.get(0);
- return backend.getIp() + ":" + backend.getHttpPort();
- }
-
- public static String getBackend(DorisConfig dorisConfig, Logger logger)
- throws DorisConnectorException {
- try {
- return randomBackend(dorisConfig, logger);
- } catch (Exception e) {
- String errMsg = "Failed to get backend via " +
dorisConfig.getFrontends();
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- }
- }
-
- @VisibleForTesting
- public static List<BackendV2.BackendRowV2> getBackendsV2(DorisConfig
dorisConfig, Logger logger)
- throws DorisConnectorException {
- String feNodes = dorisConfig.getFrontends();
- List<String> feNodeList = allEndpoints(feNodes, logger);
- for (String feNode : feNodeList) {
- try {
- String beUrl = "http://" + feNode + BACKENDS_V2;
- HttpGet httpGet = new HttpGet(beUrl);
- String response = send(dorisConfig, httpGet, logger);
- logger.info("Backend Info:{}", response);
- return parseBackendV2(response, logger);
- } catch (DorisConnectorException e) {
- logger.info(
- "Doris FE node {} is unavailable: {}, Request the next
Doris FE node",
- feNode,
- e.getMessage());
- }
- }
- String errMsg = "No Doris FE is available, please check configuration";
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
- }
-
- static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger
logger)
- throws DorisConnectorException {
- ObjectMapper mapper = new ObjectMapper();
- BackendV2 backend;
- try {
- backend = mapper.readValue(response, BackendV2.class);
- } catch (JsonParseException e) {
- String errMsg = "Doris BE's response is not a json. res: " +
response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- } catch (JsonMappingException e) {
- String errMsg = "Doris BE's response cannot map to schema. res: "
+ response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- } catch (IOException e) {
- String errMsg = "Parse Doris BE's response to json failed. res: "
+ response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- }
-
- if (backend == null) {
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED,
- ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
- }
- List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
- logger.debug("Parsing schema result is '{}'.", backendRows);
- return backendRows;
- }
-
@VisibleForTesting
static String getUriStr(DorisConfig dorisConfig, Logger logger) throws
DorisConnectorException {
String tableIdentifier = dorisConfig.getDatabase() + "." +
dorisConfig.getTable();
@@ -371,64 +263,6 @@ public class RestService implements Serializable {
+ "/";
}
- public static Schema getSchema(DorisConfig dorisConfig, Logger logger)
- throws DorisConnectorException {
- logger.trace("Finding schema.");
- HttpGet httpGet = new HttpGet(getUriStr(dorisConfig, logger) + SCHEMA);
- String response = send(dorisConfig, httpGet, logger);
- logger.debug("Find schema response is '{}'.", response);
- return parseSchema(response, logger);
- }
-
- public static boolean isUniqueKeyType(DorisConfig dorisConfig, Logger
logger)
- throws DorisConnectorException {
- try {
- return UNIQUE_KEYS_TYPE.equals(getSchema(dorisConfig,
logger).getKeysType());
- } catch (Exception e) {
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, e);
- }
- }
-
- @VisibleForTesting
- public static Schema parseSchema(String response, Logger logger)
- throws DorisConnectorException {
- logger.trace("Parse response '{}' to schema.", response);
- ObjectMapper mapper = new ObjectMapper();
- Schema schema;
- try {
- schema = mapper.readValue(response, Schema.class);
- } catch (JsonParseException e) {
- String errMsg = "Doris FE's response is not a json. res: " +
response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- } catch (JsonMappingException e) {
- String errMsg = "Doris FE's response cannot map to schema. res: "
+ response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- } catch (IOException e) {
- String errMsg = "Parse Doris FE's response to json failed. res: "
+ response;
- logger.error(errMsg, e);
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg, e);
- }
-
- if (schema == null) {
- throw new DorisConnectorException(
- DorisConnectorErrorCode.REST_SERVICE_FAILED,
- ErrorMessages.SHOULD_NOT_HAPPEN_MESSAGE);
- }
-
- if (schema.getStatus() != REST_RESPONSE_STATUS_OK) {
- String errMsg = "Doris FE's response is not OK, status is " +
schema.getStatus();
- logger.error(errMsg);
- throw new
DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
- }
- logger.debug("Parsing schema result is '{}'.", schema);
- return schema;
- }
-
public static List<PartitionDefinition> findPartitions(
SeaTunnelRowType rowType, DorisConfig dorisConfig, Logger logger)
throws DorisConnectorException {
@@ -438,9 +272,6 @@ public class RestService implements Serializable {
if (rowType.getFieldNames().length != 0) {
readFields = String.join(",", rowType.getFieldNames());
}
- // String readFields =
- // StringUtils.isBlank(dorisConfig.getReadField()) ?
"*" :
- // dorisConfig.getReadField();
String sql =
"select "
+ readFields
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendV2.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendV2.java
deleted file mode 100644
index 47759e4bb0..0000000000
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/models/BackendV2.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.doris.rest.models;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.List;
-
-/** Be response model */
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class BackendV2 {
-
- @JsonProperty(value = "backends")
- private List<BackendRowV2> backends;
-
- public List<BackendRowV2> getBackends() {
- return backends;
- }
-
- public void setBackends(List<BackendRowV2> backends) {
- this.backends = backends;
- }
-
- public static class BackendRowV2 {
- @JsonProperty("ip")
- public String ip;
-
- @JsonProperty("http_port")
- public int httpPort;
-
- @JsonProperty("is_alive")
- public boolean isAlive;
-
- public String getIp() {
- return ip;
- }
-
- public void setIp(String ip) {
- this.ip = ip;
- }
-
- public int getHttpPort() {
- return httpPort;
- }
-
- public void setHttpPort(int httpPort) {
- this.httpPort = httpPort;
- }
-
- public boolean isAlive() {
- return isAlive;
- }
-
- public void setAlive(boolean alive) {
- isAlive = alive;
- }
-
- public String toBackendString() {
- return ip + ":" + httpPort;
- }
- }
-}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java
index 92d18520ab..63d89f9e57 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/committer/DorisCommitter.java
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.rest.RestService;
import org.apache.seatunnel.connectors.doris.sink.HttpPutBuilder;
import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
import org.apache.seatunnel.connectors.doris.util.HttpUtil;
@@ -94,14 +93,14 @@ public class DorisCommitter implements
SinkCommitter<DorisCommitInfo> {
response = httpClient.execute(putBuilder.build());
} catch (IOException e) {
log.error("commit transaction failed: ", e);
- hostPort = RestService.getBackend(dorisConfig, log);
+ hostPort = dorisConfig.getFrontends();
continue;
}
statusCode = response.getStatusLine().getStatusCode();
reasonPhrase = response.getStatusLine().getReasonPhrase();
if (statusCode != HTTP_TEMPORARY_REDIRECT) {
log.warn("commit failed with {}, reason {}", hostPort,
reasonPhrase);
- hostPort = RestService.getBackend(dorisConfig, log);
+ hostPort = dorisConfig.getFrontends();
} else {
break;
}
@@ -113,7 +112,7 @@ public class DorisCommitter implements
SinkCommitter<DorisCommitInfo> {
}
ObjectMapper mapper = new ObjectMapper();
- if (response != null && response.getEntity() != null) {
+ if (response.getEntity() != null) {
String loadResult = EntityUtils.toString(response.getEntity());
Map<String, String> res =
mapper.readValue(loadResult, new
TypeReference<HashMap<String, String>>() {});
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 40e0bc3a2f..8c945e0fed 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -25,8 +25,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.doris.config.DorisConfig;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
-import org.apache.seatunnel.connectors.doris.rest.RestService;
-import org.apache.seatunnel.connectors.doris.rest.models.BackendV2;
import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer;
import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
@@ -39,8 +37,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -58,7 +54,6 @@ public class DorisSinkWriter
implements SinkWriter<SeaTunnelRow, DorisCommitInfo, DorisSinkState>,
SupportMultiTableSinkWriter<Void> {
private static final int INITIAL_DELAY = 200;
- private static final int CONNECT_TIMEOUT = 1000;
private static final List<String> DORIS_SUCCESS_STATUS =
new ArrayList<>(Arrays.asList(LoadStatus.SUCCESS,
LoadStatus.PUBLISH_TIMEOUT));
private long lastCheckpointId;
@@ -73,15 +68,13 @@ public class DorisSinkWriter
private final transient ScheduledExecutorService scheduledExecutorService;
private transient Thread executorThread;
private transient volatile Exception loadException = null;
- private List<BackendV2.BackendRowV2> backends;
public DorisSinkWriter(
SinkWriter.Context context,
List<DorisSinkState> state,
CatalogTable catalogTable,
DorisConfig dorisConfig,
- String jobId)
- throws IOException {
+ String jobId) {
this.dorisConfig = dorisConfig;
this.catalogTable = catalogTable;
this.lastCheckpointId = !state.isEmpty() ?
state.get(0).getCheckpointId() : 0;
@@ -105,9 +98,8 @@ public class DorisSinkWriter
this.initializeLoad();
}
- private void initializeLoad() throws IOException {
- this.backends = RestService.getBackendsV2(dorisConfig, log);
- String backend = getAvailableBackend();
+ private void initializeLoad() {
+ String backend = dorisConfig.getFrontends();
try {
this.dorisStreamLoad =
new DorisStreamLoad(
@@ -178,15 +170,14 @@ public class DorisSinkWriter
}
@Override
- public List<DorisSinkState> snapshotState(long checkpointId) throws
IOException {
+ public List<DorisSinkState> snapshotState(long checkpointId) {
checkState(dorisStreamLoad != null);
startLoad(labelGenerator.generateLabel(checkpointId + 1));
this.lastCheckpointId = checkpointId;
return Collections.singletonList(new DorisSinkState(labelPrefix,
lastCheckpointId));
}
- private void startLoad(String label) throws IOException {
- this.dorisStreamLoad.setHostPort(getAvailableBackend());
+ private void startLoad(String label) {
this.dorisStreamLoad.startLoad(label);
this.loading = true;
}
@@ -250,33 +241,6 @@ public class DorisSinkWriter
}
}
- private String getAvailableBackend() {
- Collections.shuffle(backends);
- for (BackendV2.BackendRowV2 backend : backends) {
- String res = backend.toBackendString();
- if (tryHttpConnection(res)) {
- return res;
- }
- }
- String errMsg = "no available backend.";
- throw new
DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, errMsg);
- }
-
- public boolean tryHttpConnection(String backend) {
- try {
- backend = "http://" + backend;
- URL url = new URL(backend);
- HttpURLConnection co = (HttpURLConnection) url.openConnection();
- co.setConnectTimeout(CONNECT_TIMEOUT);
- co.connect();
- co.disconnect();
- return true;
- } catch (Exception ex) {
- log.warn("Failed to connect to backend:{}", backend, ex);
- return false;
- }
- }
-
private DorisSerializer createSerializer(
DorisConfig dorisConfig, SeaTunnelRowType seaTunnelRowType) {
return new SeaTunnelRowSerializer(
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index c2a8fc3bb5..bf2136091d 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -63,9 +63,8 @@ public class DorisStreamLoad implements Serializable {
private static final String LOAD_URL_PATTERN =
"http://%s/api/%s/%s/_stream_load";
private static final String ABORT_URL_PATTERN =
"http://%s/api/%s/_stream_load_2pc";
private static final String JOB_EXIST_FINISHED = "FINISHED";
-
- private String loadUrlStr;
- private String hostPort;
+ private final String loadUrlStr;
+ private final String hostPort;
private final String abortUrlStr;
private final String user;
private final String passwd;
@@ -123,11 +122,6 @@ public class DorisStreamLoad implements Serializable {
return hostPort;
}
- public void setHostPort(String hostPort) {
- this.hostPort = hostPort;
- this.loadUrlStr = String.format(LOAD_URL_PATTERN, hostPort, this.db,
this.table);
- }
-
public Future<CloseableHttpResponse> getPendingLoadFuture() {
return pendingLoadFuture;
}
@@ -232,7 +226,7 @@ public class DorisStreamLoad implements Serializable {
}
}
- public void startLoad(String label) throws IOException {
+ public void startLoad(String label) {
loadBatchFirstRecord = true;
recordCount = 0;
this.label = label;
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/Routing.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/Routing.java
index e48a17da7c..95917a5ec7 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/Routing.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/source/serialization/Routing.java
@@ -48,7 +48,11 @@ public class Routing {
try {
this.port = Integer.parseInt(hostPort[1]);
} catch (NumberFormatException e) {
- logger.error(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE, "Doris
BE's port", hostPort[1]);
+ logger.error(
+ String.format(
+ ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE,
+ "Doris BE's port",
+ hostPort[1]));
String errMsg =
String.format(ErrorMessages.PARSE_NUMBER_FAILED_MESSAGE,
"routing", routing);
throw new
DorisConnectorException(DorisConnectorErrorCode.ROUTING_FAILED, errMsg, e);
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ErrorMessages.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ErrorMessages.java
index 8de43d3031..52e1397ece 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ErrorMessages.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/ErrorMessages.java
@@ -19,12 +19,10 @@ package org.apache.seatunnel.connectors.doris.util;
public abstract class ErrorMessages {
public static final String PARSE_NUMBER_FAILED_MESSAGE =
- "Parse '{}' to number failed. Original string is '{}'.";
- public static final String PARSE_BOOL_FAILED_MESSAGE =
- "Parse '{}' to boolean failed. Original string is '{}'.";
+ "Parse '%s' to number failed. Original string is '%s'.";
public static final String CONNECT_FAILED_MESSAGE = "Connect to doris {}
failed.";
public static final String ILLEGAL_ARGUMENT_MESSAGE =
- "argument '{}' is illegal, value is '{}'.";
+ "argument '%s' is illegal, value is '%s'.";
public static final String SHOULD_NOT_HAPPEN_MESSAGE = "Should not come
here.";
public static final String DORIS_INTERNAL_FAIL_MESSAGE =
"Doris server '{}' internal failed, status is '{}', error message
is '{}'";
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
index 3c33c82d3b..f8d48e465b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
@@ -47,8 +47,6 @@ import static org.awaitility.Awaitility.given;
public abstract class AbstractDorisIT extends TestSuiteBase implements
TestResource {
protected GenericContainer<?> container;
-
- // use image adamlee489/doris:2.0.3 when running this test on mac
private static final String DOCKER_IMAGE = "bingquanzhao/doris:2.0.3";
protected static final String HOST = "doris_e2e";
protected static final int QUERY_PORT = 9030;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
index 1fe65d9ebc..fb414aacd9 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
@@ -107,7 +107,16 @@ public class DorisIT extends AbstractDorisIT {
batchInsertData();
Container.ExecResult execResult =
container.executeJob("/doris_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
+ checkSinkData();
+ batchInsertData();
+ Container.ExecResult execResult2 =
+ container.executeJob("/doris_source_and_sink_2pc_false.conf");
+ Assertions.assertEquals(0, execResult2.getExitCode());
+ checkSinkData();
+ }
+
+ private void checkSinkData() {
try {
assertHasData(sourceDB, TABLE);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_2pc_false.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_2pc_false.conf
new file mode 100644
index 0000000000..d8836ff2d5
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/resources/doris_source_and_sink_2pc_false.conf
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env{
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source{
+ Doris {
+ fenodes = "doris_e2e:8030"
+ username = root
+ password = ""
+ database = "e2e_source"
+ table = "doris_e2e_table"
+ doris.read.field =
"F_ID,F_INT,F_BIGINT,F_TINYINT,F_SMALLINT,F_DECIMAL,F_LARGEINT,F_BOOLEAN,F_DOUBLE,F_FLOAT,F_CHAR,F_VARCHAR_11,F_STRING,F_DATETIME_P,F_DATETIME,F_DATE"
+ doris.filter.query = "F_ID > 50"
+ }
+}
+
+transform {}
+
+sink{
+ Doris {
+ fenodes = "doris_e2e:8030"
+ username = root
+ password = ""
+ table.identifier = "e2e_sink.doris_e2e_table"
+ sink.enable-2pc = "false"
+ sink.label-prefix = "test_json"
+ doris.config = {
+ format="json"
+ read_json_by_line="true"
+ }
+ }
+ }
\ No newline at end of file