wuchong commented on a change in pull request #14464:
URL: https://github.com/apache/flink/pull/14464#discussion_r549592196



##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
##########
@@ -218,4 +218,168 @@ public void testKafkaDebeziumChangelogSource() throws 
Exception {
         tableResult.getJobClient().get().cancel().get(); // stop the job
         deleteTestTopic(topic);
     }
+
+    @Test
+    public void testKafkaCanalChangelogSource() throws Exception {
+        final String topic = "changelog_canal";
+        createTestTopic(topic, 1, 1);
+
+        // enables MiniBatch processing to verify MiniBatch + FLIP-95, see 
FLINK-18769
+        Configuration tableConf = tEnv.getConfig().getConfiguration();
+        tableConf.setString("table.exec.mini-batch.enabled", "true");
+        tableConf.setString("table.exec.mini-batch.allow-latency", "1s");
+        tableConf.setString("table.exec.mini-batch.size", "5000");
+        tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
+
+        // ---------- Write the Canal json into Kafka -------------------
+        List<String> lines = readLines("canal-data.txt");
+        DataStreamSource<String> stream = env.fromCollection(lines);
+        SerializationSchema<String> serSchema = new SimpleStringSchema();
+        FlinkKafkaPartitioner<String> partitioner = new 
FlinkFixedPartitioner<>();
+
+        // the producer must not produce duplicates
+        Properties producerProperties =
+                
FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+        producerProperties.setProperty("retries", "0");
+        producerProperties.putAll(secureProps);
+        kafkaServer.produceIntoKafka(stream, topic, serSchema, 
producerProperties, partitioner);
+        try {
+            env.execute("Write sequence");
+        } catch (Exception e) {
+            throw new Exception("Failed to write canal data to Kafka.", e);
+        }
+
+        // ---------- Produce an event time stream into Kafka 
-------------------
+        String bootstraps = standardProps.getProperty("bootstrap.servers");
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE canal_source ("
+                                +
+                                // test format metadata
+                                " origin_database STRING METADATA FROM 
'value.database' VIRTUAL,"
+                                + " origin_table STRING METADATA FROM 
'value.table' VIRTUAL,"
+                                + " origin_sql_type MAP<STRING, INT> METADATA 
FROM 'value.sql-type' VIRTUAL,"
+                                + " origin_pk_names ARRAY<STRING> METADATA 
FROM 'value.pk-names' VIRTUAL,"
+                                + " origin_ts TIMESTAMP(3) METADATA FROM 
'value.ingestion-timestamp' VIRTUAL,"
+                                + " id INT NOT NULL,"
+                                + " name STRING,"
+                                + " description STRING,"
+                                + " weight DECIMAL(10,3),"
+                                +
+                                // test connector metadata
+                                " origin_topic STRING METADATA FROM 'topic' 
VIRTUAL,"
+                                + " origin_partition STRING METADATA FROM 
'partition' VIRTUAL"
+                                + // unused
+                                ") WITH ("
+                                + " 'connector' = 'kafka',"
+                                + " 'topic' = '%s',"
+                                + " 'properties.bootstrap.servers' = '%s',"
+                                + " 'scan.startup.mode' = 'earliest-offset',"
+                                + " 'value.format' = 'canal-json'"
+                                + ")",
+                        topic, bootstraps);
+        String sinkDDL =
+                "CREATE TABLE sink ("
+                        + " origin_topic STRING,"
+                        + " origin_database STRING,"
+                        + " origin_table STRING,"
+                        + " origin_sql_type MAP<STRING, INT>,"
+                        + " origin_pk_names ARRAY<STRING>,"
+                        + " origin_ts TIMESTAMP(3),"
+                        + " name STRING,"
+                        + " PRIMARY KEY (name) NOT ENFORCED"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ")";
+        tEnv.executeSql(sourceDDL);
+        tEnv.executeSql(sinkDDL);
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "INSERT INTO sink "
+                                + "SELECT origin_topic, origin_database, 
origin_table, origin_sql_type, "
+                                + "origin_pk_names, origin_ts, name "
+                                + "FROM canal_source");
+
+        // Canal captures change data on the `products2` table:
+        //
+        // CREATE TABLE products2 (
+        //  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+        //  name VARCHAR(255),
+        //  description VARCHAR(512),
+        //  weight FLOAT
+        // );
+        // ALTER TABLE products2 AUTO_INCREMENT = 101;
+        //
+        // INSERT INTO products2
+        // VALUES (default,"scooter","Small 2-wheel scooter",3.14),
+        //        (default,"car battery","12V car battery",8.1),
+        //        (default,"12-pack drill bits","12-pack of drill bits with 
sizes ranging from #40
+        // to
+        // #3",0.8),
+        //        (default,"hammer","12oz carpenter's hammer",0.75),
+        //        (default,"hammer","14oz carpenter's hammer",0.875),
+        //        (default,"hammer","16oz carpenter's hammer",1.0),
+        //        (default,"rocks","box of assorted rocks",5.3),
+        //        (default,"jacket","water resistent black wind breaker",0.1),
+        //        (default,"spare tire","24 inch spare tire",22.2);
+        // UPDATE products2 SET description='18oz carpenter hammer' WHERE 
id=106;
+        // UPDATE products2 SET weight='5.1' WHERE id=107;
+        // INSERT INTO products2 VALUES (default,"jacket","water resistent 
white wind breaker",0.2);
+        // INSERT INTO products2 VALUES (default,"scooter","Big 2-wheel 
scooter ",5.18);
+        // UPDATE products2 SET description='new water resistent white wind 
breaker', weight='0.5'
+        // WHERE
+        // id=110;
+        // UPDATE products2 SET weight='5.17' WHERE id=111;
+        // DELETE FROM products2 WHERE id=111;
+        // UPDATE products2 SET weight='5.17' WHERE id=102 or id = 101;
+        // DELETE FROM products2 WHERE id=102 or id = 103;
+        //
+        // > SELECT * FROM products2;
+        // 
+-----+--------------------+---------------------------------------------------------+--------+
+        // | id  | name               | description                            
                 |
+        // weight
+        // |
+        // 
+-----+--------------------+---------------------------------------------------------+--------+
+        // | 101 | scooter            | Small 2-wheel scooter                  
                 |
+        // 5.17
+        // |
+        // | 104 | hammer             | 12oz carpenter's hammer                
                 |
+        // 0.75
+        // |
+        // | 105 | hammer             | 14oz carpenter's hammer                
                 |
+        // 0.875
+        // |
+        // | 106 | hammer             | 18oz carpenter hammer                  
                 |
+        //   1
+        // |
+        // | 107 | rocks              | box of assorted rocks                  
                 |
+        // 5.1
+        // |
+        // | 108 | jacket             | water resistent black wind breaker     
                 |

Review comment:
       Could you beautify the format? The same goes for the Debezium one.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to