[ https://issues.apache.org/jira/browse/FLINK-21172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-21172:
-----------------------------------
    Labels: pull-request-available  (was: )

> canal-json format include es field
> ----------------------------------
>
>                 Key: FLINK-21172
>                 URL: https://issues.apache.org/jira/browse/FLINK-21172
>             Project: Flink
>          Issue Type: Improvement
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile), Table SQL / Ecosystem
>    Affects Versions: 1.12.0, 1.12.1
>            Reporter: jiabao sun
>            Assignee: Nicholas Jiang
>            Priority: Minor
>              Labels: pull-request-available
>
> The Canal flat-message JSON format has an 'es' field, extracted from the MySQL
> binlog, that records the time at which the row actually changed in MySQL. It
> naturally expresses the event time, but it is ignored during deserialization.
> {code:json}
> {
>     "data": [
>         {
>             "id": "111",
>             "name": "scooter",
>             "description": "Big 2-wheel scooter",
>             "weight": "5.18"
>         }
>     ],
>     "database": "inventory",
>     "es": 1589373560000,
>     "id": 9,
>     "isDdl": false,
>     "mysqlType": {
>         "id": "INTEGER",
>         "name": "VARCHAR(255)",
>         "description": "VARCHAR(512)",
>         "weight": "FLOAT"
>     },
>     "old": [
>         {
>             "weight": "5.15"
>         }
>     ],
>     "pkNames": [
>         "id"
>     ],
>     "sql": "",
>     "sqlType": {
>         "id": 4,
>         "name": 12,
>         "description": 12,
>         "weight": 7
>     },
>     "table": "products",
>     "ts": 1589373560798,
>     "type": "UPDATE"
> }
> {code}
> org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema
> {code:java}
>     private static RowType createJsonRowType(DataType databaseSchema) {
>         // Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
>         return (RowType)
>                 DataTypes.ROW(
>                                 DataTypes.FIELD("data", DataTypes.ARRAY(databaseSchema)),
>                                 DataTypes.FIELD("old", DataTypes.ARRAY(databaseSchema)),
>                                 DataTypes.FIELD("type", DataTypes.STRING()),
>                                 DataTypes.FIELD("database", DataTypes.STRING()),
>                                 DataTypes.FIELD("table", DataTypes.STRING()))
>                         .getLogicalType();
>     }
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
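
To illustrate what the 'es' field in the issue carries, here is a minimal standalone sketch that pulls the value out of a Canal message and interprets it as epoch milliseconds. It is not Flink code and not the proposed fix: the class name `CanalEsExample` and the method `extractEs` are hypothetical, and the regex lookup is a simplifying assumption that stands in for a real JSON parser.

```java
import java.time.Instant;
import java.util.OptionalLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Flink.
public class CanalEsExample {
    // Assumption: "es" appears once, as a quoted key followed by an integer.
    // The quoted-key match means "ts" or "description" will not be picked up.
    private static final Pattern ES_FIELD = Pattern.compile("\"es\"\\s*:\\s*(\\d+)");

    // Returns the 'es' value in epoch milliseconds, or empty if the key is absent.
    public static OptionalLong extractEs(String canalJson) {
        Matcher m = ES_FIELD.matcher(canalJson);
        return m.find()
                ? OptionalLong.of(Long.parseLong(m.group(1)))
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        // Abbreviated version of the message quoted in the issue.
        String message =
                "{\"database\":\"inventory\",\"es\":1589373560000,"
                        + "\"table\":\"products\",\"ts\":1589373560798,\"type\":\"UPDATE\"}";

        // Print the binlog change time as a UTC instant, if present.
        extractEs(message)
                .ifPresent(ms -> System.out.println(Instant.ofEpochMilli(ms)));
    }
}
```

In the sample message, 'es' (1589373560000) precedes 'ts' (1589373560798) by 798 ms, which is consistent with 'es' being the change time in MySQL rather than a later processing time.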