Hi Lei,

Currently, Flink SQL doesn't support registering a table on top of a binlog format (i.e. declaring only "order_id" and "order_no" while the JSON records carry additional binlog fields). This is exactly what we want to support in FLIP-105 [1] and FLIP-95.
For now, if you want to consume such JSON data, you have to define the full schema, e.g. "type", "timestamp", and so on.

Btw, which Change Data Capture (CDC) tool are you using?

Best,
Jark

[1]: https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit#

On Thu, 5 Mar 2020 at 11:40, wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> wrote:

> I want to register a table from a MySQL binlog like this:
>
> tEnv.sqlUpdate("CREATE TABLE order(\n"
>     + " order_id BIGINT,\n"
>     + " order_no VARCHAR,\n"
>     + ") WITH (\n"
>     + " 'connector.type' = 'kafka',\n"
>     ...........
>     + " 'update-mode' = 'append',\n"
>     + " 'format.type' = 'json',\n"
>     + " 'format.derive-schema' = 'true'\n"
>     + ")");
>
> using the following log format:
>
> {
>   "type" : "update",
>   "timestamp" : 1583373066000,
>   "binlog_filename" : "mysql-bin.000453",
>   "binlog_position" : 923020943,
>   "database" : "wms",
>   "table_name" : "t_pick_order",
>   "table_id" : 131936,
>   "columns" : [ {
>     "id" : 1,
>     "name" : "order_id",
>     "column_type" : -5,
>     "last_value" : 4606458,
>     "value" : 4606458
>   }, {
>     "id" : 2,
>     "name" : "order_no",
>     "column_type" : 12,
>     "last_value" : "EDBMFSJ00001S2003050006628",
>     "value" : "EDBMFSJ00001S2003050006628"
>   } ]
> }
>
> Surely 'format.type' = 'json' will not parse the result as I expected.
> Is there any way I can implement this? For example, using a self-defined
> format class.
>
> Thanks,
> Lei
>
> ------------------------------
> wangl...@geekplus.com.cn
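
A rough sketch of what "define the full schema" could look like for the record quoted above. The class name BinlogDdl and the connectorProps parameter are made up for illustration; the connector options that were elided in the original message are left for the caller to supply. The column types are my reading of the sample record, and declaring "last_value" and "value" as VARCHAR assumes the JSON format will turn numeric values into strings, which is worth verifying on your Flink version. Identifiers are quoted with backticks because several of them (e.g. `order`, `timestamp`, `value`) are reserved words.

```java
// Sketch only: builds a CREATE TABLE statement that declares every field of the
// binlog JSON record quoted in this thread. No Flink classes are needed to build it.
class BinlogDdl {

    // connectorProps: the connector options elided in the original message,
    // supplied by the caller as "'key' = 'value'" entries joined by ",\n".
    static String fullSchemaDdl(String connectorProps) {
        // Assumption: last_value/value are declared VARCHAR because their JSON
        // type differs per column (number for order_id, string for order_no).
        return String.join("\n",
            "CREATE TABLE `order` (",
            "  `type` VARCHAR,",
            "  `timestamp` BIGINT,",
            "  `binlog_filename` VARCHAR,",
            "  `binlog_position` BIGINT,",
            "  `database` VARCHAR,",
            "  `table_name` VARCHAR,",
            "  `table_id` BIGINT,",
            "  `columns` ARRAY<ROW<`id` INT, `name` VARCHAR, `column_type` INT,",
            "                      `last_value` VARCHAR, `value` VARCHAR>>",
            ") WITH (",
            connectorProps + ",",
            "  'update-mode' = 'append',",
            "  'format.type' = 'json',",
            "  'format.derive-schema' = 'true'",
            ")");
    }

    public static void main(String[] args) {
        // Only the first connector option from the thread; add the real Kafka settings.
        String ddl = fullSchemaDdl("  'connector.type' = 'kafka'");
        System.out.println(ddl);
        // With an existing table environment named tEnv: tEnv.sqlUpdate(ddl);
    }
}
```

With a table declared this way, order_id and order_no are not top-level columns; a downstream query would still have to pick them out of the `columns` array, which is the part FLIP-105 is meant to make unnecessary.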