jiangyu created FLINK-36549:
-------------------------------

             Summary: Using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON results in unexpected data loss.
                 Key: FLINK-36549
                 URL: https://issues.apache.org/jira/browse/FLINK-36549
             Project: Flink
          Issue Type: Bug
            Reporter: jiangyu


In the Debezium/Canal/Maxwell/Ogg JSON formats, setting {{ignore-parse-errors}} can cause data loss when an operator chained to the format-related operator throws an exception. The reason is that the deserialization implementations of these formats wrap both parsing and record emission in the same try/catch, so with {{ignore-parse-errors}} enabled, exceptions raised while emitting the format's output are silently skipped as well. For example, Canal's JSON deserializer catches and discards exceptions thrown from {{emitRow}}:
{code:java}
@Override
public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws IOException {
    if (message == null || message.length == 0) {
        return;
    }
    try {
        final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
        if (database != null) {
            if (!databasePattern
                    .matcher(root.get(ReadableMetadata.DATABASE.key).asText())
                    .matches()) {
                return;
            }
        }
        if (table != null) {
            if (!tablePattern
                    .matcher(root.get(ReadableMetadata.TABLE.key).asText())
                    .matches()) {
                return;
            }
        }
        final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
        String type = row.getString(2).toString(); // "type" field
        if (OP_INSERT.equals(type)) {
            // "data" field is an array of row, contains inserted rows
            ArrayData data = row.getArray(0);
            for (int i = 0; i < data.size(); i++) {
                GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
                insert.setRowKind(RowKind.INSERT);
                emitRow(row, insert, out);
            }
        } else if (OP_UPDATE.equals(type)) {
            // "data" field is an array of row, contains new rows
            ArrayData data = row.getArray(0);
            // "old" field is an array of row, contains old values
            ArrayData old = row.getArray(1);
            for (int i = 0; i < data.size(); i++) {
                // the underlying JSON deserialization schema always produce GenericRowData.
                GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
                GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
                final JsonNode oldField = root.get(FIELD_OLD);
                for (int f = 0; f < fieldCount; f++) {
                    if (before.isNullAt(f) && oldField.findValue(fieldNames.get(f)) == null) {
                        // fields in "old" (before) means the fields are changed
                        // fields not in "old" (before) means the fields are not changed
                        // so we just copy the not changed fields into before
                        before.setField(f, after.getField(f));
                    }
                }
                before.setRowKind(RowKind.UPDATE_BEFORE);
                after.setRowKind(RowKind.UPDATE_AFTER);
                emitRow(row, before, out);
                emitRow(row, after, out);
            }
        } else if (OP_DELETE.equals(type)) {
            // "data" field is an array of row, contains deleted rows
            ArrayData data = row.getArray(0);
            for (int i = 0; i < data.size(); i++) {
                GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
                insert.setRowKind(RowKind.DELETE);
                emitRow(row, insert, out);
            }
        } else if (OP_CREATE.equals(type)) {
            // "data" field is null and "type" is "CREATE" which means
            // this is a DDL change event, and we should skip it.
            return;
        } else {
            if (!ignoreParseErrors) {
                throw new IOException(
                        format(
                                "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
                                type, new String(message)));
            }
        }
    } catch (Throwable t) {
        // a big try catch to protect the processing.
        if (!ignoreParseErrors) {
            throw new IOException(
                    format("Corrupt Canal JSON message '%s'.", new String(message)), t);
        }
    }
}

private void emitRow(
        GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
    // shortcut in case no output projection is required
    if (!hasMetadata) {
        out.collect(physicalRow);
        return;
    }
    final int physicalArity = physicalRow.getArity();
    final int metadataArity = metadataConverters.length;
    final GenericRowData producedRow =
            new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
    for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
        producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
    }
    for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
        producedRow.setField(
                physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
    }
    out.collect(producedRow);
}
{code}
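The consequence of that outer catch (Throwable) can be reduced to a self-contained sketch. All names below are hypothetical, and plain Java types stand in for Flink's Collector and RowData: when parsing succeeds but the chained operator fails, the record is silently dropped instead of the failure surfacing to the runtime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the pattern above (not Flink API): the big
// try/catch wraps the emit call together with the parsing.
public class SwallowedEmitDemo {
    static boolean ignoreParseErrors = true;

    static void deserialize(String message, Consumer<String> out) {
        try {
            String parsed = message.trim(); // parsing succeeds
            out.accept(parsed);             // downstream failure...
        } catch (Throwable t) {
            if (!ignoreParseErrors) {
                throw new RuntimeException("Corrupt message '" + message + "'", t);
            }
            // ...is swallowed here, so the record is lost without a trace
        }
    }

    static List<String> runDemo() {
        List<String> sink = new ArrayList<>();
        // A chained operator that fails transiently on the first record.
        Consumer<String> failingOnce = new Consumer<String>() {
            private boolean failed = false;
            @Override
            public void accept(String s) {
                if (!failed) {
                    failed = true;
                    throw new RuntimeException("chained operator failure");
                }
                sink.add(s);
            }
        };
        deserialize(" r1 ", failingOnce); // exception swallowed: r1 is lost
        deserialize(" r2 ", failingOnce); // r2 is delivered
        return sink;
    }

    public static void main(String[] args) {
        // Only r2 arrives; r1 was dropped rather than failing the job.
        System.out.println(runDemo());
    }
}
```

With {{ignore-parse-errors}} disabled, the same failure would instead fail the deserialize call, which is the behavior a user would expect for a non-parse error.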
The fix is to reimplement the deserialize(byte[] message, Collector<T> out) method of the corresponding DeserializationSchema in Debezium/Canal/Maxwell/Ogg JSON, following the default implementation of that method in the DeserializationSchema interface: deserialize the message first, then emit the result outside of the parse-error guard, so that only genuine parse errors are skipped by {{ignore-parse-errors}}.
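A hedged sketch of that two-phase shape (hypothetical names, plain Java stand-ins for Flink's types, not the actual patch): parsing stays under the ignore-parse-errors guard, while emission moves outside of it, mirroring the default DeserializationSchema.deserialize(byte[], Collector<T>), which calls deserialize(message) first and only then collects the result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the proposed fix, with plain Java types standing in
// for Flink's Collector/RowData.
public class FixedDeserializeDemo {
    static boolean ignoreParseErrors = true;

    // Phase 1 (guarded): parsing/conversion only. Corrupt input is skipped
    // when ignoreParseErrors is set; nothing else is swallowed.
    static List<String> parse(String message) {
        List<String> rows = new ArrayList<>();
        try {
            if (message == null || message.isEmpty()) {
                return rows;
            }
            if (message.startsWith("corrupt")) {
                // Simulated parse failure for a corrupt record.
                throw new RuntimeException("parse error");
            }
            rows.add(message.trim());
        } catch (Throwable t) {
            if (!ignoreParseErrors) {
                throw new RuntimeException("Corrupt message '" + message + "'", t);
            }
        }
        return rows;
    }

    // Phase 2 (unguarded): emission. An exception thrown by a chained
    // operator now propagates to the runtime instead of being swallowed.
    static void deserialize(String message, Consumer<String> out) {
        for (String row : parse(message)) {
            out.accept(row);
        }
    }
}
```

With this split, {{ignore-parse-errors}} keeps its documented meaning: corrupt input is skipped, but a downstream failure still fails (and can restart) the job, so no successfully parsed record is lost.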



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
