[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lee SeungMin updated FLINK-36578:
---------------------------------
    Description:
The snapshot step and the binlog step use different functions to convert JSON to a string. This causes the following differences:
 - original : \{"key": "value", "key1": "value1"}
 - snapshot : \{"key": "value", "key1": "value1"}
 - binlog : \{"key":"value","key1":"value1"} // no whitespace

Code:
{code:java}
protected Object convertJson(Column column, Field fieldDefn, Object data) {
    return convertValue(column, fieldDefn, data, "{}", (r) -> {
        if (data instanceof byte[]) {
            // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
            // to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
            if (((byte[]) data).length == 0) {
                r.deliver(column.isOptional() ? null : "{}");
            }
            else {
                try {
                    r.deliver(JsonBinary.parseAsString((byte[]) data));
                }
                catch (IOException e) {
                    parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value " + Arrays.toString((byte[]) data), e);
                    r.deliver(column.isOptional() ? null : "{}");
                }
            }
        }
        else if (data instanceof String) {
            // The SnapshotReader sees JSON values as UTF-8 encoded strings.
            r.deliver(data);
        }
    });
}
{code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

To fix this, a modified JsonStringFormatter was added. It inserts whitespace between key and value and after each comma, so that the binlog step produces the same output as the snapshot step.

> Fixed bug when converting json to string
> ----------------------------------------
>
>                 Key: FLINK-36578
>                 URL: https://issues.apache.org/jira/browse/FLINK-36578
>             Project: Flink
>          Issue Type: Bug
>          Components: Flink CDC
>            Reporter: Lee SeungMin
>            Priority: Major
>              Labels: pull-request-available
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
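
The whitespace difference described in this issue can be illustrated with a minimal sketch. It is not the modified JsonStringFormatter from the pull request: the class name JsonSpacingSketch and the method addSpacing are hypothetical and exist only for illustration. The sketch assumes the input is compact JSON text with no whitespace between tokens, as in the binlog output above.

```java
// Hypothetical illustration only; not the JsonStringFormatter from the pull request.
final class JsonSpacingSketch {

    private JsonSpacingSketch() {
    }

    /**
     * Inserts one space after every ':' and ',' that appears outside a JSON string literal.
     * Assumes the input is compact JSON with no whitespace between tokens.
     */
    static String addSpacing(String compactJson) {
        StringBuilder out = new StringBuilder(compactJson.length() + 16);
        boolean inString = false; // true while inside a "..." literal
        boolean escaped = false;  // true right after a backslash inside a literal
        for (int i = 0; i < compactJson.length(); i++) {
            char c = compactJson.charAt(i);
            out.append(c);
            if (inString) {
                if (escaped) {
                    escaped = false;      // this character was escaped; take it literally
                } else if (c == '\\') {
                    escaped = true;       // next character is escaped
                } else if (c == '"') {
                    inString = false;     // closing quote of the literal
                }
            } else if (c == '"') {
                inString = true;          // opening quote of a literal
            } else if (c == ':' || c == ',') {
                out.append(' ');          // structural separator: add the missing space
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String binlogForm = "{\"key\":\"value\",\"key1\":\"value1\"}";
        // prints {"key": "value", "key1": "value1"}
        System.out.println(addSpacing(binlogForm));
    }
}
```

Colons and commas inside string literals are left untouched, so only the structural separators gain a trailing space.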