jiangyu created FLINK-36549:
-------------------------------

             Summary: Using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON results in unexpected data loss.
                 Key: FLINK-36549
                 URL: https://issues.apache.org/jira/browse/FLINK-36549
             Project: Flink
          Issue Type: Bug
            Reporter: jiangyu

In the Debezium/Canal/Maxwell/Ogg JSON formats, enabling {{ignore-parse-errors}} causes data loss when an operator chained with the format-related operator throws an exception.

The reason is that the deserialization implementations of Debezium/Canal/Maxwell/Ogg JSON emit rows inside the same try block that guards parsing. With {{ignore-parse-errors}} enabled, exceptions thrown while emitting the format's data are therefore caught and skipped as well, not only parse errors.

For example, in Canal's JSON code, enabling {{ignore-parse-errors}} catches and skips exceptions thrown from {{emitRow}}:

{code:java}
@Override
public void deserialize(@Nullable byte[] message, Collector<RowData> out) throws IOException {
    if (message == null || message.length == 0) {
        return;
    }
    try {
        final JsonNode root = jsonDeserializer.deserializeToJsonNode(message);
        if (database != null) {
            if (!databasePattern
                    .matcher(root.get(ReadableMetadata.DATABASE.key).asText())
                    .matches()) {
                return;
            }
        }
        if (table != null) {
            if (!tablePattern
                    .matcher(root.get(ReadableMetadata.TABLE.key).asText())
                    .matches()) {
                return;
            }
        }
        final GenericRowData row = (GenericRowData) jsonDeserializer.convertToRowData(root);
        String type = row.getString(2).toString(); // "type" field
        if (OP_INSERT.equals(type)) {
            // "data" field is an array of row, contains inserted rows
            ArrayData data = row.getArray(0);
            for (int i = 0; i < data.size(); i++) {
                GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
                insert.setRowKind(RowKind.INSERT);
                emitRow(row, insert, out);
            }
        } else if (OP_UPDATE.equals(type)) {
            // "data" field is an array of row, contains new rows
            ArrayData data = row.getArray(0);
            // "old" field is an array of row, contains old values
            ArrayData old = row.getArray(1);
            for (int i = 0; i < data.size(); i++) {
                // the underlying JSON deserialization schema always produce GenericRowData.
                GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
                GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
                final JsonNode oldField = root.get(FIELD_OLD);
                for (int f = 0; f < fieldCount; f++) {
                    if (before.isNullAt(f) && oldField.findValue(fieldNames.get(f)) == null) {
                        // fields in "old" (before) means the fields are changed
                        // fields not in "old" (before) means the fields are not changed
                        // so we just copy the not changed fields into before
                        before.setField(f, after.getField(f));
                    }
                }
                before.setRowKind(RowKind.UPDATE_BEFORE);
                after.setRowKind(RowKind.UPDATE_AFTER);
                emitRow(row, before, out);
                emitRow(row, after, out);
            }
        } else if (OP_DELETE.equals(type)) {
            // "data" field is an array of row, contains deleted rows
            ArrayData data = row.getArray(0);
            for (int i = 0; i < data.size(); i++) {
                GenericRowData insert = (GenericRowData) data.getRow(i, fieldCount);
                insert.setRowKind(RowKind.DELETE);
                emitRow(row, insert, out);
            }
        } else if (OP_CREATE.equals(type)) {
            // "data" field is null and "type" is "CREATE" which means
            // this is a DDL change event, and we should skip it.
            return;
        } else {
            if (!ignoreParseErrors) {
                throw new IOException(
                        format(
                                "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'",
                                type, new String(message)));
            }
        }
    } catch (Throwable t) {
        // a big try catch to protect the processing.
        if (!ignoreParseErrors) {
            throw new IOException(
                    format("Corrupt Canal JSON message '%s'.", new String(message)), t);
        }
    }
}

private void emitRow(
        GenericRowData rootRow, GenericRowData physicalRow, Collector<RowData> out) {
    // shortcut in case no output projection is required
    if (!hasMetadata) {
        out.collect(physicalRow);
        return;
    }
    final int physicalArity = physicalRow.getArity();
    final int metadataArity = metadataConverters.length;
    final GenericRowData producedRow =
            new GenericRowData(physicalRow.getRowKind(), physicalArity + metadataArity);
    for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) {
        producedRow.setField(physicalPos, physicalRow.getField(physicalPos));
    }
    for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) {
        producedRow.setField(
                physicalArity + metadataPos, metadataConverters[metadataPos].convert(rootRow));
    }
    out.collect(producedRow);
}
{code}

The proposed solution is to reimplement the {{deserialize(byte[] message, Collector<T> out)}} method of the corresponding {{DeserializationSchema}} in Debezium/Canal/Maxwell/Ogg JSON, following the default implementation of that method in the {{DeserializationSchema}} interface.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
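
One way to read the proposed fix is sketched below: parse inside the guarded block, buffer the resulting rows, and call the collector only after the try/catch has finished, so that exceptions from chained operators are never swallowed. This is a minimal illustration and not the actual Flink patch. The class {{BufferedEmitSketch}}, its nested {{Collector}} interface, and the comma-separated integer "format" are hypothetical stand-ins that keep the example free of Flink dependencies.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class BufferedEmitSketch {

    /** Hypothetical stand-in for Flink's Collector; not the real interface. */
    interface Collector<T> {
        void collect(T record);
    }

    /** Stand-in for the JSON parsing step: parses comma-separated integers. */
    static List<Integer> parse(byte[] message) {
        List<Integer> rows = new ArrayList<>();
        String text = new String(message, StandardCharsets.UTF_8);
        for (String part : text.split(",")) {
            rows.add(Integer.parseInt(part.trim())); // throws on corrupt input
        }
        return rows;
    }

    static void deserialize(byte[] message, boolean ignoreParseErrors, Collector<Integer> out)
            throws IOException {
        if (message == null || message.length == 0) {
            return;
        }
        final List<Integer> rows;
        try {
            // Only parsing is guarded; nothing is emitted inside this block.
            rows = parse(message);
        } catch (Throwable t) {
            if (!ignoreParseErrors) {
                throw new IOException("Corrupt message.", t);
            }
            return; // the parse error is skipped, as the option promises
        }
        // Emission happens outside the try/catch, so an exception thrown by a
        // chained downstream operator propagates instead of being dropped.
        for (Integer row : rows) {
            out.collect(row);
        }
    }

    public static void main(String[] args) throws IOException {
        List<Integer> sink = new ArrayList<>();
        deserialize("1, 2, 3".getBytes(StandardCharsets.UTF_8), true, sink::add);
        deserialize("oops".getBytes(StandardCharsets.UTF_8), true, sink::add);
        System.out.println(sink);
    }
}
```

The cost of this shape is one temporary list per message. In the real formats, any row conversion that should still count as a parse error (for example the metadata conversion in {{emitRow}}) would need to stay inside the guarded block, with only the {{out.collect}} calls moved out.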