This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3455316981 [Bugfix][Elasticsearch] Fix add column event (#9069)
3455316981 is described below

commit 34553169810d52c29f3176caf9eed525a22fdcd6
Author: hailin0 <wanghai...@apache.org>
AuthorDate: Mon Apr 7 15:39:04 2025 +0800

    [Bugfix][Elasticsearch] Fix add column event (#9069)
---
 .../sink/ElasticsearchSinkWriter.java              | 16 ++++++++++++-
 .../elasticsearch/ElasticsearchSchemaChangeIT.java | 27 +++++++++++++++++++---
 2 files changed, 39 insertions(+), 4 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
index 46c49cc4b0..b5da93d37b 100644
--- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java
@@ -23,11 +23,14 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter;
 import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent;
 import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent;
 import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher;
+import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventHandler;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -66,12 +69,14 @@ public class ElasticsearchSinkWriter
 
     private final int maxBatchSize;
 
-    private final SeaTunnelRowSerializer seaTunnelRowSerializer;
+    private SeaTunnelRowSerializer seaTunnelRowSerializer;
     private final List<String> requestEsList;
     private EsRestClient esRestClient;
     private RetryMaterial retryMaterial;
     private static final long DEFAULT_SLEEP_TIME_MS = 200L;
     private final IndexInfo indexInfo;
+    private TableSchema tableSchema;
+    private final TableSchemaChangeEventHandler tableSchemaChangeEventHandler;
 
     public ElasticsearchSinkWriter(
             Context context,
@@ -94,6 +99,8 @@ public class ElasticsearchSinkWriter
         this.requestEsList = new ArrayList<>(maxBatchSize);
         this.retryMaterial =
                 new RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS);
+        this.tableSchema = catalogTable.getTableSchema();
+        this.tableSchemaChangeEventHandler = new TableSchemaChangeEventDispatcher();
     }
 
     @Override
@@ -120,6 +127,13 @@ public class ElasticsearchSinkWriter
         } else {
             throw new UnsupportedOperationException("Unsupported alter table event: " + event);
         }
+
+        this.tableSchema = tableSchemaChangeEventHandler.reset(tableSchema).apply(event);
+        this.seaTunnelRowSerializer =
+                new ElasticsearchRowSerializer(
+                        esRestClient.getClusterInfo(),
+                        indexInfo,
+                        tableSchema.toPhysicalRowDataType());
     }
 
     private void applySingleSchemaChangeEvent(SchemaChangeEvent event) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
index b05cbc098d..d09954fcc1 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java
@@ -192,9 +192,30 @@ public class ElasticsearchSchemaChangeIT extends TestSuiteBase implements TestRe
                                     this.container.execInContainer(
                                             "bash",
                                             "-c",
-                                            "curl -k -u elastic:elasticsearch https://localhost:9200/schema_change_index/_count");
-                            Assertions.assertTrue(
-                                    indexCountResult.getStdout().contains("\"count\":18"));
+                                            "curl -k -u elastic:elasticsearch -H \"Content-Type:application/json\" -d '{ \"from\": 0, \"size\": 10000, \"query\": { \"match_all\": {}}}' https://localhost:9200/schema_change_index/_search");
+                            log.info("indexCountResult: {}", indexCountResult.getStdout());
+                            ObjectNode jsonNode =
+                                    JsonUtils.parseObject(indexCountResult.getStdout());
+                            JsonNode hits = jsonNode.get("hits");
+                            long totalCount = hits.get("total").get("value").asLong();
+                            Assertions.assertEquals(18L, totalCount);
+
+                            hits.get("hits")
+                                    .forEach(
+                                            hit -> {
+                                                JsonNode source = hit.get("_source");
+                                                int id = source.get("id").asInt();
+                                                if (id >= 119 && id <= 127) {
+                                                    Assertions.assertTrue(
+                                                            source.has("add_column1"));
+                                                    Assertions.assertFalse(
+                                                            source.get("add_column1").isNull());
+                                                    Assertions.assertTrue(
+                                                            source.has("add_column2"));
+                                                    Assertions.assertFalse(
+                                                            source.get("add_column2").isNull());
+                                                }
+                                            });
                         });
     }
 

Reply via email to