[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lee SeungMin updated FLINK-36578:
---------------------------------
Description:
The function that converts JSON to a string differs between the snapshot step and the binlog step.
This causes the following differences:
- original: \{"key": "value", "key1": "value1"}
- snapshot: \{"key": "value", "key1": "value1"}
- binlog: \{"key":"value","key1":"value1"} // no whitespace

This is already a known issue, but I think JSON should be processed the same way in the snapshot step and the binlog step.
So I added a modified {{JsonStringFormatter}} to flink-cdc that formats JSON the same way as the snapshot step does.
(Modified code: adds a space after the colon between key and value, and after each comma, so that the output matches the snapshot step.)

The snapshot and binlog steps use different logic to convert JSON to a string:
{code:java}
    protected Object convertJson(Column column, Field fieldDefn, Object data) {
        return convertValue(column, fieldDefn, data, "{}", (r) -> {
            if (data instanceof byte[]) {
                // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
                // to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
                if (((byte[]) data).length == 0) {
                    r.deliver(column.isOptional() ? null : "{}");
                }
                else {
                    try {
                        r.deliver(JsonBinary.parseAsString((byte[]) data));
                    }
                    catch (IOException e) {
                        parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                        r.deliver(column.isOptional() ? null : "{}");
                    }
                }
            }
            else if (data instanceof String) {
                // The SnapshotReader sees JSON values as UTF-8 encoded strings.
                r.deliver(data);
            }
        });
    }
{code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

was:
The function that converts JSON to a string differs between the snapshot step and the binlog step.
This causes the following differences:
- original: \{"key": "value", "key1": "value1"}
- snapshot: \{"key": "value", "key1": "value1"}
- binlog: \{"key":"value","key1":"value1"} // no whitespace

So I added a modified JsonStringFormatter to flink-cdc (modified code: adds a space after the colon between key and value, and after each comma, so that the output matches the snapshot step).

The snapshot and binlog steps use different logic to convert JSON to a string:
{code:java}
    protected Object convertJson(Column column, Field fieldDefn, Object data) {
        return convertValue(column, fieldDefn, data, "{}", (r) -> {
            if (data instanceof byte[]) {
                // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
                // to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
                if (((byte[]) data).length == 0) {
                    r.deliver(column.isOptional() ? null : "{}");
                }
                else {
                    try {
                        r.deliver(JsonBinary.parseAsString((byte[]) data));
                    }
                    catch (IOException e) {
                        parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                        r.deliver(column.isOptional() ? null : "{}");
                    }
                }
            }
            else if (data instanceof String) {
                // The SnapshotReader sees JSON values as UTF-8 encoded strings.
                r.deliver(data);
            }
        });
    }
{code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

> Fixed bug when converting json to string
> ----------------------------------------
>
>                 Key: FLINK-36578
>                 URL: https://issues.apache.org/jira/browse/FLINK-36578
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Lee SeungMin
>            Priority: Major
>              Labels: pull-request-available
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
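The whitespace mismatch described in FLINK-36578 can be illustrated with a small Java sketch. It assumes the input is valid JSON text, such as the compact string the binlog step gets from JsonBinary.parseAsString, and re-inserts one space after every ':' and ',' that appears outside string literals, which produces the snapshot form shown in the issue. The class JsonWhitespaceNormalizer and its normalize method are hypothetical names used only for illustration. This is not the JsonStringFormatter change from the pull request, which (per the issue) modifies the formatter itself rather than post-processing a finished string. The sketch only aligns whitespace and does not try to reconcile any other formatting differences that may exist between the two steps.

```java
/**
 * Hypothetical helper (NOT the flink-cdc JsonStringFormatter): rewrites valid
 * JSON text so that every ':' and ',' outside string literals is followed by
 * exactly one space, i.e. the snapshot-style layout described in FLINK-36578.
 */
final class JsonWhitespaceNormalizer {

    private JsonWhitespaceNormalizer() {}

    static String normalize(String json) {
        StringBuilder out = new StringBuilder(json.length() + 16);
        boolean inString = false; // currently inside a "..." string literal
        boolean escaped = false;  // previous char was a backslash inside a literal

        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                // Copy string contents verbatim, including any ':' or ','.
                out.append(c);
                if (escaped) {
                    escaped = false;      // this char was escaped, e.g. \" or \\
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;     // unescaped quote closes the literal
                }
                continue;
            }
            switch (c) {
                case '"':
                    inString = true;
                    out.append(c);
                    break;
                case ':':
                case ',':
                    // Structural separator: add the space the snapshot output has.
                    out.append(c).append(' ');
                    break;
                case ' ':
                case '\t':
                case '\n':
                case '\r':
                    // Drop insignificant whitespace so already-spaced input
                    // is not given a second space.
                    break;
                default:
                    out.append(c);
            }
        }
        return out.toString();
    }
}
```

For example, normalize turns the binlog string \{"key":"value","key1":"value1"} into \{"key": "value", "key1": "value1"}. Because whitespace outside strings is dropped before spaces are re-inserted, a value that is already in snapshot form comes back unchanged. Producing the spaced form directly in the formatter, as the issue proposes, avoids this extra pass over every JSON value.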