[ https://issues.apache.org/jira/browse/FLINK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Benchao Li reassigned FLINK-36549:
----------------------------------

    Assignee: xiaoyu

> Using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON 
> results in unexpected data loss.
> -----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-36549
>                 URL: https://issues.apache.org/jira/browse/FLINK-36549
>             Project: Flink
>          Issue Type: Bug
>          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
>    Affects Versions: 1.17.2, 1.18.1, 1.19.1
>            Reporter: jiangyu
>            Assignee: xiaoyu
>            Priority: Critical
>              Labels: pull-request-available
>
> In the Debezium/Canal/Maxwell/Ogg JSON formats, setting {{ignore-parse-errors}} can 
> cause unexpected data loss when an operator chained with the format's operator 
> throws an exception. The reason is that the deserialization implementations of 
> these formats wrap not only the JSON parsing but also the record emission in a 
> single try-catch, so with {{ignore-parse-errors}} enabled, an exception raised 
> while emitting data (i.e. thrown by a chained downstream operator) is swallowed 
> as if it were a parse error and the record is silently dropped. For example, in 
> the Canal JSON code below, the catch block also discards exceptions thrown from 
> {{emitRow}}:
> {code:java}
> @Override
> public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws IOException {
>     if (message == null || message.length == 0) {
>         return;
>     }
>     try {
>         final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
>         if (database != null) {
>             if (!databasePattern
>                     .matcher(root.get(ReadableMetadata.DATABASE.key).asText())
>                     .matches()) {
>                 return;
>             }
>         }
>         if (table != null) {
>             if (!tablePattern
>                     .matcher(root.get(ReadableMetadata.TABLE.key).asText())
>                     .matches()) {
>                 return;
>             }
>         }
>         final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
>         String type = row.getString(2).toString(); // "type" field
>         if (OP_INSERT.equals(type)) {
>             // "data" field is an array of row, contains inserted rows
>             ArrayData data = row.getArray(0);
>             for (int i = 0; i < data.size(); i++) {
>                 GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
>                 insert.setRowKind(RowKind.INSERT);
>                 emitRow(row, insert, out);
>             }
>         } else if (OP_UPDATE.equals(type)) {
>             // "data" field is an array of row, contains new rows
>             ArrayData data = row.getArray(0);
>             // "old" field is an array of row, contains old values
>             ArrayData old = row.getArray(1);
>             for (int i = 0; i < data.size(); i++) {
>                 // the underlying JSON deserialization schema always produces GenericRowData
>                 GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
>                 GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
>                 final JsonNode oldField = root.get(FIELD_OLD);
>                 for (int f = 0; f < fieldCount; f++) {
>                     if (before.isNullAt(f) && oldField.findValue(fieldNames.get(f)) == null) {
>                         // fields in "old" (before) means the fields are changed
>                         // fields not in "old" (before) means the fields are not changed
>                         // so we just copy the not changed fields into before
>                         before.setField(f, after.getField(f));
>                     }
>                 }
>                 before.setRowKind(RowKind.UPDATE_BEFORE);
>                 after.setRowKind(RowKind.UPDATE_AFTER);
>                 emitRow(row, before, out);
>                 emitRow(row, after, out);
>             }
>         } else if (OP_DELETE.equals(type)) {
>             // "data" field is an array of row, contains deleted rows
>             ArrayData data = row.getArray(0);
>             for (int i = 0; i < data.size(); i++) {
>                 GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
>                 insert.setRowKind(RowKind.DELETE);
>                 emitRow(row, insert, out);
>             }
>         } else if (OP_CREATE.equals(type)) {
>             // "data" field is null and "type" is "CREATE" which means
>             // this is a DDL change event, and we should skip it.
>             return;
>         } else {
>             if (!ignoreParseErrors) {
>                 throw new IOException(
>                         format(
>                                 "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
>                                 type, new String(message)));
>             }
>         }
>     } catch (Throwable t) {
>         // a big try catch to protect the processing.
>         // NOTE: this also catches exceptions thrown by emitRow(...), i.e. by
>         // chained downstream operators, so with ignore-parse-errors enabled
>         // those records are silently dropped.
>         if (!ignoreParseErrors) {
>             throw new IOException(
>                     format("Corrupt Canal JSON message '%s'.", new String(message)), t);
>         }
>     }
> }
>
> private void emitRow(
>         GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
>     // shortcut in case no output projection is required
>     if (!hasMetadata) {
>         out.collect(physicalRow);
>         return;
>     }
>     final int physicalArity = physicalRow.getArity();
>     final int metadataArity = metadataConverters.length;
>     final GenericRowData producedRow =
>             new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
>     for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
>         producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
>     }
>     for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
>         producedRow.setField(
>                 physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
>     }
>     out.collect(producedRow);
> }
> {code}
> The solution is to reimplement the {{deserialize(byte[] message, Collector<T> out)}} 
> method of the corresponding DeserializationSchema in Debezium/Canal/Maxwell/Ogg 
> JSON so that only parsing and conversion are guarded by the try-catch and records 
> are emitted outside of it, following the default implementation of 
> {{deserialize(byte[] message, Collector<T> out)}} in the {{DeserializationSchema}} 
> interface, which deserializes first and collects afterwards.
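> The default method in {{org.apache.flink.api.common.serialization.DeserializationSchema}} 
> already has this parse-first, emit-afterwards shape:
> {code:java}
> default void deserialize(byte[] message, Collector<T> out) throws IOException {
>     T deserialize = deserialize(message);
>     if (deserialize != null) {
>         out.collect(deserialize);
>     }
> }
> {code}
> A minimal sketch of the same restructuring applied to the Canal format (the 
> {{parseToRowData}} helper and the buffering into a local list are illustrative 
> assumptions, not the actual patch):
> {code:java}
> @Override
> public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws IOException {
>     if (message == null || message.length == 0) {
>         return;
>     }
>     // Guard only the parse/convert step with the try-catch.
>     final List<RowData> rows = new ArrayList<>(); // java.util.List / java.util.ArrayList
>     try {
>         // hypothetical helper: runs the existing JSON parsing and row
>         // conversion, appending all produced rows to the given list
>         parseToRowData(message, rows);
>     } catch (Throwable t) {
>         if (!ignoreParseErrors) {
>             throw new IOException(
>                     format("Corrupt Canal JSON message '%s'.", new String(message)), t);
>         }
>         return;
>     }
>     // Emit outside the try-catch: an exception thrown by a chained downstream
>     // operator now propagates instead of being swallowed as a parse error.
>     for (RowData row : rows) {
>         out.collect(row);
>     }
> }
> {code}
> With this shape, {{ignore-parse-errors}} only suppresses genuine parse/convert 
> failures, while failures in downstream operators fail the job as expected.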



--
This message was sent by Atlassian Jira
(v8.20.10#820010)