[ https://issues.apache.org/jira/browse/FLINK-21093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he updated FLINK-21093:
-------------------------------
    Description: 
If a source table's DDL is

{code:sql}
CREATE TABLE MyTable (
  a bigint,
  b int,
  c varchar
) with (
  'connector' = 'filesystem',
  'path' = '/tmp',
  'format' = 'testcsv'
)
{code}

Its corresponding StreamExecTableSourceScan node's JSON representation looks like:

{code:json}
{
    "id": 1,
    "description": "TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c])",
     "class": 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan"
    "scanTableSource": {
        "identifier": {
            "catalogName": "default_catalog",
            "databaseName": "default_database",
            "tableName": "MyTable"
        },
        "catalogTable": {
            "connector": "filesystem",
            "path": "/tmp",
            "format": "testcsv",
            "schema.0.name": "a",
            "schema.0.data-type": "BIGINT",
            "schema.1.name": "b",
            "schema.1.data-type": "INT",
            "schema.2.name": "c",
            "schema.2.data-type": "VARCHAR(2147483647)"
        },
        "configuration": {}
    },
    "outputType": "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
    "inputEdges": [],
    "inputs": [],
}
{code}

All properties of the catalog table are serialized so that the ScanTableSource instance can be re-created from those properties during JSON deserialization.
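As a minimal, illustrative sketch (not the actual Flink planner code; the class and method names here are hypothetical), flattening a table schema into the `schema.N.name` / `schema.N.data-type` property keys shown above could look like this:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CatalogTableProps {

    /**
     * Hypothetical helper: merges the connector options with the schema,
     * flattened into "schema.N.name" / "schema.N.data-type" keys as in the
     * JSON example above. Each column is a {name, dataType} pair.
     */
    static Map<String, String> flatten(String[][] columns, Map<String, String> options) {
        Map<String, String> props = new LinkedHashMap<>(options);
        for (int i = 0; i < columns.length; i++) {
            props.put("schema." + i + ".name", columns[i][0]);
            props.put("schema." + i + ".data-type", columns[i][1]);
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "filesystem");
        options.put("path", "/tmp");
        options.put("format", "testcsv");

        Map<String, String> props = flatten(
                new String[][] {
                    {"a", "BIGINT"},
                    {"b", "INT"},
                    {"c", "VARCHAR(2147483647)"}
                },
                options);

        // Prints each property as key=value, matching the "catalogTable" map above.
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

On the deserialization side, the same flat property map is enough to rebuild the catalog table and, from it, the ScanTableSource, which is why the full map must be serialized.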


  was:
If a table's DDL is

{code:sql}
CREATE TABLE MyTable (
  a bigint,
  b int,
  c varchar
) with (
  'connector' = 'filesystem',
  'path' = '/tmp',
  'format' = 'testcsv'
)
{code}

Its corresponding StreamExecTableSourceScan node's JSON representation looks like:

{code:json}
{
    "id": 1,
    "description": "TableSourceScan(table=[[default_catalog, default_database, 
MyTable]], fields=[a, b, c])",
     "class": 
"org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan"
    "scanTableSource": {
        "identifier": {
            "catalogName": "default_catalog",
            "databaseName": "default_database",
            "tableName": "MyTable"
        },
        "catalogTable": {
            "connector": "filesystem",
            "path": "/tmp",
            "format": "testcsv",
            "schema.0.name": "a",
            "schema.0.data-type": "BIGINT",
            "schema.1.name": "b",
            "schema.1.data-type": "INT",
            "schema.2.name": "c",
            "schema.2.data-type": "VARCHAR(2147483647)"
        },
        "configuration": {}
    },
    "outputType": "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
    "inputEdges": [],
    "inputs": [],
}
{code}

All properties of the catalog table are serialized so that the catalog table instance can be re-created from those properties during JSON deserialization.



> Support StreamExecTableSource json serialization/deserialization
> ----------------------------------------------------------------
>
>                 Key: FLINK-21093
>                 URL: https://issues.apache.org/jira/browse/FLINK-21093
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Planner
>            Reporter: godfrey he
>            Assignee: godfrey he
>            Priority: Major
>             Fix For: 1.13.0
>
>
> If a source table's DDL is
> {code:sql}
> CREATE TABLE MyTable (
>   a bigint,
>   b int,
>   c varchar
> ) with (
>   'connector' = 'filesystem',
>   'path' = '/tmp',
>   'format' = 'testcsv'
> )
> {code}
> Its corresponding StreamExecTableSourceScan node's JSON representation looks like:
> {code:json}
> {
>     "id": 1,
>     "description": "TableSourceScan(table=[[default_catalog, 
> default_database, MyTable]], fields=[a, b, c])",
>      "class": 
> "org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecTableSourceScan"
>     "scanTableSource": {
>         "identifier": {
>             "catalogName": "default_catalog",
>             "databaseName": "default_database",
>             "tableName": "MyTable"
>         },
>         "catalogTable": {
>             "connector": "filesystem",
>             "path": "/tmp",
>             "format": "testcsv",
>             "schema.0.name": "a",
>             "schema.0.data-type": "BIGINT",
>             "schema.1.name": "b",
>             "schema.1.data-type": "INT",
>             "schema.2.name": "c",
>             "schema.2.data-type": "VARCHAR(2147483647)"
>         },
>         "configuration": {}
>     },
>     "outputType": "ROW<`a` BIGINT, `b` INT, `c` VARCHAR(2147483647)>",
>     "inputEdges": [],
>     "inputs": [],
> }
> {code}
> All properties of the catalog table are serialized so that the ScanTableSource instance can be re-created from those properties during JSON deserialization.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
