This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git
The following commit(s) were added to refs/heads/master by this push: new 248db01 [improve] Add a check for column existence when adding a new column. (#20) 248db01 is described below commit 248db0198f760417a3c7cf319518808618218aa1 Author: Petrichor <31833513+vinle...@users.noreply.github.com> AuthorDate: Tue Jun 4 11:29:50 2024 +0800 [improve] Add a check for column existence when adding a new column. (#20) --- .../connector/converter/schema/SchemaChangeManager.java | 17 ++++++++++++++++- .../kafka/connector/service/DorisSystemService.java | 9 +++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java index 376edf9..3086adb 100644 --- a/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java +++ b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java @@ -19,6 +19,8 @@ package org.apache.doris.kafka.connector.converter.schema; +import static java.net.HttpURLConnection.HTTP_OK; + import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.io.Serializable; @@ -30,6 +32,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.doris.kafka.connector.cfg.DorisOptions; import org.apache.doris.kafka.connector.converter.RecordDescriptor; import org.apache.doris.kafka.connector.exception.SchemaChangeException; +import org.apache.doris.kafka.connector.service.DorisSystemService; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpPost; @@ -48,9 +51,11 @@ public class SchemaChangeManager implements Serializable { private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; private final ObjectMapper objectMapper = new ObjectMapper(); private final DorisOptions dorisOptions; + private DorisSystemService dorisSystemService; public SchemaChangeManager(DorisOptions dorisOptions) { this.dorisOptions = dorisOptions; + this.dorisSystemService = new DorisSystemService(dorisOptions); } private boolean handleSchemaChange(Map<String, Object> responseMap, String responseEntity) { @@ -64,6 +69,16 @@ public class SchemaChangeManager implements Serializable { public void addColumnDDL(String tableName, RecordDescriptor.FieldDescriptor field) { try { + // check the add column whether exist in table. + if (dorisSystemService.isColumnExist( + dorisOptions.getDatabase(), tableName, field.getName())) { + LOG.warn( + "The column {} already exists in table {}, no need to add it again", + field.getName(), + tableName); + return; + } + String addColumnDDL = buildAddColumnDDL(dorisOptions.getDatabase(), tableName, field); boolean status = execute(addColumnDDL, dorisOptions.getDatabase()); LOG.info( @@ -146,7 +161,7 @@ public class SchemaChangeManager implements Serializable { CloseableHttpResponse response = httpclient.execute(request); final int statusCode = response.getStatusLine().getStatusCode(); final String reasonPhrase = response.getStatusLine().getReasonPhrase(); - if (statusCode == 200 && response.getEntity() != null) { + if (statusCode == HTTP_OK && response.getEntity() != null) { responseEntity = EntityUtils.toString(response.getEntity()); return objectMapper.readValue(responseEntity, Map.class); } else { diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java index 2365aa4..b627daf 100644 --- a/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java +++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java @@ -35,6 +35,8 @@ import org.slf4j.LoggerFactory; public class DorisSystemService { private static final Logger LOG = LoggerFactory.getLogger(DorisSystemService.class); + private static final String GET_COLUMN_EXISTS_TEMPLATE = + "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?"; private final JdbcConnectionProvider jdbcConnectionProvider; public DorisSystemService(DorisOptions dorisOptions) { @@ -70,6 +72,13 @@ public class DorisSystemService { databaseName); } + public boolean isColumnExist(String database, String tableName, String columnName) { + List<String> columnList = + extractColumnValuesBySQL( + GET_COLUMN_EXISTS_TEMPLATE, 1, null, database, tableName, columnName); + return !columnList.isEmpty(); + } + public List<String> extractColumnValuesBySQL( String sql, int columnIndex, Predicate<String> filterFunc, Object... params) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org