[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lee SeungMin updated FLINK-36578:
---------------------------------
Description:
The functions used to convert JSON to a string differ between the snapshot step and the binlog step, which causes the following differences:
- original : \{"key": "value", "key1": "value1"}
- snapshot : \{"key": "value", "key1": "value1"}
- binlog : \{"key":"value","key1":"value1"} // no whitespace

So a modified JsonStringFormatter was added to flink-cdc (modified code: whitespace added between key and value, and after each comma, so that the binlog step produces the same output as the snapshot step).

The different logic for converting JSON to a string in the snapshot and binlog steps:
{code:java}
protected Object convertJson(Column column, Field fieldDefn, Object data) {
    return convertValue(column, fieldDefn, data, "{}", (r) -> {
        if (data instanceof byte[]) {
            // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
            // to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
            if (((byte[]) data).length == 0) {
                r.deliver(column.isOptional() ? null : "{}");
            }
            else {
                try {
                    r.deliver(JsonBinary.parseAsString((byte[]) data));
                }
                catch (IOException e) {
                    parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                    r.deliver(column.isOptional() ? null : "{}");
                }
            }
        }
        else if (data instanceof String) {
            // The SnapshotReader sees JSON values as UTF-8 encoded strings.
            r.deliver(data);
        }
    });
}
{code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

was:
The functions used to convert JSON to a string differ between the snapshot step and the binlog step, which causes the following differences:
- original : \{"key": "value", "key1": "value1"}
- snapshot : \{"key": "value", "key1": "value1"}
- binlog : \{"key":"value","key1":"value1"} // no whitespace

Code:
{code:java}
protected Object convertJson(Column column, Field fieldDefn, Object data) {
    return convertValue(column, fieldDefn, data, "{}", (r) -> {
        if (data instanceof byte[]) {
            // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
            // to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
            if (((byte[]) data).length == 0) {
                r.deliver(column.isOptional() ? null : "{}");
            }
            else {
                try {
                    r.deliver(JsonBinary.parseAsString((byte[]) data));
                }
                catch (IOException e) {
                    parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                    r.deliver(column.isOptional() ? null : "{}");
                }
            }
        }
        else if (data instanceof String) {
            // The SnapshotReader sees JSON values as UTF-8 encoded strings.
            r.deliver(data);
        }
    });
}
{code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

So a modified JsonStringFormatter was added (whitespace added between key and value, and after each comma, so that the binlog step produces the same output as the snapshot step).


> Fixed bug when converting json to string
> ----------------------------------------
>
>                 Key: FLINK-36578
>                 URL: https://issues.apache.org/jira/browse/FLINK-36578
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Lee SeungMin
>            Priority: Major
>              Labels: pull-request-available
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
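To illustrate the whitespace difference described in this issue, here is a minimal standalone sketch. It is not the actual JsonStringFormatter change in flink-cdc, and it does not use the binlog client library's API. It serializes an already-parsed JSON value with the spacing seen in the snapshot output: ": " between key and value, and ", " after each comma. The class name MySqlStyleJson and its format method are hypothetical. The sketch assumes the value has already been decoded into Map/List/String/Number/Boolean/null objects, and it keeps the map's iteration order rather than modeling MySQL's own key ordering.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical helper (not a flink-cdc or binlog client API): writes a parsed
 * JSON value using the snapshot-style spacing ": " and ", ".
 */
public class MySqlStyleJson {

    public static String format(Object value) {
        StringBuilder sb = new StringBuilder();
        write(value, sb);
        return sb.toString();
    }

    private static void write(Object value, StringBuilder sb) {
        if (value == null) {
            sb.append("null");
        } else if (value instanceof String) {
            writeString((String) value, sb);
        } else if (value instanceof Number || value instanceof Boolean) {
            // Numbers and booleans are written with Java's toString()
            sb.append(value);
        } else if (value instanceof Map) {
            sb.append('{');
            boolean first = true;
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                if (!first) {
                    sb.append(", "); // space after comma, as in the snapshot output
                }
                first = false;
                writeString(String.valueOf(e.getKey()), sb);
                sb.append(": "); // space between key and value
                write(e.getValue(), sb);
            }
            sb.append('}');
        } else if (value instanceof List) {
            sb.append('[');
            boolean first = true;
            for (Object item : (List<?>) value) {
                if (!first) {
                    sb.append(", "); // same comma spacing inside arrays
                }
                first = false;
                write(item, sb);
            }
            sb.append(']');
        } else {
            throw new IllegalArgumentException("Unsupported JSON value: " + value.getClass());
        }
    }

    // Writes a quoted JSON string, escaping quotes, backslashes and control characters
    private static void writeString(String s, StringBuilder sb) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"': sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        sb.append('"');
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("key", "value");
        doc.put("key1", "value1");
        System.out.println(format(doc)); // prints {"key": "value", "key1": "value1"}
    }
}
```

Replacing ": " and ", " with ":" and "," in this sketch reproduces the compact binlog-style output. That is the only difference between the two steps that the issue reports.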