wuchong commented on a change in pull request #14464:
URL: https://github.com/apache/flink/pull/14464#discussion_r549207143
##########
File path: docs/dev/table/connectors/formats/canal.md
##########
@@ -142,6 +142,79 @@ SELECT * FROM topic_products;
 </div>
 </div>
 
+Available Metadata
+------------------
+
+The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<span class="label label-danger">Attention</span> Format metadata fields are only available if the
+corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose
+metadata fields for its value format.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 40%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><code>database</code></td>
+      <td><code>STRING NULL</code></td>
+      <td>The originating database. Corresponds to the <code>database</code> field in the
+      Canal record if available.</td>
+    </tr>
+    <tr>
+      <td><code>table</code></td>
+      <td><code>STRING NULL</code></td>
+      <td>The originating database table. Corresponds to the <code>table</code> field in the
+      Canal record if available.</td>
+    </tr>
+    <tr>
+      <td><code>sql-type</code></td>
+      <td><code>MAP&lt;STRING, INT&gt; NULL</code></td>
+      <td>Map of various SQL types. Corresponds to the <code>sqlType</code> field in the
+      Canal record if available.</td>
+    </tr>
+    <tr>
+      <td><code>pk-names</code></td>
+      <td><code>ARRAY&lt;STRING&gt; NULL</code></td>
+      <td>Array of primary key names. Corresponds to the <code>pkNames</code> field in the
+      Canal record if available.</td>
+    </tr>
+    <tr>
+      <td><code>ingestion-timestamp</code></td>
+      <td><code>TIMESTAMP(3) WITH LOCAL TIME ZONE NULL</code></td>
+      <td>The timestamp at which the connector processed the event. Corresponds to the <code>ts</code>
+      field in the Canal record.</td>
+    </tr>
+  </tbody>
+</table>
+
+The following example shows how to access Canal metadata fields in Kafka:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KafkaTable (
+  `origin_database` STRING METADATA FROM 'value.database' VIRTUAL,
+  `origin_table` STRING METADATA FROM 'value.table' VIRTUAL,

Review comment:
   Could you list all the metadata columns? I think that would be helpful, esp. for the complex types.

##########
File path: docs/dev/table/connectors/formats/canal.md
##########
@@ -142,6 +142,79 @@ SELECT * FROM topic_products;
 </div>
 </div>
 
+Available Metadata
+------------------
+
+The following format metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<span class="label label-danger">Attention</span> Format metadata fields are only available if the
+corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose
+metadata fields for its value format.
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 40%">Data Type</th>
+      <th class="text-center" style="width: 40%">Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td><code>database</code></td>
+      <td><code>STRING NULL</code></td>
+      <td>The originating database. Corresponds to the <code>database</code> field in the
+      Canal record if available.</td>
+    </tr>
+    <tr>
+      <td><code>table</code></td>
+      <td><code>STRING NULL</code></td>
+      <td>The originating database table. Corresponds to the <code>table</code> field in the
+      Canal record if available.</td>
+    </tr>
+    <tr>
+      <td><code>sql-type</code></td>
+      <td><code>MAP&lt;STRING, INT&gt; NULL</code></td>
+      <td>Map of various SQL types. Corresponds to the <code>sqlType</code> field in the
+      Canal record if available.</td>
+    </tr>
+    <tr>
+      <td><code>pk-names</code></td>
+      <td><code>ARRAY&lt;STRING&gt; NULL</code></td>
+      <td>Array of primary key names. Corresponds to the <code>pkNames</code> field in the
+      Canal record if available.</td>
+    </tr>
+    <tr>
+      <td><code>ingestion-timestamp</code></td>
+      <td><code>TIMESTAMP(3) WITH LOCAL TIME ZONE NULL</code></td>
+      <td>The timestamp at which the connector processed the event. Corresponds to the <code>ts</code>
+      field in the Canal record.</td>
+    </tr>
+  </tbody>
+</table>
+
+The following example shows how to access Canal metadata fields in Kafka:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KafkaTable (
+  `origin_database` STRING METADATA FROM 'value.database' VIRTUAL,
+  `origin_table` STRING METADATA FROM 'value.table' VIRTUAL,
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING

Review comment:
   I think we don't need to add backquotes around the column names, because they are not keywords.
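For reference, here is one shape the finished example could take once it lists every metadata key from the table above and drops the backquotes, as the two comments suggest. This is only a sketch: the values in the WITH clause (topic, bootstrap servers, group id, startup mode) are illustrative placeholders, and the origin_* column names are arbitrary.

    CREATE TABLE KafkaTable (
      origin_database STRING METADATA FROM 'value.database' VIRTUAL,
      origin_table STRING METADATA FROM 'value.table' VIRTUAL,
      origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
      origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
      origin_ts TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      user_id BIGINT,
      item_id BIGINT,
      behavior STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'user_behavior',
      'properties.bootstrap.servers' = 'localhost:9092',
      'properties.group.id' = 'testGroup',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'canal-json'
    );

Declaring all five keys in one table makes the complex types visible at a glance: the Map and Array columns use exactly the data types documented in the metadata table above.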
##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
##########
@@ -198,4 +198,130 @@ public void testKafkaDebeziumChangelogSource() throws Exception {
 		tableResult.getJobClient().get().cancel().get(); // stop the job
 		deleteTestTopic(topic);
 	}
+
+	@Test
+	public void testKafkaCanalChangelogSource() throws Exception {
+		final String topic = "changelog_canal";
+		createTestTopic(topic, 1, 1);
+
+		// enables MiniBatch processing to verify MiniBatch + FLIP-95, see FLINK-18769
+		Configuration tableConf = tEnv.getConfig().getConfiguration();
+		tableConf.setString("table.exec.mini-batch.enabled", "true");
+		tableConf.setString("table.exec.mini-batch.allow-latency", "1s");
+		tableConf.setString("table.exec.mini-batch.size", "5000");
+		tableConf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
+
+		// ---------- Write the Canal json into Kafka -------------------
+		List<String> lines = readLines("canal-data.txt");
+		DataStreamSource<String> stream = env.fromCollection(lines);
+		SerializationSchema<String> serSchema = new SimpleStringSchema();
+		FlinkKafkaPartitioner<String> partitioner = new FlinkFixedPartitioner<>();
+
+		// the producer must not produce duplicates
+		Properties producerProperties = FlinkKafkaProducerBase.getPropertiesFromBrokerList(brokerConnectionStrings);
+		producerProperties.setProperty("retries", "0");
+		producerProperties.putAll(secureProps);
+		kafkaServer.produceIntoKafka(stream, topic, serSchema, producerProperties, partitioner);
+		try {
+			env.execute("Write sequence");
+		} catch (Exception e) {
+			throw new Exception("Failed to write canal data to Kafka.", e);
+		}
+
+		// ---------- Produce an event time stream into Kafka -------------------
+		String bootstraps = standardProps.getProperty("bootstrap.servers");
+		String sourceDDL = String.format(
+			"CREATE TABLE canal_source (" +
+				// test format metadata
+				" origin_ts STRING METADATA FROM 'value.ingestion-timestamp' VIRTUAL," + // unused
+				" origin_table STRING METADATA FROM 'value.table' VIRTUAL," +

Review comment:
   Could you also test the Map and Array metadata columns?
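To make that suggestion concrete, a possible extension of the test's DDL that also exercises the Map and Array metadata could look like the following. This is a sketch only: the physical columns (id, name), the bootstrap server address, and the verification query are assumptions, since the remainder of the test schema is not shown in this hunk; only the topic name (changelog_canal) comes from the test itself.

    CREATE TABLE canal_source (
      origin_ts STRING METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
      origin_table STRING METADATA FROM 'value.table' VIRTUAL,
      origin_sql_type MAP<STRING, INT> METADATA FROM 'value.sql-type' VIRTUAL,
      origin_pk_names ARRAY<STRING> METADATA FROM 'value.pk-names' VIRTUAL,
      id BIGINT,
      name STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'changelog_canal',
      'properties.bootstrap.servers' = 'localhost:9092',
      'scan.startup.mode' = 'earliest-offset',
      'value.format' = 'canal-json'
    );

    -- read the complex metadata so the test can assert on it:
    -- map access is by key, array access is 1-based
    SELECT origin_sql_type['id'], origin_pk_names[1], id, name FROM canal_source;

A query like the SELECT above forces the format to actually decode the sqlType and pkNames fields, so assertions on the collected rows would cover both complex types end to end.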
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org