yuxiqian commented on code in PR #3535: URL: https://github.com/apache/flink-cdc/pull/3535#discussion_r1714675091
########## flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToElasticsearchE2eITCase.java: ########## @@ -0,0 +1,231 @@ +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; +import org.junit.*; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.elasticsearch.ElasticsearchContainer; +import org.testcontainers.lifecycle.Startables; + +import java.nio.file.Path; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Scanner; +import java.util.stream.Stream; + +import static org.junit.Assert.assertTrue; + +@RunWith(Parameterized.class) +public class MySqlToElasticsearchE2eITCase extends PipelineTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(MySqlToElasticsearchE2eITCase.class); + + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + public static final int DEFAULT_STARTUP_TIMEOUT_SECONDS = 240; + public static final int TESTCASE_TIMEOUT_SECONDS = 60; + protected static final long EVENT_DEFAULT_TIMEOUT = 60000L; + + @ClassRule + public static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer(MySqlVersion.V8_0) + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases("mysql"); + + private ElasticsearchContainer elasticsearch; + + protected final UniqueDatabase mysqlInventoryDatabase = + new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + @Before + public void before() throws Exception { + super.before(); + mysqlInventoryDatabase.createAndInitialize(); + + LOG.info("Starting Elasticsearch container..."); + elasticsearch = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:" + getElasticsearchVersion()) + .withNetwork(NETWORK) + .withNetworkAliases("elasticsearch") + .withEnv("discovery.type", "single-node") + .withEnv("xpack.security.enabled", "false") + .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g") + .withExposedPorts(9200, 9300) + .withLogConsumer(new Slf4jLogConsumer(LOG)) + .waitingFor( + Wait.forHttp("/_cluster/health") + .forStatusCodeMatching( + response -> response == 200 || response == 401) + .withStartupTimeout(Duration.ofMinutes(2))); + + Startables.deepStart(Stream.of(elasticsearch)).join(); + LOG.info("Elasticsearch container is started."); + } + + @After + public void after() { + super.after(); + mysqlInventoryDatabase.dropDatabase(); + dropElasticsearchIndex("customers"); + dropElasticsearchIndex("products"); + if (elasticsearch != null) { + elasticsearch.stop(); + } + } + + @Test + public void testSyncWholeDatabase() throws Exception { + String pipelineJob = String.format( + "source:\n" + + " type: mysql\n" + + " hostname: mysql\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: elasticsearch\n" + + " hosts: http://elasticsearch:9200\n" + + " version: %s\n" + + " batch.size.max.bytes: 5242880\n" + + " record.size.max.bytes: 5242880\n" + + "\n" + + "pipeline:\n" + + " parallelism: 1", + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + getElasticsearchVersion().split("\\.")[0] // Use major version number + ); + + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path elasticsearchCdcConnector = TestUtils.getResource("flink-cdc-pipeline-connector-elasticsearch-3.2-SNAPSHOT.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + + submitPipelineJob(pipelineJob, mysqlCdcJar, elasticsearchCdcConnector, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + + // Wait for data to be written + Thread.sleep(20000); + validateElasticsearchContent("customers"); + validateElasticsearchContent("products"); + + LOG.info("Begin incremental reading stage."); + String mysqlJdbcUrl = String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName() + ); + + try (Connection conn = DriverManager.getConnection(mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + + stat.execute("INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null);"); + + // Wait for data to be written + Thread.sleep(20000); + validateElasticsearchContent("products"); + + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + + stat.execute("ALTER TABLE products DROP COLUMN point_c;"); + stat.execute("DELETE FROM products WHERE id=101;"); + + stat.execute("INSERT INTO products VALUES (default,'scooter', null, 2.14, null, null);"); + + // Wait for data to be written + Thread.sleep(20000); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + validateElasticsearchContent("products"); + } + + private void validateElasticsearchContent(String tableName) throws Exception { + String indexName = String.format("%s.%s", mysqlInventoryDatabase.getDatabaseName(), tableName); + int hostPort = elasticsearch.getMappedPort(9200); + String url = String.format("http://localhost:%d/%s/_search?pretty", hostPort, indexName); + String curlCommand = String.format("curl -X GET '%s'", url); + + ProcessBuilder processBuilder = new ProcessBuilder("curl", "-X", "GET", url); + Process process = processBuilder.start(); + int exitCode = process.waitFor(); Review Comment: `curl` is not generally available on all platforms (like Windows), even some Linux distro doesn't package it by default. Please consider using Java HTTP libraries to send requests instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org