[ https://issues.apache.org/jira/browse/FLINK-21094?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
godfrey he updated FLINK-21094: ------------------------------- Description: Similar to StreamExecTableSourceScan, if a sink table's ddl is {code:sql} CREATE TABLE MySink ( a bigint, b int, c varchar ) with ( 'connector' = 'filesystem', 'path' = '/tmp', 'format' = 'testcsv' ) {code} Its corresponding StreamExecSink's json representation looks like: {code:json} { { "id": 2, "description": "Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])", "class": "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", "dynamicTableSink": { "identifier": { "catalogName": "default_catalog", "databaseName": "default_database", "tableName": "MySink" }, "catalogTable": { "connector": "filesystem", "format": "testcsv", "path": "/tmp", "schema.0.name": "a", "schema.0.data-type": "BIGINT", "schema.1.name": "b", "schema.1.data-type": "INT", "schema.2.name": "c", "schema.2.data-type": "VARCHAR(2147483647)" }, "configuration": {} }, "inputChangelogMode": [ "INSERT" ], "inputEdges": [ { "requiredShuffle": { "type": "UNKNOWN" }, "damBehavior": "PIPELINED", "priority": 0 } ], "outputType": "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", "inputs": [ 1 ] } } {code} All properties of the catalog table will be serialized, so that the table sink instance can be created based on those properties from json. > Support StreamExecSink json serialization/deserialization > --------------------------------------------------------- > > Key: FLINK-21094 > URL: https://issues.apache.org/jira/browse/FLINK-21094 > Project: Flink > Issue Type: Sub-task > Components: Table SQL / Planner > Reporter: godfrey he > Assignee: godfrey he > Priority: Major > Fix For: 1.13.0 > > > Similar to StreamExecTableSourceScan, if a sink table's ddl is > {code:sql} > CREATE TABLE MySink ( > a bigint, > b int, > c varchar > ) with ( > 'connector' = 'filesystem', > 'path' = '/tmp', > 'format' = 'testcsv' > ) > {code} > Its corresponding StreamExecSink's json representation looks like: > {code:json} > { > { > "id": 2, > "description": "Sink(table=[default_catalog.default_database.MySink], > fields=[a, b, c])", > "class": > "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink", > "dynamicTableSink": { > "identifier": { > "catalogName": "default_catalog", > "databaseName": "default_database", > "tableName": "MySink" > }, > "catalogTable": { > "connector": "filesystem", > "format": "testcsv", > "path": "/tmp", > "schema.0.name": "a", > "schema.0.data-type": "BIGINT", > "schema.1.name": "b", > "schema.1.data-type": "INT", > "schema.2.name": "c", > "schema.2.data-type": "VARCHAR(2147483647)" > }, > "configuration": {} > }, > "inputChangelogMode": [ > "INSERT" > ], > "inputEdges": [ > { > "requiredShuffle": { > "type": "UNKNOWN" > }, > "damBehavior": "PIPELINED", > "priority": 0 > } > ], > "outputType": "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>", > "inputs": [ > 1 > ] > } > } > {code} > All properties of the catalog table will be serialized, so that the table > sink instance can be created based on those properties from json. -- This message was sent by Atlassian Jira (v8.3.4#803005)