Hi Lei,

Currently, Flink SQL doesn't support registering a table on top of a binlog
format (i.e. defining just "order_id" and "order_no" while the JSON payload
also carries other binlog fields).
This is exactly what we want to support in FLIP-105 [1] and FLIP-95.

For now, if you want to consume such JSON data, you have to define the full
schema, e.g. "type", "timestamp", and so on...
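
As a rough, untested sketch of what "the full schema" could look like for the
sample record quoted below: the helper here only builds the DDL string, which
you would then pass to tEnv.sqlUpdate(...). The class name, the table name
"order_binlog" and the column types are my assumptions, read off the sample
values. In particular, "last_value" and "value" hold a number in one entry and
a string in the other, so I declare them as VARCHAR and assume the JSON format
will render numbers as text; please verify that on your Flink version. All
identifiers are back-quoted because several of them are SQL keywords.

```java
public class FullBinlogSchemaDdl {

    // Builds a CREATE TABLE statement declaring every field of the binlog JSON.
    // tableName and connectorProperties come from the caller; the connector
    // properties elided in the original message are deliberately not guessed.
    static String build(String tableName, String connectorProperties) {
        return "CREATE TABLE `" + tableName + "` (\n"
            + "    `type` VARCHAR,\n"
            + "    `timestamp` BIGINT,\n"
            + "    `binlog_filename` VARCHAR,\n"
            + "    `binlog_position` BIGINT,\n"
            + "    `database` VARCHAR,\n"
            + "    `table_name` VARCHAR,\n"
            + "    `table_id` BIGINT,\n"
            // One ROW per changed column; types are assumptions from the sample.
            + "    `columns` ARRAY<ROW<\n"
            + "        `id` INT,\n"
            + "        `name` VARCHAR,\n"
            + "        `column_type` INT,\n"
            + "        `last_value` VARCHAR,\n"
            + "        `value` VARCHAR>>\n"
            + ") WITH (\n"
            + connectorProperties
            + ")";
    }

    public static void main(String[] args) {
        // Only the properties visible in the original message; the elided
        // Kafka properties still have to be added before this can run in Flink.
        String props =
              "    'connector.type' = 'kafka',\n"
            + "    'update-mode' = 'append',\n"
            + "    'format.type' = 'json',\n"
            + "    'format.derive-schema' = 'true'\n";
        System.out.println(build("order_binlog", props));
    }
}
```

Even with this table registered, the query still has to pick "order_id" and
"order_no" out of the "columns" array itself.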

Btw, what Change Data Capture (CDC) tool are you using?

Best,
Jark

[1]:
https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#


On Thu, 5 Mar 2020 at 11:40, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
> I want to register a table from the MySQL binlog like this:
>
> tEnv.sqlUpdate("CREATE TABLE order(\n"
>     + "    order_id BIGINT,\n"
>     + "    order_no VARCHAR\n"
>     + ") WITH (\n"
>     + "    'connector.type' = 'kafka',\n"
>     ...........
>     + "    'update-mode' = 'append',\n"
>     + "    'format.type' = 'json',\n"
>     + "    'format.derive-schema' = 'true'\n"
>     + ")");
>
> using the following log format:
>
> {
>   "type" : "update",
>   "timestamp" : 1583373066000,
>   "binlog_filename" : "mysql-bin.000453",
>   "binlog_position" : 923020943,
>   "database" : "wms",
>   "table_name" : "t_pick_order",
>   "table_id" : 131936,
>   "columns" : [ {
>     "id" : 1,
>     "name" : "order_id",
>     "column_type" : -5,
>     "last_value" : 4606458,
>     "value" : 4606458
>   }, {
>     "id" : 2,
>     "name" : "order_no",
>     "column_type" : 12,
>     "last_value" : "EDBMFSJ00001S2003050006628",
>     "value" : "EDBMFSJ00001S2003050006628"
>   }]
> }
>
>
> Surely 'format.type' = 'json' will not parse the result as I
> expected.
> Is there any way I can implement this? For example, using a
> self-defined format class.
>
> Thanks,
> Lei
>
> ------------------------------
> wangl...@geekplus.com.cn
>
>
>
