日志中就是报这个 "type":"INIT_DDL" 不能识别呀,然后作业就退出了
在 2023-02-20 09:58:56,"Shengkai Fang" <[email protected]> 写道: >Hi. 能同时分享下复现这个 case 的sql 以及相关的报错栈吗? > >Best, >Shengkai > >casel.chen <[email protected]> 于2023年2月9日周四 12:03写道: > >> 不同云厂商的数据同步工具对于全量+增量同步mysql数据到kafka canal json格式时行为不一致 >> 有的厂商会将DDL语句同步到topic导致下游flink sql作业不能识别抛错,建议flink canal >> json格式解析时直接忽略不识别的type,例如 >> 例1: >> {"jobId":"76d140e3-2ef7-40c2-a795-af35a0fe1d61","shardId":null,"identifier":null,"eventId":"","mysqlType":null,"id":0,"es":1675908021700,"ts":1675908021700,"database":"demo","table":"oms_parcels","type":"INIT_DDL","isDdl":true,"sql":"CREATE >> TABLE `oms_parcels` ( `id` varchar(255) NOT NULL, `createdby` >> varchar(255) DEFAULT NULL, `createdat` timestamp NOT NULL DEFAULT >> CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, `updatedat` timestamp NOT >> NULL DEFAULT '0000-00-00 00:00:00', `updatedby` varchar(255) DEFAULT >> NULL, `account` varchar(255) DEFAULT NULL, `batch` varchar(255) DEFAULT >> NULL, `client` varchar(255) DEFAULT NULL, `command` varchar(255) DEFAULT >> NULL, `container` varchar(255) DEFAULT NULL, `items` mediumtext, >> `trackingnumber` varchar(255) NOT NULL, `transporter` varchar(255) DEFAULT >> NULL, `weight` decimal(19,2) NOT NULL, `zipcode` varchar(255) DEFAULT >> NULL, `ld3` varchar(255) DEFAULT NULL, `destination_code` varchar(255) >> DEFAULT NULL, PRIMARY KEY (`id`,`trackingnumber`)) ENGINE=InnoDB DEFAULT >> CHARSET=utf8mb4","sqlType":null,"data":null,"old":null,"pkNames":null} >> >> >> 例2: >> { >> "action":"ALTER", >> "before":[], >> "bid":0, >> "data":[], >> "db":"db_test", >> "dbValType":{ >> "col1":"varchar(22)", >> "col2":"varchar(22)", >> "col_pk":"varchar(22)" >> }, >> "ddl":true, >> "entryType":"ROWDATA", >> "execTs":1669789188000, >> "jdbcType":{ >> "col1":12, >> "col2":12, >> "col_pk":12 >> }, >> "pks":[], >> "schema":"db_test", >> "sendTs":1669789189533, >> "sql":"alter table table_test add col2 varchar(22) null", >> "table":"table_test", >> "tableChanges":{ >> "table":{ >> "columns":[ >> { >> "jdbcType":12, // jdbc 类型。 >> "name":"col1", // 字段名称。 >> "position":0, // 字段的顺序。 >> "typeExpression":"varchar(22)", // 类型描述。 >> "typeName":"varchar" // 类型名称。 >> }, >> { >> "jdbcType":12, >> "name":"col2", >> "position":1, >> "typeExpression":"varchar(22)", >> "typeName":"varchar" >> }, >> { >> "jdbcType":12, >> "name":"col_pk", >> "position":2, >> "typeExpression":"varchar(22)", >> "typeName":"varchar" >> } >> ], >> "primaryKeyColumnNames":["col_pk"] // 主键名列表。 >> }, >> "type":"ALTER" >> } >> }
