例如:
mysql表:
CREATE TABLE `test` (
`id` int(11) NOT NULL,
`name` varchar(255) NOT NULL,
`time` datetime NOT NULL,
`status` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
CREATE TABLE `status` (
`id` int(11) NOT NULL,
`name` varchar(255) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
kafka中数据:
// 表test 中insert事件
{"data":[{"id":"1745","name":"jindy1745","time":"2020-07-03
18:04:22","status":"0"}],"database":"ai_audio_lyric_task","es":1594968168000,"id":42,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)","time":"datetime","status":"int(11)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"time":93,"status":4},"table":"test","ts":1594968168789,"type":"INSERT"}
//表status 中的事件
{"data":[{"id":"10","name":"status"}],"database":"ai_audio_lyric_task","es":1595305259000,"id":589240,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(255)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12},"table":"status","ts":1595305259386,"type":"INSERT"}
如何由于kafka中的json动态的变化的,比如新增一个表,如何能转成应对的RowData,
感觉无法直接用JsonRowDeserializationSchema或CanalJsonDeserializationSchema来做处理。
--
Sent from: http://apache-flink.147419.n8.nabble.com/