wuchong commented on a change in pull request #14464:
URL: https://github.com/apache/flink/pull/14464#discussion_r549592196
##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
##########
@@ -218,4 +218,168 @@ public void testKafkaDebeziumChangelogSource() throws Exception {
         tableResult.getJobClient().get().cancel().get(); // stop the job
         deleteTestTopic(topic);
     }
+
+    @Test
+    public void testKafkaCanalChangelogSource() throws Exception {
+        final String topic = "changelog_canal";
+        createTestTopic(topic, 1, 1);
+
+        // enables MiniBatch processing to verify MiniBatch + FLIP-95, see FLINK-18769
+        Configuration tableConf = tEnv.getConfig().getConfiguration();
+        tableConf.setString("table.exec.mini-batch.enabled", "true");
+        tableConf.setString("table.exec.mini-batch.allow-latency", "1s");
+        tableConf.setString("table.exec.mini-batch.size", "5000");
+        tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
+
+        // ---------- Write the Canal json into Kafka -------------------
+        List<String> lines = readLines("canal-data.txt");
+        DataStreamSource<String> stream = env.fromCollection(lines);
+        SerializationSchema<String> serSchema = new SimpleStringSchema();
+        FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
+
+        // the producer must not produce duplicates
+        Properties producerProperties =
+                FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+        producerProperties.setProperty("retries", "0");
+        producerProperties.putAll(secureProps);
+        kafkaServer.produceIntoKafka(stream, topic, serSchema, producerProperties, partitioner);
+        try {
+            env.execute("Write sequence");
+        } catch (Exception e) {
+            throw new Exception("Failed to write canal data to Kafka.", e);
+        }
+
+        // ---------- Produce an event time stream into Kafka -------------------
+        String bootstraps = standardProps.getProperty("bootstrap.servers");
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE canal_source ("
+                                // test format metadata
+                                + " origin_database STRING METADATA FROM 'value.database' VIRTUAL,"
+                                + " origin_table STRING METADATA FROM 'value.table' VIRTUAL,"
+                                + " origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,"
+                                + " origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,"
+                                + " origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,"
+                                + " id INT NOT NULL,"
+                                + " name STRING,"
+                                + " description STRING,"
+                                + " weight DECIMAL(10,3),"
+                                // test connector metadata
+                                + " origin_topic STRING METADATA FROM 'topic' VIRTUAL,"
+                                + " origin_partition STRING METADATA FROM 'partition' VIRTUAL" // unused
+                                + ") WITH ("
+                                + " 'connector' = 'kafka',"
+                                + " 'topic' = '%s',"
+                                + " 'properties.bootstrap.servers' = '%s',"
+                                + " 'scan.startup.mode' = 'earliest-offset',"
+                                + " 'value.format' = 'canal-json'"
+                                + ")",
+                        topic, bootstraps);
+        String sinkDDL =
+                "CREATE TABLE sink ("
+                        + " origin_topic STRING,"
+                        + " origin_database STRING,"
+                        + " origin_table STRING,"
+                        + " origin_sql_type MAP<STRING, INT>,"
+                        + " origin_pk_names ARRAY<STRING>,"
+                        + " origin_ts TIMESTAMP(3),"
+                        + " name STRING,"
+                        + " PRIMARY KEY (name) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ")";
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "INSERT INTO sink "
+                                + "SELECT origin_topic, origin_database, origin_table, origin_sql_type, "
+                                + "origin_pk_names, origin_ts, name "
+                                + "FROM canal_source");
+
+        // Canal captures change data on the `products2` table:
+        //
+        // CREATE TABLE products2 (
+        //   id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+        //   name VARCHAR(255),
+        //   description VARCHAR(512),
+        //   weight FLOAT
+        // );
+        // ALTER TABLE products2 AUTO_INCREMENT = 101;
+        //
+        // INSERT INTO products2
+        // VALUES (default,"scooter","Small 2-wheel scooter",3.14),
+        //        (default,"car battery","12V car battery",8.1),
+        //        (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
+        //        (default,"hammer","12oz carpenter's hammer",0.75),
+        //        (default,"hammer","14oz carpenter's hammer",0.875),
+        //        (default,"hammer","16oz carpenter's hammer",1.0),
+        //        (default,"rocks","box of assorted rocks",5.3),
+        //        (default,"jacket","water resistent black wind breaker",0.1),
+        //        (default,"spare tire","24 inch spare tire",22.2);
+        // UPDATE products2 SET description='18oz carpenter hammer' WHERE id=106;
+        // UPDATE products2 SET weight='5.1' WHERE id=107;
+        // INSERT INTO products2 VALUES (default,"jacket","water resistent white wind breaker",0.2);
+        // INSERT INTO products2 VALUES (default,"scooter","Big 2-wheel scooter ",5.18);
+        // UPDATE products2 SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;
+        // UPDATE products2 SET weight='5.17' WHERE id=111;
+        // DELETE FROM products2 WHERE id=111;
+        // UPDATE products2 SET weight='5.17' WHERE id=102 or id = 101;
+        // DELETE FROM products2 WHERE id=102 or id = 103;
+        //
+        // > SELECT * FROM products2;
+        // +-----+---------+------------------------------------+--------+
+        // | id  | name    | description                        | weight |
+        // +-----+---------+------------------------------------+--------+
+        // | 101 | scooter | Small 2-wheel scooter              |   5.17 |
+        // | 104 | hammer  | 12oz carpenter's hammer            |   0.75 |
+        // | 105 | hammer  | 14oz carpenter's hammer            |  0.875 |
+        // | 106 | hammer  | 18oz carpenter hammer              |      1 |
+        // | 107 | rocks   | box of assorted rocks              |    5.1 |
+        // | 108 | jacket  | water resistent black wind breaker |

Review comment:
   Could you beautify the format? Same for the Debezium one.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
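For context on the feature the new test exercises: the `canal-json` format exposes fields of Canal's change-event envelope (database, table, sql-type, pk-names, ingestion timestamp) as virtual metadata columns addressed with the `value.` prefix, while `topic` and `partition` come from the Kafka connector itself. Below is a minimal standalone sketch of the same pattern outside the test harness; the class name, table name, topic, and bootstrap address are illustrative, while the connector options and metadata keys are the ones used in the test DDL above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CanalJsonMetadataSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(
                        EnvironmentSettings.newInstance().inStreamingMode().build());

        // Same metadata keys as in the test DDL: format metadata carries the
        // 'value.' prefix, connector metadata ('topic', 'partition') does not.
        tEnv.executeSql(
                "CREATE TABLE products_changelog ("
                        + "  origin_database STRING METADATA FROM 'value.database' VIRTUAL,"
                        + "  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,"
                        + "  origin_topic STRING METADATA FROM 'topic' VIRTUAL,"
                        + "  id INT NOT NULL,"
                        + "  name STRING,"
                        + "  weight DECIMAL(10,3)"
                        + ") WITH ("
                        + "  'connector' = 'kafka',"
                        + "  'topic' = 'products_binlog',"                      // illustrative
                        + "  'properties.bootstrap.servers' = 'localhost:9092'," // illustrative
                        + "  'scan.startup.mode' = 'earliest-offset',"
                        + "  'value.format' = 'canal-json'"
                        + ")");

        // Metadata columns are queried like ordinary columns; canal-json
        // interprets the binlog records as an updating changelog stream.
        tEnv.executeSql(
                "SELECT origin_topic, origin_database, origin_ts, id, name, weight "
                        + "FROM products_changelog")
                .print();
    }
}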