This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 248db01  [improve] Add a check for column existence when adding a new 
column. (#20)
248db01 is described below

commit 248db0198f760417a3c7cf319518808618218aa1
Author: Petrichor <31833513+vinle...@users.noreply.github.com>
AuthorDate: Tue Jun 4 11:29:50 2024 +0800

    [improve] Add a check for column existence when adding a new column. (#20)
---
 .../connector/converter/schema/SchemaChangeManager.java | 17 ++++++++++++++++-
 .../kafka/connector/service/DorisSystemService.java     |  9 +++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git 
a/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java
 
b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java
index 376edf9..3086adb 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/converter/schema/SchemaChangeManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.doris.kafka.connector.converter.schema;
 
+import static java.net.HttpURLConnection.HTTP_OK;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.io.Serializable;
@@ -30,6 +32,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.kafka.connector.cfg.DorisOptions;
 import org.apache.doris.kafka.connector.converter.RecordDescriptor;
 import org.apache.doris.kafka.connector.exception.SchemaChangeException;
+import org.apache.doris.kafka.connector.service.DorisSystemService;
 import org.apache.http.HttpHeaders;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpPost;
@@ -48,9 +51,11 @@ public class SchemaChangeManager implements Serializable {
     private static final String SCHEMA_CHANGE_API = 
"http://%s/api/query/default_cluster/%s";;
     private final ObjectMapper objectMapper = new ObjectMapper();
     private final DorisOptions dorisOptions;
+    private DorisSystemService dorisSystemService;
 
     public SchemaChangeManager(DorisOptions dorisOptions) {
         this.dorisOptions = dorisOptions;
+        this.dorisSystemService = new DorisSystemService(dorisOptions);
     }
 
     private boolean handleSchemaChange(Map<String, Object> responseMap, String 
responseEntity) {
@@ -64,6 +69,16 @@ public class SchemaChangeManager implements Serializable {
 
     public void addColumnDDL(String tableName, 
RecordDescriptor.FieldDescriptor field) {
         try {
+            // check the add column whether exist in table.
+            if (dorisSystemService.isColumnExist(
+                    dorisOptions.getDatabase(), tableName, field.getName())) {
+                LOG.warn(
+                        "The column {} already exists in table {}, no need to 
add it again",
+                        field.getName(),
+                        tableName);
+                return;
+            }
+
             String addColumnDDL = 
buildAddColumnDDL(dorisOptions.getDatabase(), tableName, field);
             boolean status = execute(addColumnDDL, dorisOptions.getDatabase());
             LOG.info(
@@ -146,7 +161,7 @@ public class SchemaChangeManager implements Serializable {
             CloseableHttpResponse response = httpclient.execute(request);
             final int statusCode = response.getStatusLine().getStatusCode();
             final String reasonPhrase = 
response.getStatusLine().getReasonPhrase();
-            if (statusCode == 200 && response.getEntity() != null) {
+            if (statusCode == HTTP_OK && response.getEntity() != null) {
                 responseEntity = EntityUtils.toString(response.getEntity());
                 return objectMapper.readValue(responseEntity, Map.class);
             } else {
diff --git 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java
 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java
index 2365aa4..b627daf 100644
--- 
a/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java
+++ 
b/src/main/java/org/apache/doris/kafka/connector/service/DorisSystemService.java
@@ -35,6 +35,8 @@ import org.slf4j.LoggerFactory;
 public class DorisSystemService {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DorisSystemService.class);
+    private static final String GET_COLUMN_EXISTS_TEMPLATE =
+            "SELECT COLUMN_NAME FROM information_schema.`COLUMNS` WHERE 
TABLE_SCHEMA = ? AND TABLE_NAME = ? AND COLUMN_NAME = ?";
     private final JdbcConnectionProvider jdbcConnectionProvider;
 
     public DorisSystemService(DorisOptions dorisOptions) {
@@ -70,6 +72,13 @@ public class DorisSystemService {
                 databaseName);
     }
 
+    public boolean isColumnExist(String database, String tableName, String 
columnName) {
+        List<String> columnList =
+                extractColumnValuesBySQL(
+                        GET_COLUMN_EXISTS_TEMPLATE, 1, null, database, 
tableName, columnName);
+        return !columnList.isEmpty();
+    }
+
     public List<String> extractColumnValuesBySQL(
             String sql, int columnIndex, Predicate<String> filterFunc, 
Object... params) {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to