User defined formats also sounds like an interesting extension. Best, Kurt
On Thu, Mar 5, 2020 at 3:06 PM Jark Wu <imj...@gmail.com> wrote: > Hi Lei, > > Currently, Flink SQL doesn't support to register a binlog format (i.e. > just define "order_id" and "order_no", but the json schema has other binlog > fields). > This is exactly what we want to support in FLIP-105 [1] and FLIP-95. > > For now, if you want to consume such json data, you have to define the > full schema, e.g. "type", "timestmap", and so on... > > Btw, what Change Data Capture (CDC) tool are you using? > > Best, > Jark > > [1]: > https://docs.google.com/document/d/1onyIUUdWAHfr_Yd5nZOE7SOExBc6TiW5C4LiL5FrjtQ/edit# > > > On Thu, 5 Mar 2020 at 11:40, wangl...@geekplus.com.cn < > wangl...@geekplus.com.cn> wrote: > >> >> I want to rigister a table from mysql binlog like this: >> >> tEnv.sqlUpdate("CREATE TABLE order(\n" >> + " order_id BIGINT,\n" >> + " order_no VARCHAR,\n" >> + ") WITH (\n" >> + " 'connector.type' = 'kafka',\n" >> ........... >> + " 'update-mode' = 'append',\n" >> + " 'format.type' = 'json',\n" >> + " 'format.derive-schema' = 'true'\n" >> + ")"); >> >> using the following log format: >> >> { >> "type" : "update", >> "timestamp" : 1583373066000, >> "binlog_filename" : "mysql-bin.000453", >> "binlog_position" : 923020943, >> "database" : "wms", >> "table_name" : "t_pick_order", >> "table_id" : 131936, >> "columns" : [ { >> "id" : 1, >> "name" : "order_id", >> "column_type" : -5, >> "last_value" : 4606458, >> "value" : 4606458 >> }, { >> "id" : 2, >> "name" : "order_no", >> "column_type" : 12, >> "last_value" : "EDBMFSJ00001S2003050006628", >> "value" : "EDBMFSJ00001S2003050006628" >> }] >> } >> >> >> Surely the format.type' = 'json',\n" will not parse the result as I >> expected. >> Is there any method I can implement this? For example, using a self >> defined format class. >> >> Thanks, >> Lei >> >> ------------------------------ >> wangl...@geekplus.com.cn >> >> >>