yuxiqian commented on code in PR #3535:
URL: https://github.com/apache/flink-cdc/pull/3535#discussion_r1714667705


##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml:
##########
@@ -113,6 +113,12 @@ limitations under the License.
             <version>${testcontainers.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>1.19.7</version>

Review Comment:
   Use `${testcontainers.version}`?



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java:
##########
@@ -0,0 +1,231 @@
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlToElasticsearchE2eITCase.class);
+
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String MYSQL_DRIVER_CLASS = 
"com.mysql.cj.jdbc.Driver";
+    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+    public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+    protected static final long EVENT_DEFAULT_TIMEOUT = 60000L;
+
+    @ClassRule
+    public static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0)
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases("mysql");
+
+    private ElasticsearchContainer elasticsearch;
+
+    protected final UniqueDatabase mysqlInventoryDatabase =
+            new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+
+    @Before
+    public void before() throws Exception {
+        super.before();
+        mysqlInventoryDatabase.createAndInitialize();
+
+        LOG.info("Starting Elasticsearch container...");
+        elasticsearch = new 
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:" + 
getElasticsearchVersion())
+                .withNetwork(NETWORK)
+                .withNetworkAliases("elasticsearch")
+                .withEnv("discovery.type", "single-node")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                .withExposedPorts(9200, 9300)
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                .waitingFor(
+                        Wait.forHttp("/_cluster/health")
+                                .forStatusCodeMatching(
+                                        response -> response == 200 || 
response == 401)
+                                .withStartupTimeout(Duration.ofMinutes(2)));
+
+        Startables.deepStart(Stream.of(elasticsearch)).join();
+        LOG.info("Elasticsearch container is started.");
+    }
+
+    @After
+    public void after() {
+        super.after();
+        mysqlInventoryDatabase.dropDatabase();
+        dropElasticsearchIndex("customers");
+        dropElasticsearchIndex("products");
+        if (elasticsearch != null) {
+            elasticsearch.stop();
+        }
+    }
+
+    @Test
+    public void testSyncWholeDatabase() throws Exception {
+        String pipelineJob = String.format(
+                "source:\n"
+                        + "  type: mysql\n"
+                        + "  hostname: mysql\n"
+                        + "  port: 3306\n"
+                        + "  username: %s\n"
+                        + "  password: %s\n"
+                        + "  tables: %s.\\.*\n"
+                        + "  server-id: 5400-5404\n"
+                        + "  server-time-zone: UTC\n"
+                        + "\n"
+                        + "sink:\n"
+                        + "  type: elasticsearch\n"
+                        + "  hosts: http://elasticsearch:9200\n";
+                        + "  version: %s\n"
+                        + "  batch.size.max.bytes: 5242880\n"
+                        + "  record.size.max.bytes: 5242880\n"
+                        + "\n"
+                        + "pipeline:\n"
+                        + "  parallelism: 1",
+                MYSQL_TEST_USER,
+                MYSQL_TEST_PASSWORD,
+                mysqlInventoryDatabase.getDatabaseName(),
+                getElasticsearchVersion().split("\\.")[0]  // Use major 
version number
+        );
+
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path elasticsearchCdcConnector = 
TestUtils.getResource("flink-cdc-pipeline-connector-elasticsearch-3.2-SNAPSHOT.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+
+        submitPipelineJob(pipelineJob, mysqlCdcJar, elasticsearchCdcConnector, 
mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        // Wait for data to be written
+        Thread.sleep(20000);
+        validateElasticsearchContent("customers");
+        validateElasticsearchContent("products");
+
+        LOG.info("Begin incremental reading stage.");
+        String mysqlJdbcUrl = String.format(
+                "jdbc:mysql://%s:%s/%s",
+                MYSQL.getHost(),
+                MYSQL.getDatabasePort(),
+                mysqlInventoryDatabase.getDatabaseName()
+        );
+
+        try (Connection conn = DriverManager.getConnection(mysqlJdbcUrl, 
MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+             Statement stat = conn.createStatement()) {
+
+            stat.execute("INSERT INTO products VALUES (default,'jacket','water 
resistent white wind breaker',0.2, null, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);
+            validateElasticsearchContent("products");
+
+            stat.execute("UPDATE products SET description='18oz carpenter 
hammer' WHERE id=106;");
+            stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+            stat.execute("ALTER TABLE products DROP COLUMN point_c;");
+            stat.execute("DELETE FROM products WHERE id=101;");
+
+            stat.execute("INSERT INTO products VALUES (default,'scooter', 
null, 2.14, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        validateElasticsearchContent("products");
+    }
+
+    private void validateElasticsearchContent(String tableName) throws 
Exception {
+        String indexName = String.format("%s.%s", 
mysqlInventoryDatabase.getDatabaseName(), tableName);
+        int hostPort = elasticsearch.getMappedPort(9200);
+        String url = String.format("http://localhost:%d/%s/_search?pretty";, 
hostPort, indexName);
+        String curlCommand = String.format("curl -X GET '%s'", url);
+
+        ProcessBuilder processBuilder = new ProcessBuilder("curl", "-X", 
"GET", url);
+        Process process = processBuilder.start();
+        int exitCode = process.waitFor();
+
+        if (exitCode != 0) {
+            throw new RuntimeException("Failed to query index.");
+        }
+
+        Scanner scanner = new 
Scanner(process.getInputStream()).useDelimiter("\\A");
+        String response = scanner.hasNext() ? scanner.next() : "";
+        LOG.info("Elasticsearch query response for table {}: \n{}", tableName, 
response);
+
+        List<String> expectedDocuments;
+        if (tableName.equals("customers")) {
+            expectedDocuments = Arrays.asList("user_1", "user_2", "user_3", 
"user_4");
+        } else if (tableName.equals("products")) {
+            expectedDocuments = Arrays.asList(
+                    "scooter",
+                    "car battery",
+                    "12-pack drill bits",
+                    "hammer",
+                    "rocks",
+                    "jacket",
+                    "spare tire"
+            );
+        } else {
+            throw new RuntimeException("Unexpected table name: " + tableName);
+        }
+
+        for (String expectedDoc : expectedDocuments) {
+            assertTrue("Document not found: " + expectedDoc, 
response.contains(expectedDoc));
+        }
+    }
+
+    private void dropElasticsearchIndex(String tableName) {
+        try {
+            String indexName = String.format("mysql_inventory.%s", tableName);
+            int hostPort = elasticsearch.getMappedPort(9200);
+            String url = String.format("http://localhost:%d/%s";, hostPort, 
indexName);
+            String curlCommand = String.format("curl -X DELETE %s", url);
+            LOG.info("Drop Index CURL Command: {}", curlCommand);
+
+            ProcessBuilder processBuilder = new ProcessBuilder("curl", "-X", 
"DELETE", url);

Review Comment:
   Ditto



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java:
##########
@@ -0,0 +1,231 @@
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlToElasticsearchE2eITCase.class);
+
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String MYSQL_DRIVER_CLASS = 
"com.mysql.cj.jdbc.Driver";
+    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+    public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+    protected static final long EVENT_DEFAULT_TIMEOUT = 60000L;
+
+    @ClassRule
+    public static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0)
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases("mysql");
+
+    private ElasticsearchContainer elasticsearch;
+
+    protected final UniqueDatabase mysqlInventoryDatabase =
+            new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+
+    @Before
+    public void before() throws Exception {
+        super.before();
+        mysqlInventoryDatabase.createAndInitialize();
+
+        LOG.info("Starting Elasticsearch container...");
+        elasticsearch = new 
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:" + 
getElasticsearchVersion())
+                .withNetwork(NETWORK)
+                .withNetworkAliases("elasticsearch")
+                .withEnv("discovery.type", "single-node")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                .withExposedPorts(9200, 9300)
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                .waitingFor(
+                        Wait.forHttp("/_cluster/health")
+                                .forStatusCodeMatching(
+                                        response -> response == 200 || 
response == 401)
+                                .withStartupTimeout(Duration.ofMinutes(2)));
+
+        Startables.deepStart(Stream.of(elasticsearch)).join();
+        LOG.info("Elasticsearch container is started.");
+    }
+
+    @After
+    public void after() {
+        super.after();
+        mysqlInventoryDatabase.dropDatabase();
+        dropElasticsearchIndex("customers");
+        dropElasticsearchIndex("products");
+        if (elasticsearch != null) {
+            elasticsearch.stop();
+        }
+    }
+
+    @Test
+    public void testSyncWholeDatabase() throws Exception {
+        String pipelineJob = String.format(
+                "source:\n"
+                        + "  type: mysql\n"
+                        + "  hostname: mysql\n"
+                        + "  port: 3306\n"
+                        + "  username: %s\n"
+                        + "  password: %s\n"
+                        + "  tables: %s.\\.*\n"
+                        + "  server-id: 5400-5404\n"
+                        + "  server-time-zone: UTC\n"
+                        + "\n"
+                        + "sink:\n"
+                        + "  type: elasticsearch\n"
+                        + "  hosts: http://elasticsearch:9200\n";
+                        + "  version: %s\n"
+                        + "  batch.size.max.bytes: 5242880\n"
+                        + "  record.size.max.bytes: 5242880\n"
+                        + "\n"
+                        + "pipeline:\n"
+                        + "  parallelism: 1",
+                MYSQL_TEST_USER,
+                MYSQL_TEST_PASSWORD,
+                mysqlInventoryDatabase.getDatabaseName(),
+                getElasticsearchVersion().split("\\.")[0]  // Use major 
version number
+        );
+
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path elasticsearchCdcConnector = 
TestUtils.getResource("flink-cdc-pipeline-connector-elasticsearch-3.2-SNAPSHOT.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+
+        submitPipelineJob(pipelineJob, mysqlCdcJar, elasticsearchCdcConnector, 
mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        // Wait for data to be written
+        Thread.sleep(20000);
+        validateElasticsearchContent("customers");
+        validateElasticsearchContent("products");
+
+        LOG.info("Begin incremental reading stage.");
+        String mysqlJdbcUrl = String.format(
+                "jdbc:mysql://%s:%s/%s",
+                MYSQL.getHost(),
+                MYSQL.getDatabasePort(),
+                mysqlInventoryDatabase.getDatabaseName()
+        );
+
+        try (Connection conn = DriverManager.getConnection(mysqlJdbcUrl, 
MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+             Statement stat = conn.createStatement()) {
+
+            stat.execute("INSERT INTO products VALUES (default,'jacket','water 
resistent white wind breaker',0.2, null, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);

Review Comment:
   Would prefer putting retry and timeout strategies in 
`validateElasticsearchContent` (just like `waitUntilSpecificEvent` in other E2e 
cases) instead of sleeping for a fixed duration, which will make tests finish 
faster if results are as expected.



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java:
##########
@@ -0,0 +1,231 @@
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlToElasticsearchE2eITCase.class);
+
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String MYSQL_DRIVER_CLASS = 
"com.mysql.cj.jdbc.Driver";
+    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+    public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+    protected static final long EVENT_DEFAULT_TIMEOUT = 60000L;
+
+    @ClassRule
+    public static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0)
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases("mysql");
+
+    private ElasticsearchContainer elasticsearch;
+
+    protected final UniqueDatabase mysqlInventoryDatabase =
+            new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+
+    @Before
+    public void before() throws Exception {
+        super.before();
+        mysqlInventoryDatabase.createAndInitialize();
+
+        LOG.info("Starting Elasticsearch container...");
+        elasticsearch = new 
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:" + 
getElasticsearchVersion())
+                .withNetwork(NETWORK)
+                .withNetworkAliases("elasticsearch")
+                .withEnv("discovery.type", "single-node")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                .withExposedPorts(9200, 9300)
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                .waitingFor(
+                        Wait.forHttp("/_cluster/health")
+                                .forStatusCodeMatching(
+                                        response -> response == 200 || 
response == 401)
+                                .withStartupTimeout(Duration.ofMinutes(2)));
+
+        Startables.deepStart(Stream.of(elasticsearch)).join();
+        LOG.info("Elasticsearch container is started.");
+    }
+
+    @After
+    public void after() {
+        super.after();
+        mysqlInventoryDatabase.dropDatabase();
+        dropElasticsearchIndex("customers");
+        dropElasticsearchIndex("products");
+        if (elasticsearch != null) {
+            elasticsearch.stop();
+        }
+    }
+
+    @Test
+    public void testSyncWholeDatabase() throws Exception {
+        String pipelineJob = String.format(
+                "source:\n"
+                        + "  type: mysql\n"
+                        + "  hostname: mysql\n"
+                        + "  port: 3306\n"
+                        + "  username: %s\n"
+                        + "  password: %s\n"
+                        + "  tables: %s.\\.*\n"
+                        + "  server-id: 5400-5404\n"
+                        + "  server-time-zone: UTC\n"
+                        + "\n"
+                        + "sink:\n"
+                        + "  type: elasticsearch\n"
+                        + "  hosts: http://elasticsearch:9200\n";
+                        + "  version: %s\n"
+                        + "  batch.size.max.bytes: 5242880\n"
+                        + "  record.size.max.bytes: 5242880\n"
+                        + "\n"
+                        + "pipeline:\n"
+                        + "  parallelism: 1",
+                MYSQL_TEST_USER,
+                MYSQL_TEST_PASSWORD,
+                mysqlInventoryDatabase.getDatabaseName(),
+                getElasticsearchVersion().split("\\.")[0]  // Use major 
version number
+        );
+
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path elasticsearchCdcConnector = 
TestUtils.getResource("flink-cdc-pipeline-connector-elasticsearch-3.2-SNAPSHOT.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+
+        submitPipelineJob(pipelineJob, mysqlCdcJar, elasticsearchCdcConnector, 
mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        // Wait for data to be written
+        Thread.sleep(20000);
+        validateElasticsearchContent("customers");
+        validateElasticsearchContent("products");
+
+        LOG.info("Begin incremental reading stage.");
+        String mysqlJdbcUrl = String.format(
+                "jdbc:mysql://%s:%s/%s",
+                MYSQL.getHost(),
+                MYSQL.getDatabasePort(),
+                mysqlInventoryDatabase.getDatabaseName()
+        );
+
+        try (Connection conn = DriverManager.getConnection(mysqlJdbcUrl, 
MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+             Statement stat = conn.createStatement()) {
+
+            stat.execute("INSERT INTO products VALUES (default,'jacket','water 
resistent white wind breaker',0.2, null, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);
+            validateElasticsearchContent("products");
+
+            stat.execute("UPDATE products SET description='18oz carpenter 
hammer' WHERE id=106;");
+            stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+            stat.execute("ALTER TABLE products DROP COLUMN point_c;");
+            stat.execute("DELETE FROM products WHERE id=101;");
+
+            stat.execute("INSERT INTO products VALUES (default,'scooter', 
null, 2.14, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);

Review Comment:
   Ditto



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java:
##########
@@ -0,0 +1,231 @@
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {

Review Comment:
   JavaDocs is required for every class.



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java:
##########
@@ -0,0 +1,231 @@
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.junit.*;

Review Comment:
   Wildcard import is forbidden by Spotless style check.



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java:
##########
@@ -0,0 +1,231 @@
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlToElasticsearchE2eITCase.class);
+
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String MYSQL_DRIVER_CLASS = 
"com.mysql.cj.jdbc.Driver";
+    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+    public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+    protected static final long EVENT_DEFAULT_TIMEOUT = 60000L;

Review Comment:
   Are these timeout settings effective? Also please define timeout limits with 
a consistent unit (seconds / milliseconds)



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java:
##########
@@ -0,0 +1,231 @@
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlToElasticsearchE2eITCase.class);
+
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String MYSQL_DRIVER_CLASS = 
"com.mysql.cj.jdbc.Driver";
+    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+    public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+    protected static final long EVENT_DEFAULT_TIMEOUT = 60000L;
+
+    @ClassRule
+    public static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0)
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases("mysql");
+
+    private ElasticsearchContainer elasticsearch;
+
+    protected final UniqueDatabase mysqlInventoryDatabase =
+            new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+
+    @Before
+    public void before() throws Exception {
+        super.before();
+        mysqlInventoryDatabase.createAndInitialize();
+
+        LOG.info("Starting Elasticsearch container...");
+        elasticsearch = new 
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:" + 
getElasticsearchVersion())
+                .withNetwork(NETWORK)
+                .withNetworkAliases("elasticsearch")
+                .withEnv("discovery.type", "single-node")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                .withExposedPorts(9200, 9300)
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                .waitingFor(
+                        Wait.forHttp("/_cluster/health")
+                                .forStatusCodeMatching(
+                                        response -> response == 200 || 
response == 401)
+                                .withStartupTimeout(Duration.ofMinutes(2)));
+
+        Startables.deepStart(Stream.of(elasticsearch)).join();
+        LOG.info("Elasticsearch container is started.");
+    }
+
+    @After
+    public void after() {
+        super.after();
+        mysqlInventoryDatabase.dropDatabase();
+        dropElasticsearchIndex("customers");
+        dropElasticsearchIndex("products");
+        if (elasticsearch != null) {
+            elasticsearch.stop();
+        }
+    }
+
+    @Test
+    public void testSyncWholeDatabase() throws Exception {
+        String pipelineJob = String.format(
+                "source:\n"
+                        + "  type: mysql\n"
+                        + "  hostname: mysql\n"
+                        + "  port: 3306\n"
+                        + "  username: %s\n"
+                        + "  password: %s\n"
+                        + "  tables: %s.\\.*\n"
+                        + "  server-id: 5400-5404\n"
+                        + "  server-time-zone: UTC\n"
+                        + "\n"
+                        + "sink:\n"
+                        + "  type: elasticsearch\n"
+                        + "  hosts: http://elasticsearch:9200\n";
+                        + "  version: %s\n"
+                        + "  batch.size.max.bytes: 5242880\n"
+                        + "  record.size.max.bytes: 5242880\n"
+                        + "\n"
+                        + "pipeline:\n"
+                        + "  parallelism: 1",
+                MYSQL_TEST_USER,
+                MYSQL_TEST_PASSWORD,
+                mysqlInventoryDatabase.getDatabaseName(),
+                getElasticsearchVersion().split("\\.")[0]  // Use major 
version number
+        );
+
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path elasticsearchCdcConnector = 
TestUtils.getResource("flink-cdc-pipeline-connector-elasticsearch-3.2-SNAPSHOT.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+
+        submitPipelineJob(pipelineJob, mysqlCdcJar, elasticsearchCdcConnector, 
mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        // Wait for data to be written
+        Thread.sleep(20000);
+        validateElasticsearchContent("customers");
+        validateElasticsearchContent("products");
+
+        LOG.info("Begin incremental reading stage.");
+        String mysqlJdbcUrl = String.format(
+                "jdbc:mysql://%s:%s/%s",
+                MYSQL.getHost(),
+                MYSQL.getDatabasePort(),
+                mysqlInventoryDatabase.getDatabaseName()
+        );
+
+        try (Connection conn = DriverManager.getConnection(mysqlJdbcUrl, 
MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+             Statement stat = conn.createStatement()) {
+
+            stat.execute("INSERT INTO products VALUES (default,'jacket','water 
resistent white wind breaker',0.2, null, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);
+            validateElasticsearchContent("products");
+
+            stat.execute("UPDATE products SET description='18oz carpenter 
hammer' WHERE id=106;");
+            stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+            stat.execute("ALTER TABLE products DROP COLUMN point_c;");
+            stat.execute("DELETE FROM products WHERE id=101;");
+
+            stat.execute("INSERT INTO products VALUES (default,'scooter', 
null, 2.14, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        validateElasticsearchContent("products");
+    }
+
+    private void validateElasticsearchContent(String tableName) throws 
Exception {
+        String indexName = String.format("%s.%s", 
mysqlInventoryDatabase.getDatabaseName(), tableName);
+        int hostPort = elasticsearch.getMappedPort(9200);
+        String url = String.format("http://localhost:%d/%s/_search?pretty";, 
hostPort, indexName);

Review Comment:
   `ElasticsearchContainer#getHttpHostAddress` could be used here



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java:
##########
@@ -0,0 +1,231 @@
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlToElasticsearchE2eITCase.class);
+
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String MYSQL_DRIVER_CLASS = 
"com.mysql.cj.jdbc.Driver";
+    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+    public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+    protected static final long EVENT_DEFAULT_TIMEOUT = 60000L;
+
+    @ClassRule
+    public static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0)
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases("mysql");
+
+    private ElasticsearchContainer elasticsearch;
+
+    protected final UniqueDatabase mysqlInventoryDatabase =
+            new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+
+    @Before
+    public void before() throws Exception {
+        super.before();
+        mysqlInventoryDatabase.createAndInitialize();
+
+        LOG.info("Starting Elasticsearch container...");
+        elasticsearch = new 
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:" + 
getElasticsearchVersion())
+                .withNetwork(NETWORK)
+                .withNetworkAliases("elasticsearch")
+                .withEnv("discovery.type", "single-node")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                .withExposedPorts(9200, 9300)
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                .waitingFor(
+                        Wait.forHttp("/_cluster/health")
+                                .forStatusCodeMatching(
+                                        response -> response == 200 || 
response == 401)
+                                .withStartupTimeout(Duration.ofMinutes(2)));
+
+        Startables.deepStart(Stream.of(elasticsearch)).join();
+        LOG.info("Elasticsearch container is started.");
+    }
+
+    @After
+    public void after() {
+        super.after();
+        mysqlInventoryDatabase.dropDatabase();
+        dropElasticsearchIndex("customers");
+        dropElasticsearchIndex("products");
+        if (elasticsearch != null) {
+            elasticsearch.stop();
+        }
+    }
+
+    @Test
+    public void testSyncWholeDatabase() throws Exception {
+        String pipelineJob = String.format(
+                "source:\n"
+                        + "  type: mysql\n"
+                        + "  hostname: mysql\n"
+                        + "  port: 3306\n"
+                        + "  username: %s\n"
+                        + "  password: %s\n"
+                        + "  tables: %s.\\.*\n"
+                        + "  server-id: 5400-5404\n"
+                        + "  server-time-zone: UTC\n"
+                        + "\n"
+                        + "sink:\n"
+                        + "  type: elasticsearch\n"
+                        + "  hosts: http://elasticsearch:9200\n";
+                        + "  version: %s\n"
+                        + "  batch.size.max.bytes: 5242880\n"
+                        + "  record.size.max.bytes: 5242880\n"
+                        + "\n"
+                        + "pipeline:\n"
+                        + "  parallelism: 1",
+                MYSQL_TEST_USER,
+                MYSQL_TEST_PASSWORD,
+                mysqlInventoryDatabase.getDatabaseName(),
+                getElasticsearchVersion().split("\\.")[0]  // Use major 
version number
+        );
+
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path elasticsearchCdcConnector = 
TestUtils.getResource("flink-cdc-pipeline-connector-elasticsearch-3.2-SNAPSHOT.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+
+        submitPipelineJob(pipelineJob, mysqlCdcJar, elasticsearchCdcConnector, 
mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        // Wait for data to be written
+        Thread.sleep(20000);
+        validateElasticsearchContent("customers");
+        validateElasticsearchContent("products");
+
+        LOG.info("Begin incremental reading stage.");
+        String mysqlJdbcUrl = String.format(
+                "jdbc:mysql://%s:%s/%s",
+                MYSQL.getHost(),
+                MYSQL.getDatabasePort(),
+                mysqlInventoryDatabase.getDatabaseName()
+        );
+
+        try (Connection conn = DriverManager.getConnection(mysqlJdbcUrl, 
MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+             Statement stat = conn.createStatement()) {
+
+            stat.execute("INSERT INTO products VALUES (default,'jacket','water 
resistent white wind breaker',0.2, null, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);
+            validateElasticsearchContent("products");
+
+            stat.execute("UPDATE products SET description='18oz carpenter 
hammer' WHERE id=106;");
+            stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+            stat.execute("ALTER TABLE products DROP COLUMN point_c;");
+            stat.execute("DELETE FROM products WHERE id=101;");
+
+            stat.execute("INSERT INTO products VALUES (default,'scooter', 
null, 2.14, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        validateElasticsearchContent("products");
+    }
+
+    private void validateElasticsearchContent(String tableName) throws 
Exception {
+        String indexName = String.format("%s.%s", 
mysqlInventoryDatabase.getDatabaseName(), tableName);
+        int hostPort = elasticsearch.getMappedPort(9200);
+        String url = String.format("http://localhost:%d/%s/_search?pretty";, 
hostPort, indexName);

Review Comment:
   `getHttpHostAddress` could be used here



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java:
##########
@@ -0,0 +1,231 @@
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlToElasticsearchE2eITCase.class);
+
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String MYSQL_DRIVER_CLASS = 
"com.mysql.cj.jdbc.Driver";
+    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+    public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+    protected static final long EVENT_DEFAULT_TIMEOUT = 60000L;
+
+    @ClassRule
+    public static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0)
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases("mysql");
+
+    private ElasticsearchContainer elasticsearch;
+
+    protected final UniqueDatabase mysqlInventoryDatabase =
+            new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+
+    @Before
+    public void before() throws Exception {
+        super.before();
+        mysqlInventoryDatabase.createAndInitialize();
+
+        LOG.info("Starting Elasticsearch container...");
+        elasticsearch = new 
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:" + 
getElasticsearchVersion())
+                .withNetwork(NETWORK)
+                .withNetworkAliases("elasticsearch")
+                .withEnv("discovery.type", "single-node")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                .withExposedPorts(9200, 9300)
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                .waitingFor(
+                        Wait.forHttp("/_cluster/health")
+                                .forStatusCodeMatching(
+                                        response -> response == 200 || 
response == 401)
+                                .withStartupTimeout(Duration.ofMinutes(2)));
+
+        Startables.deepStart(Stream.of(elasticsearch)).join();
+        LOG.info("Elasticsearch container is started.");
+    }
+
+    @After
+    public void after() {
+        super.after();
+        mysqlInventoryDatabase.dropDatabase();
+        dropElasticsearchIndex("customers");
+        dropElasticsearchIndex("products");
+        if (elasticsearch != null) {
+            elasticsearch.stop();
+        }
+    }
+
+    @Test
+    public void testSyncWholeDatabase() throws Exception {
+        String pipelineJob = String.format(
+                "source:\n"
+                        + "  type: mysql\n"
+                        + "  hostname: mysql\n"
+                        + "  port: 3306\n"
+                        + "  username: %s\n"
+                        + "  password: %s\n"
+                        + "  tables: %s.\\.*\n"
+                        + "  server-id: 5400-5404\n"
+                        + "  server-time-zone: UTC\n"
+                        + "\n"
+                        + "sink:\n"
+                        + "  type: elasticsearch\n"
+                        + "  hosts: http://elasticsearch:9200\n";
+                        + "  version: %s\n"
+                        + "  batch.size.max.bytes: 5242880\n"
+                        + "  record.size.max.bytes: 5242880\n"
+                        + "\n"
+                        + "pipeline:\n"
+                        + "  parallelism: 1",
+                MYSQL_TEST_USER,
+                MYSQL_TEST_PASSWORD,
+                mysqlInventoryDatabase.getDatabaseName(),
+                getElasticsearchVersion().split("\\.")[0]  // Use major 
version number
+        );
+
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path elasticsearchCdcConnector = 
TestUtils.getResource("flink-cdc-pipeline-connector-elasticsearch-3.2-SNAPSHOT.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+
+        submitPipelineJob(pipelineJob, mysqlCdcJar, elasticsearchCdcConnector, 
mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        // Wait for data to be written
+        Thread.sleep(20000);
+        validateElasticsearchContent("customers");
+        validateElasticsearchContent("products");
+
+        LOG.info("Begin incremental reading stage.");
+        String mysqlJdbcUrl = String.format(
+                "jdbc:mysql://%s:%s/%s",
+                MYSQL.getHost(),
+                MYSQL.getDatabasePort(),
+                mysqlInventoryDatabase.getDatabaseName()
+        );
+
+        try (Connection conn = DriverManager.getConnection(mysqlJdbcUrl, 
MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+             Statement stat = conn.createStatement()) {
+
+            stat.execute("INSERT INTO products VALUES (default,'jacket','water 
resistent white wind breaker',0.2, null, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);
+            validateElasticsearchContent("products");
+
+            stat.execute("UPDATE products SET description='18oz carpenter 
hammer' WHERE id=106;");
+            stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+            stat.execute("ALTER TABLE products DROP COLUMN point_c;");
+            stat.execute("DELETE FROM products WHERE id=101;");
+
+            stat.execute("INSERT INTO products VALUES (default,'scooter', 
null, 2.14, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        validateElasticsearchContent("products");
+    }
+
+    private void validateElasticsearchContent(String tableName) throws 
Exception {
+        String indexName = String.format("%s.%s", 
mysqlInventoryDatabase.getDatabaseName(), tableName);
+        int hostPort = elasticsearch.getMappedPort(9200);
+        String url = String.format("http://localhost:%d/%s/_search?pretty";, 
hostPort, indexName);
+        String curlCommand = String.format("curl -X GET '%s'", url);
+
+        ProcessBuilder processBuilder = new ProcessBuilder("curl", "-X", 
"GET", url);
+        Process process = processBuilder.start();
+        int exitCode = process.waitFor();
+
+        if (exitCode != 0) {
+            throw new RuntimeException("Failed to query index.");
+        }
+
+        Scanner scanner = new 
Scanner(process.getInputStream()).useDelimiter("\\A");
+        String response = scanner.hasNext() ? scanner.next() : "";
+        LOG.info("Elasticsearch query response for table {}: \n{}", tableName, 
response);
+
+        List<String> expectedDocuments;
+        if (tableName.equals("customers")) {
+            expectedDocuments = Arrays.asList("user_1", "user_2", "user_3", 
"user_4");
+        } else if (tableName.equals("products")) {
+            expectedDocuments = Arrays.asList(
+                    "scooter",
+                    "car battery",
+                    "12-pack drill bits",
+                    "hammer",
+                    "rocks",
+                    "jacket",
+                    "spare tire"
+            );
+        } else {
+            throw new RuntimeException("Unexpected table name: " + tableName);
+        }
+
+        for (String expectedDoc : expectedDocuments) {
+            assertTrue("Document not found: " + expectedDoc, 
response.contains(expectedDoc));
+        }
+    }
+
+    private void dropElasticsearchIndex(String tableName) {
+        try {
+            String indexName = String.format("mysql_inventory.%s", tableName);
+            int hostPort = elasticsearch.getMappedPort(9200);
+            String url = String.format("http://localhost:%d/%s";, hostPort, 
indexName);

Review Comment:
   Ditto



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java:
##########


Review Comment:
   License headers are missing.



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java:
##########
@@ -0,0 +1,231 @@
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.junit.*;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Scanner;
+import java.util.stream.Stream;
+
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment {
+    private static final Logger LOG = 
LoggerFactory.getLogger(MySqlToElasticsearchE2eITCase.class);
+
+    protected static final String MYSQL_TEST_USER = "mysqluser";
+    protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+    protected static final String MYSQL_DRIVER_CLASS = 
"com.mysql.cj.jdbc.Driver";
+    public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240;
+    public static final int TESTCASE_TIMEOUT_SECONDS = 60;
+    protected static final long EVENT_DEFAULT_TIMEOUT = 60000L;
+
+    @ClassRule
+    public static final MySqlContainer MYSQL =
+            (MySqlContainer)
+                    new MySqlContainer(MySqlVersion.V8_0)
+                            .withConfigurationOverride("docker/mysql/my.cnf")
+                            .withSetupSQL("docker/mysql/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withNetwork(NETWORK)
+                            .withNetworkAliases("mysql");
+
+    private ElasticsearchContainer elasticsearch;
+
+    protected final UniqueDatabase mysqlInventoryDatabase =
+            new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
+
+    @Before
+    public void before() throws Exception {
+        super.before();
+        mysqlInventoryDatabase.createAndInitialize();
+
+        LOG.info("Starting Elasticsearch container...");
+        elasticsearch = new 
ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:" + 
getElasticsearchVersion())
+                .withNetwork(NETWORK)
+                .withNetworkAliases("elasticsearch")
+                .withEnv("discovery.type", "single-node")
+                .withEnv("xpack.security.enabled", "false")
+                .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+                .withExposedPorts(9200, 9300)
+                .withLogConsumer(new Slf4jLogConsumer(LOG))
+                .waitingFor(
+                        Wait.forHttp("/_cluster/health")
+                                .forStatusCodeMatching(
+                                        response -> response == 200 || 
response == 401)
+                                .withStartupTimeout(Duration.ofMinutes(2)));
+
+        Startables.deepStart(Stream.of(elasticsearch)).join();
+        LOG.info("Elasticsearch container is started.");
+    }
+
+    @After
+    public void after() {
+        super.after();
+        mysqlInventoryDatabase.dropDatabase();
+        dropElasticsearchIndex("customers");
+        dropElasticsearchIndex("products");
+        if (elasticsearch != null) {
+            elasticsearch.stop();
+        }
+    }
+
+    @Test
+    public void testSyncWholeDatabase() throws Exception {
+        String pipelineJob = String.format(
+                "source:\n"
+                        + "  type: mysql\n"
+                        + "  hostname: mysql\n"
+                        + "  port: 3306\n"
+                        + "  username: %s\n"
+                        + "  password: %s\n"
+                        + "  tables: %s.\\.*\n"
+                        + "  server-id: 5400-5404\n"
+                        + "  server-time-zone: UTC\n"
+                        + "\n"
+                        + "sink:\n"
+                        + "  type: elasticsearch\n"
+                        + "  hosts: http://elasticsearch:9200\n";
+                        + "  version: %s\n"
+                        + "  batch.size.max.bytes: 5242880\n"
+                        + "  record.size.max.bytes: 5242880\n"
+                        + "\n"
+                        + "pipeline:\n"
+                        + "  parallelism: 1",
+                MYSQL_TEST_USER,
+                MYSQL_TEST_PASSWORD,
+                mysqlInventoryDatabase.getDatabaseName(),
+                getElasticsearchVersion().split("\\.")[0]  // Use major 
version number
+        );
+
+        Path mysqlCdcJar = 
TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+        Path elasticsearchCdcConnector = 
TestUtils.getResource("flink-cdc-pipeline-connector-elasticsearch-3.2-SNAPSHOT.jar");
+        Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+
+        submitPipelineJob(pipelineJob, mysqlCdcJar, elasticsearchCdcConnector, 
mysqlDriverJar);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        // Wait for data to be written
+        Thread.sleep(20000);
+        validateElasticsearchContent("customers");
+        validateElasticsearchContent("products");
+
+        LOG.info("Begin incremental reading stage.");
+        String mysqlJdbcUrl = String.format(
+                "jdbc:mysql://%s:%s/%s",
+                MYSQL.getHost(),
+                MYSQL.getDatabasePort(),
+                mysqlInventoryDatabase.getDatabaseName()
+        );
+
+        try (Connection conn = DriverManager.getConnection(mysqlJdbcUrl, 
MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+             Statement stat = conn.createStatement()) {
+
+            stat.execute("INSERT INTO products VALUES (default,'jacket','water 
resistent white wind breaker',0.2, null, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);
+            validateElasticsearchContent("products");
+
+            stat.execute("UPDATE products SET description='18oz carpenter 
hammer' WHERE id=106;");
+            stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+            stat.execute("ALTER TABLE products DROP COLUMN point_c;");
+            stat.execute("DELETE FROM products WHERE id=101;");
+
+            stat.execute("INSERT INTO products VALUES (default,'scooter', 
null, 2.14, null, null);");
+
+            // Wait for data to be written
+            Thread.sleep(20000);
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        validateElasticsearchContent("products");
+    }
+
+    private void validateElasticsearchContent(String tableName) throws 
Exception {
+        String indexName = String.format("%s.%s", 
mysqlInventoryDatabase.getDatabaseName(), tableName);
+        int hostPort = elasticsearch.getMappedPort(9200);
+        String url = String.format("http://localhost:%d/%s/_search?pretty";, 
hostPort, indexName);
+        String curlCommand = String.format("curl -X GET '%s'", url);
+
+        ProcessBuilder processBuilder = new ProcessBuilder("curl", "-X", 
"GET", url);
+        Process process = processBuilder.start();
+        int exitCode = process.waitFor();

Review Comment:
   `curl` is not generally available on all platforms (like Windows), even some 
Linux distro doesn't package it by default. Please consider using Java HTTP 
libraries to send request instead.



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java:
##########
@@ -60,16 +60,16 @@
 
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Test environment running pipeline job on Flink containers. */

Review Comment:
   Please do not remove JavaDocs.



##########
flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java:
##########
@@ -60,16 +60,16 @@
 
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Test environment running pipeline job on Flink containers. */
 @RunWith(Parameterized.class)
 public abstract class PipelineTestEnvironment extends TestLogger {
     private static final Logger LOG = 
LoggerFactory.getLogger(PipelineTestEnvironment.class);
 
-    @Parameterized.Parameter public String flinkVersion;
+    @Parameterized.Parameter(0)
+    public String flinkVersion;
+
+    @Parameterized.Parameter(1)
+    public String elasticsearchVersion;

Review Comment:
   Putting Elasticsearch specific things in `PipelineTestEnvironment` seems not 
a good idea. Other parameterized E2e cases that has nothing to do with ES will 
be executed repeatedly, which doesn't make sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to