This is an automated email from the ASF dual-hosted git repository. wanghailin pushed a commit to branch dev in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push: new bb9fd516ec [Fix][Connector-V2][Elasticsearch]Fix sink configuration for DROP_DATA (#7124) bb9fd516ec is described below commit bb9fd516ec27fbdf2da49a327fafcb52a8250afd Author: Wudadada <40282570+wudad...@users.noreply.github.com> AuthorDate: Tue Jul 9 21:49:28 2024 +0800 [Fix][Connector-V2][Elasticsearch]Fix sink configuration for DROP_DATA (#7124) --- .../catalog/ElasticSearchCatalog.java | 3 +- .../elasticsearch/client/EsRestClient.java | 29 +++++++++ .../exception/ElasticsearchConnectorErrorCode.java | 3 +- .../connector/elasticsearch/ElasticsearchIT.java | 69 ++++++++++++++++++---- 4 files changed, 88 insertions(+), 16 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java index b1eb60e289..bbf594eb10 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java @@ -217,8 +217,7 @@ public class ElasticSearchCatalog implements Catalog { @Override public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) { - dropTable(tablePath, ignoreIfNotExists); - createTable(tablePath, null, ignoreIfNotExists); + esRestClient.clearIndexData(tablePath.getTableName()); } @Override diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java index 18c9b7c109..f80f20f673 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java @@ -480,6 +480,35 @@ public class EsRestClient { } } + public void clearIndexData(String indexName) { + String endpoint = String.format("/%s/_delete_by_query", indexName); + Request request = new Request("POST", endpoint); + String jsonString = "{ \"query\": { \"match_all\": {} } }"; + request.setJsonEntity(jsonString); + + try { + Response response = restClient.performRequest(request); + if (response == null) { + throw new ElasticsearchConnectorException( + ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, + "POST " + endpoint + " response null"); + } + // todo: if the index doesn't exist, the response status code is 200? + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + return; + } else { + throw new ElasticsearchConnectorException( + ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, + String.format( + "POST %s response status code=%d", + endpoint, response.getStatusLine().getStatusCode())); + } + } catch (IOException ex) { + throw new ElasticsearchConnectorException( + ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, ex); + } + } + /** * get es field name and type mapping realtion * diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java index 67f01201dd..fe182868d4 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java @@ -28,7 +28,8 @@ public enum ElasticsearchConnectorErrorCode implements SeaTunnelErrorCode { LIST_INDEX_FAILED("ELASTICSEARCH-05", "List elasticsearch index failed"), DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"), CREATE_INDEX_FAILED("ELASTICSEARCH-07", "Create elasticsearch index failed"), - ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the elasticsearch field type"); + ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the elasticsearch field type"), + CLEAR_INDEX_DATA_FAILED("ELASTICSEARCH-09", "Clear elasticsearch index data failed"); ; private final String code; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java index 3180f386b2..623dd9d221 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java @@ -447,35 +447,78 @@ public class ElasticsearchIT extends TestSuiteBase implements TestResource { } @Test - public void testCatalog() { + public void testCatalog() throws InterruptedException, JsonProcessingException { Map<String, Object> configMap = new HashMap<>(); configMap.put("username", "elastic"); configMap.put("password", "elasticsearch"); - configMap.put("hosts", Arrays.asList("https://" + container.getHttpHostAddress())); + configMap.put( + "hosts", Collections.singletonList("https://" + container.getHttpHostAddress())); configMap.put("index", "st_index3"); configMap.put("tls_verify_certificate", false); configMap.put("tls_verify_hostname", false); configMap.put("index_type", "st"); + final ElasticSearchCatalog elasticSearchCatalog = new ElasticSearchCatalog("Elasticsearch", "", ReadonlyConfig.fromMap(configMap)); elasticSearchCatalog.open(); + TablePath tablePath = TablePath.of("", "st_index3"); - // index exists + + // Verify index does not exist initially final boolean existsBefore = elasticSearchCatalog.tableExists(tablePath); - Assertions.assertFalse(existsBefore); - // create index + Assertions.assertFalse(existsBefore, "Index should not exist initially"); + + // Create index elasticSearchCatalog.createTable(tablePath, null, false); final boolean existsAfter = elasticSearchCatalog.tableExists(tablePath); - Assertions.assertTrue(existsAfter); - // data exists? - final boolean existsData = elasticSearchCatalog.isExistsData(tablePath); - Assertions.assertFalse(existsData); - // truncate + Assertions.assertTrue(existsAfter, "Index should be created"); + + // Generate and add multiple records + List<String> data = generateTestData(); + StringBuilder requestBody = new StringBuilder(); + String indexHeader = "{\"index\":{\"_index\":\"st_index3\"}}\n"; + for (String record : data) { + requestBody.append(indexHeader); + requestBody.append(record); + requestBody.append("\n"); + } + esRestClient.bulk(requestBody.toString()); + Thread.sleep(2000); // Wait for data to be indexed + + // Verify data exists + List<String> sourceFields = Arrays.asList("field1", "field2"); + Map<String, Object> query = new HashMap<>(); + query.put("match_all", new HashMap<>()); + ScrollResult scrollResult = + esRestClient.searchByScroll("st_index3", sourceFields, query, "1m", 100); + Assertions.assertFalse(scrollResult.getDocs().isEmpty(), "Data should exist in the index"); + + // Truncate the table elasticSearchCatalog.truncateTable(tablePath, false); - Assertions.assertTrue(elasticSearchCatalog.tableExists(tablePath)); - // drop + Thread.sleep(2000); // Wait for data to be indexed + + // Verify data is deleted + scrollResult = esRestClient.searchByScroll("st_index3", sourceFields, query, "1m", 100); + Assertions.assertTrue( + scrollResult.getDocs().isEmpty(), "Data should be deleted from the index"); + + // Drop the table elasticSearchCatalog.dropTable(tablePath, false); - Assertions.assertFalse(elasticSearchCatalog.tableExists(tablePath)); + Assertions.assertFalse( + elasticSearchCatalog.tableExists(tablePath), "Index should be dropped"); + elasticSearchCatalog.close(); } + + private List<String> generateTestData() throws JsonProcessingException { + List<String> data = new ArrayList<>(); + ObjectMapper objectMapper = new ObjectMapper(); + for (int i = 0; i < 10; i++) { + Map<String, Object> record = new HashMap<>(); + record.put("field1", "value" + i); + record.put("field2", i); + data.add(objectMapper.writeValueAsString(record)); + } + return data; + } }