This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new bb9fd516ec [Fix][Connector-V2][Elasticsearch]Fix sink configuration 
for DROP_DATA (#7124)
bb9fd516ec is described below

commit bb9fd516ec27fbdf2da49a327fafcb52a8250afd
Author: Wudadada <40282570+wudad...@users.noreply.github.com>
AuthorDate: Tue Jul 9 21:49:28 2024 +0800

    [Fix][Connector-V2][Elasticsearch]Fix sink configuration for DROP_DATA (#7124)
---
 .../catalog/ElasticSearchCatalog.java              |  3 +-
 .../elasticsearch/client/EsRestClient.java         | 29 +++++++++
 .../exception/ElasticsearchConnectorErrorCode.java |  3 +-
 .../connector/elasticsearch/ElasticsearchIT.java   | 69 ++++++++++++++++++----
 4 files changed, 88 insertions(+), 16 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
index b1eb60e289..bbf594eb10 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/catalog/ElasticSearchCatalog.java
@@ -217,8 +217,7 @@ public class ElasticSearchCatalog implements Catalog {
 
     @Override
     public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) {
-        dropTable(tablePath, ignoreIfNotExists);
-        createTable(tablePath, null, ignoreIfNotExists);
+        esRestClient.clearIndexData(tablePath.getTableName());
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
index 18c9b7c109..f80f20f673 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java
@@ -480,6 +480,35 @@ public class EsRestClient {
         }
     }
 
+    public void clearIndexData(String indexName) {
+        String endpoint = String.format("/%s/_delete_by_query", indexName);
+        Request request = new Request("POST", endpoint);
+        String jsonString = "{ \"query\": { \"match_all\": {} } }";
+        request.setJsonEntity(jsonString);
+
+        try {
+            Response response = restClient.performRequest(request);
+            if (response == null) {
+                throw new ElasticsearchConnectorException(
+                        
ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED,
+                        "POST " + endpoint + " response null");
+            }
+            // todo: if the index doesn't exist, the response status code is 200?
+            if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+                return;
+            } else {
+                throw new ElasticsearchConnectorException(
+                        
ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED,
+                        String.format(
+                                "POST %s response status code=%d",
+                                endpoint, 
response.getStatusLine().getStatusCode()));
+            }
+        } catch (IOException ex) {
+            throw new ElasticsearchConnectorException(
+                    ElasticsearchConnectorErrorCode.CLEAR_INDEX_DATA_FAILED, 
ex);
+        }
+    }
+
     /**
      * get es field name and type mapping realtion
      *
diff --git 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
index 67f01201dd..fe182868d4 100644
--- 
a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/exception/ElasticsearchConnectorErrorCode.java
@@ -28,7 +28,8 @@ public enum ElasticsearchConnectorErrorCode implements 
SeaTunnelErrorCode {
     LIST_INDEX_FAILED("ELASTICSEARCH-05", "List elasticsearch index failed"),
     DROP_INDEX_FAILED("ELASTICSEARCH-06", "Drop elasticsearch index failed"),
     CREATE_INDEX_FAILED("ELASTICSEARCH-07", "Create elasticsearch index 
failed"),
-    ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the 
elasticsearch field type");
+    ES_FIELD_TYPE_NOT_SUPPORT("ELASTICSEARCH-08", "Not support the 
elasticsearch field type"),
+    CLEAR_INDEX_DATA_FAILED("ELASTICSEARCH-09", "Clear elasticsearch index 
data failed");
     ;
 
     private final String code;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
index 3180f386b2..623dd9d221 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/java/org/apache/seatunnel/e2e/connector/elasticsearch/ElasticsearchIT.java
@@ -447,35 +447,78 @@ public class ElasticsearchIT extends TestSuiteBase 
implements TestResource {
     }
 
     @Test
-    public void testCatalog() {
+    public void testCatalog() throws InterruptedException, 
JsonProcessingException {
         Map<String, Object> configMap = new HashMap<>();
         configMap.put("username", "elastic");
         configMap.put("password", "elasticsearch");
-        configMap.put("hosts", Arrays.asList("https://" + container.getHttpHostAddress()));
+        configMap.put(
+                "hosts", Collections.singletonList("https://" + container.getHttpHostAddress()));
         configMap.put("index", "st_index3");
         configMap.put("tls_verify_certificate", false);
         configMap.put("tls_verify_hostname", false);
         configMap.put("index_type", "st");
+
         final ElasticSearchCatalog elasticSearchCatalog =
                 new ElasticSearchCatalog("Elasticsearch", "", 
ReadonlyConfig.fromMap(configMap));
         elasticSearchCatalog.open();
+
         TablePath tablePath = TablePath.of("", "st_index3");
-        // index exists
+
+        // Verify index does not exist initially
         final boolean existsBefore = 
elasticSearchCatalog.tableExists(tablePath);
-        Assertions.assertFalse(existsBefore);
-        // create index
+        Assertions.assertFalse(existsBefore, "Index should not exist 
initially");
+
+        // Create index
         elasticSearchCatalog.createTable(tablePath, null, false);
         final boolean existsAfter = 
elasticSearchCatalog.tableExists(tablePath);
-        Assertions.assertTrue(existsAfter);
-        // data exists?
-        final boolean existsData = 
elasticSearchCatalog.isExistsData(tablePath);
-        Assertions.assertFalse(existsData);
-        // truncate
+        Assertions.assertTrue(existsAfter, "Index should be created");
+
+        // Generate and add multiple records
+        List<String> data = generateTestData();
+        StringBuilder requestBody = new StringBuilder();
+        String indexHeader = "{\"index\":{\"_index\":\"st_index3\"}}\n";
+        for (String record : data) {
+            requestBody.append(indexHeader);
+            requestBody.append(record);
+            requestBody.append("\n");
+        }
+        esRestClient.bulk(requestBody.toString());
+        Thread.sleep(2000); // Wait for data to be indexed
+
+        // Verify data exists
+        List<String> sourceFields = Arrays.asList("field1", "field2");
+        Map<String, Object> query = new HashMap<>();
+        query.put("match_all", new HashMap<>());
+        ScrollResult scrollResult =
+                esRestClient.searchByScroll("st_index3", sourceFields, query, 
"1m", 100);
+        Assertions.assertFalse(scrollResult.getDocs().isEmpty(), "Data should 
exist in the index");
+
+        // Truncate the table
         elasticSearchCatalog.truncateTable(tablePath, false);
-        Assertions.assertTrue(elasticSearchCatalog.tableExists(tablePath));
-        // drop
+        Thread.sleep(2000); // Wait for data to be indexed
+
+        // Verify data is deleted
+        scrollResult = esRestClient.searchByScroll("st_index3", sourceFields, 
query, "1m", 100);
+        Assertions.assertTrue(
+                scrollResult.getDocs().isEmpty(), "Data should be deleted from 
the index");
+
+        // Drop the table
         elasticSearchCatalog.dropTable(tablePath, false);
-        Assertions.assertFalse(elasticSearchCatalog.tableExists(tablePath));
+        Assertions.assertFalse(
+                elasticSearchCatalog.tableExists(tablePath), "Index should be 
dropped");
+
         elasticSearchCatalog.close();
     }
+
+    private List<String> generateTestData() throws JsonProcessingException {
+        List<String> data = new ArrayList<>();
+        ObjectMapper objectMapper = new ObjectMapper();
+        for (int i = 0; i < 10; i++) {
+            Map<String, Object> record = new HashMap<>();
+            record.put("field1", "value" + i);
+            record.put("field2", i);
+            data.add(objectMapper.writeValueAsString(record));
+        }
+        return data;
+    }
 }

Reply via email to