[ https://issues.apache.org/jira/browse/FLINK-36578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lee SeungMin updated FLINK-36578:
---------------------------------
Affects Version/s: cdc-3.2.1, cdc-3.1.1, cdc-3.2.0, cdc-3.1.0

Fixed bug when converting json to string
----------------------------------------

Key: FLINK-36578
URL: https://issues.apache.org/jira/browse/FLINK-36578
Project: Flink
Issue Type: Bug
Components: Flink CDC
Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1, cdc-3.2.1
Reporter: Lee SeungMin
Priority: Major
Labels: pull-request-available
Fix For: cdc-3.3.0

The function that converts JSON to a string is different in the snapshot step and the binlog step. This causes the following differences:
- original: \{"key": "value", "key1": "value1"}
- snapshot: \{"key": "value", "key1": "value1"}
- binlog: \{"key":"value","key1":"value1"} // no whitespace

This is already a known issue, but I think JSON needs to be processed the same way in the snapshot step and the binlog step. So I modified and added {{JsonStringFormatter}} to flink-cdc so that JSON from the binlog step is formatted the same way as the result of the snapshot step. (The modified code adds a space after the colon between each key and its value, and after each comma, so that the output matches the snapshot step.)

The different logic used to convert JSON to a string in the snapshot and binlog steps:
{code:java}
protected Object convertJson(Column column, Field fieldDefn, Object data) {
    return convertValue(column, fieldDefn, data, "{}", (r) -> {
        if (data instanceof byte[]) {
            // The BinlogReader sees these JSON values as binary encoded, so we use the binlog client library's utility
            // to parse MySQL's internal binary representation into a JSON string, using the standard formatter.
            if (((byte[]) data).length == 0) {
                r.deliver(column.isOptional() ? null : "{}");
            }
            else {
                try {
                    r.deliver(JsonBinary.parseAsString((byte[]) data));
                }
                catch (IOException e) {
                    parsingErrorHandler.error("Failed to parse and read a JSON value on '" + column + "' value "
                            + Arrays.toString((byte[]) data), e);
                    r.deliver(column.isOptional() ? null : "{}");
                }
            }
        }
        else if (data instanceof String) {
            // The SnapshotReader sees JSON values as UTF-8 encoded strings.
            r.deliver(data);
        }
    });
}
{code}
([https://github.com/debezium/debezium/blob/0220cbe0aa120376f1dd0386dc4322b4bbf29981/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlValueConverters.java#L357-L381])

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
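The whitespace normalization described in this issue (a space after the colon between key and value, and after each comma) can be sketched as follows. This is a minimal illustration, not the actual {{JsonStringFormatter}} from the pull request: the class name {{CompactJsonFormatter}} is hypothetical, and the sketch assumes the binlog step yields syntactically valid, compact JSON text such as the output of {{JsonBinary.parseAsString}}. Characters inside string literals are copied unchanged, so a ':' or ',' inside a value is not re-spaced.

```java
/**
 * Hypothetical sketch (not the flink-cdc JsonStringFormatter): re-spaces JSON text
 * so that binlog-style compact output matches the snapshot-style layout, i.e. one
 * space after every ':' and ',' that appears outside a string literal.
 */
final class CompactJsonFormatter {

    private CompactJsonFormatter() {}

    static String format(String json) {
        StringBuilder out = new StringBuilder(json.length() + 16);
        boolean inString = false; // currently inside a "..." literal
        boolean escaped = false;  // previous char was a backslash inside a literal
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                // Copy literal contents verbatim, tracking escapes so \" does not end the literal.
                out.append(c);
                if (escaped) {
                    escaped = false;
                } else if (c == '\\') {
                    escaped = true;
                } else if (c == '"') {
                    inString = false;
                }
                continue;
            }
            switch (c) {
                case '"':
                    inString = true;
                    out.append(c);
                    break;
                case ':':
                case ',':
                    // Snapshot-style separators: "key": value, next
                    out.append(c).append(' ');
                    break;
                case ' ':
                case '\t':
                case '\n':
                case '\r':
                    // Drop existing insignificant whitespace so the result is canonical.
                    break;
                default:
                    out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String binlog = "{\"key\":\"value\",\"key1\":\"value1\"}";
        System.out.println(format(binlog)); // prints {"key": "value", "key1": "value1"}
    }
}
```

Because existing whitespace outside literals is dropped before re-spacing, applying the sketch to snapshot-style text leaves it unchanged, so values from both steps would converge on one layout. It does not validate its input: malformed JSON is re-spaced rather than rejected.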