This is an automated email from the ASF dual-hosted git repository. wuchunfu pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new 3455316981 [Bugfix][Elasticsearch] Fix add column event (#9069) 3455316981 is described below commit 34553169810d52c29f3176caf9eed525a22fdcd6 Author: hailin0 <wanghai...@apache.org> AuthorDate: Mon Apr 7 15:39:04 2025 +0800 [Bugfix][Elasticsearch] Fix add column event (#9069) --- .../sink/ElasticsearchSinkWriter.java | 16 ++++++++++++- .../elasticsearch/ElasticsearchSchemaChangeIT.java | 27 +++++++++++++++++++--- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java index 46c49cc4b0..b5da93d37b 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkWriter.java @@ -23,11 +23,14 @@ import org.apache.seatunnel.api.sink.SupportMultiTableSinkWriter; import org.apache.seatunnel.api.sink.SupportSchemaEvolutionSinkWriter; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.converter.BasicTypeDefine; import org.apache.seatunnel.api.table.schema.event.AlterTableAddColumnEvent; import org.apache.seatunnel.api.table.schema.event.AlterTableColumnEvent; import org.apache.seatunnel.api.table.schema.event.AlterTableColumnsEvent; import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent; +import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventDispatcher; +import org.apache.seatunnel.api.table.schema.handler.TableSchemaChangeEventHandler; import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; @@ -66,12 +69,14 @@ public class ElasticsearchSinkWriter private final int maxBatchSize; - private final SeaTunnelRowSerializer seaTunnelRowSerializer; + private SeaTunnelRowSerializer seaTunnelRowSerializer; private final List<String> requestEsList; private EsRestClient esRestClient; private RetryMaterial retryMaterial; private static final long DEFAULT_SLEEP_TIME_MS = 200L; private final IndexInfo indexInfo; + private TableSchema tableSchema; + private final TableSchemaChangeEventHandler tableSchemaChangeEventHandler; public ElasticsearchSinkWriter( Context context, @@ -94,6 +99,8 @@ public class ElasticsearchSinkWriter this.requestEsList = new ArrayList<>(maxBatchSize); this.retryMaterial = new RetryMaterial(maxRetryCount, true, exception -> true, DEFAULT_SLEEP_TIME_MS); + this.tableSchema = catalogTable.getTableSchema(); + this.tableSchemaChangeEventHandler = new TableSchemaChangeEventDispatcher(); } @Override @@ -120,6 +127,13 @@ public class ElasticsearchSinkWriter } else { throw new UnsupportedOperationException("Unsupported alter table event: " + event); } + + this.tableSchema = tableSchemaChangeEventHandler.reset(tableSchema).apply(event); + this.seaTunnelRowSerializer = + new ElasticsearchRowSerializer( + esRestClient.getClusterInfo(), + indexInfo, + tableSchema.toPhysicalRowDataType()); } private void applySingleSchemaChangeEvent(SchemaChangeEvent event) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java index b05cbc098d..d09954fcc1 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchSchemaChangeIT.java @@ -192,9 +192,30 @@ public class ElasticsearchSchemaChangeIT extends TestSuiteBase implements TestRe this.container.execInContainer( "bash", "-c", - "curl -k -u elastic:elasticsearch https://localhost:9200/schema_change_index/_count"); - Assertions.assertTrue( - indexCountResult.getStdout().contains("\"count\":18")); + "curl -k -u elastic:elasticsearch -H \"Content-Type:application/json\" -d '{ \"from\": 0, \"size\": 10000, \"query\": { \"match_all\": {}}}' https://localhost:9200/schema_change_index/_search"); + log.info("indexCountResult: {}", indexCountResult.getStdout()); + ObjectNode jsonNode = + JsonUtils.parseObject(indexCountResult.getStdout()); + JsonNode hits = jsonNode.get("hits"); + long totalCount = hits.get("total").get("value").asLong(); + Assertions.assertEquals(18L, totalCount); + + hits.get("hits") + .forEach( + hit -> { + JsonNode source = hit.get("_source"); + int id = source.get("id").asInt(); + if (id >= 119 && id <= 127) { + Assertions.assertTrue( + source.has("add_column1")); + Assertions.assertFalse( + source.get("add_column1").isNull()); + Assertions.assertTrue( + source.has("add_column2")); + Assertions.assertFalse( + source.get("add_column2").isNull()); + } + }); }); }