[ 
https://issues.apache.org/jira/browse/FLINK-18758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17171297#comment-17171297
 ] 

Marta Paes Moreira edited comment on FLINK-18758 at 8/5/20, 6:57 AM:
---------------------------------------------------------------------

+1, thanks for documenting this issue.

I've come across this while setting up a demo with Flink SQL+Debezium (with 
Postgres). In addition to making it hard to work with timestamps, it also 
prevents users from using the JDBC catalogs to create the changelog table 
(since the types diverge between the Postgres table and Kafka; I couldn't make 
this work with computed columns using ts_field AS CAST(field AS TIMESTAMP)).

NUMERIC/DECIMAL columns were also an issue, but AFAIU this is tied to Kafka 
itself. As a workaround, I refrained from using DECIMAL and set 
"numeric.mapping": "best_fit" in the Debezium connector properties.


was (Author: morsapaes):
+1, thanks for documenting this issue.

I've come across this while setting up a demo with Flink SQL+Debezium (with 
Postgres). In addition to making it hard to work with timestamps, it also 
prevents users from using the JDBC catalogs to create the changelog table 
(since the types diverge between the Postgres table and Kafka; I couldn't make 
this work with computed columns using a ts_field AS CAST(field AS TIMESTAMP)).

NUMERIC/DECIMAL columns were also an issue, but AFAIU this is tied to Kafka 
itself. As a workaround, I refrained from using DECIMAL and set 
"numeric.mapping": "best_fit" in the Debezium connector properties.

> Support debezium data type in debezium format 
> ----------------------------------------------
>
>                 Key: FLINK-18758
>                 URL: https://issues.apache.org/jira/browse/FLINK-18758
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>            Reporter: Leonard Xu
>            Priority: Major
>
> Currently debezium json format  wrapper the json format to 
> serialize/deserialize the debezuim json format data.
> But debezium json format has its own data type which consists of Literal Type 
> and Semantic Type[1], i.g. Date type in debezium is an integer which 
> represents the number of days since epoch rather than a string with 
> 'yyyy-MM-dd' pattern.  
> {code:java}
> {   "schema":{
>       "fields":[  
>        {            
>                 "fields":[
>                {                 
>                  "type":"int32",      //Literal Type
>                   "optional":false,
>                   "name":"io.debezium.time.Date", //semantic Type
>                   "version":1,
>                   "field":"order_date"
>                },
>                {                  "type":"int32",
>                   "optional":false,
>                   "field":"quantity"
>                }
>             ]
>          }
>       ]
>    },
>    "payload":{ 
>       "before":null,
>       "after":{ 
>          "order_date":16852, // Literal Value, the number of days since epoch
>         "quantity":1
>       },
>       "op":"c",
>       "ts_ms":1596081813474
>    }
> } {code}
>  
> I think we need obtain the debezuim data type from schema information and 
> then serialize/deserialize the data in payload.
> [1][https://debezium.io/documentation/reference/1.2/connectors/mysql.html]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to