[ https://issues.apache.org/jira/browse/FLINK-36549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Benchao Li reassigned FLINK-36549: ---------------------------------- Assignee: xiaoyu > Using the ignore-parse-errors parameter in Debezium/Canal/Maxwell/Ogg JSON > results in unexpected data loss. > ----------------------------------------------------------------------------------------------------------- > > Key: FLINK-36549 > URL: https://issues.apache.org/jira/browse/FLINK-36549 > Project: Flink > Issue Type: Bug > Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile) > Affects Versions: 1.17.2, 1.18.1, 1.19.1 > Reporter: jiangyu > Assignee: xiaoyu > Priority: Critical > Labels: pull-request-available > > In Debezium/Canal/Maxwell/Ogg JSON, setting {{ignore-parse-errors}} can > cause data loss when an operator chained after the format's source operator > throws an exception. The reason is that the deserialization implementations > of Debezium/Canal/Maxwell/Ogg JSON emit rows to the {{Collector}} from inside > the same try/catch block that implements {{ignore-parse-errors}}. As a result, > the option suppresses not only genuine parse errors but also any exception > raised while the format emits its rows. Because chained operators are invoked > from within {{out.collect()}}, their failures are silently discarded together > with the record. For example, in Canal JSON, enabling > {{ignore-parse-errors}} catches and discards every exception thrown from {{emitRow}}: > {code:java} > @Override > public void deserialize(@Nullable byte[] message, Collector<RowData> out) > throws IOException { > if (message == null || message.length == 0) { > return; > } > try { > final JsonNode root = jsonDeserializer.deserializeToJsonNode(message); > if (database != null) { > if (!databasePattern > .matcher(root.get(ReadableMetadata.DATABASE.key).asText()) > .matches()) { > return; > } > } > if (table != null) { > if (!tablePattern > .matcher(root.get(ReadableMetadata.TABLE.key).asText()) > .matches()) { > return; > } > } > final GenericRowData row = (GenericRowData) > jsonDeserializer.convertToRowData(root); > String type = row.getString(2).toString(); // "type" field > if (OP_INSERT.equals(type)) { > // "data" field is an array of row, contains inserted rows > ArrayData data = row.getArray(0); > for (int i = 0; i < data.size(); i++) { > 
GenericRowData insert = (GenericRowData) data.getRow(i, > fieldCount); > insert.setRowKind(RowKind.INSERT); > emitRow(row, insert, out); > } > } else if (OP_UPDATE.equals(type)) { > // "data" field is an array of row, contains new rows > ArrayData data = row.getArray(0); > // "old" field is an array of row, contains old values > ArrayData old = row.getArray(1); > for (int i = 0; i < data.size(); i++) { > // the underlying JSON deserialization schema always produce > GenericRowData. > GenericRowData after = (GenericRowData) data.getRow(i, > fieldCount); > GenericRowData before = (GenericRowData) old.getRow(i, > fieldCount); > final JsonNode oldField = root.get(FIELD_OLD); > for (int f = 0; f < fieldCount; f++) { > if (before.isNullAt(f) && > oldField.findValue(fieldNames.get(f)) == null) { > // fields in "old" (before) means the fields are > changed > // fields not in "old" (before) means the fields are > not changed > // so we just copy the not changed fields into before > before.setField(f, after.getField(f)); > } > } > before.setRowKind(RowKind.UPDATE_BEFORE); > after.setRowKind(RowKind.UPDATE_AFTER); > emitRow(row, before, out); > emitRow(row, after, out); > } > } else if (OP_DELETE.equals(type)) { > // "data" field is an array of row, contains deleted rows > ArrayData data = row.getArray(0); > for (int i = 0; i < data.size(); i++) { > GenericRowData insert = (GenericRowData) data.getRow(i, > fieldCount); > insert.setRowKind(RowKind.DELETE); > emitRow(row, insert, out); > } > } else if (OP_CREATE.equals(type)) { > // "data" field is null and "type" is "CREATE" which means > // this is a DDL change event, and we should skip it. > return; > } else { > if (!ignoreParseErrors) { > throw new IOException( > format( > "Unknown \"type\" value \"%s\". The Canal > JSON message is '%s'", > type, new String(message))); > } > } > } catch (Throwable t) { > // a big try catch to protect the processing. 
> if (!ignoreParseErrors) { > throw new IOException( > format("Corrupt Canal JSON message '%s'.", new > String(message)), t); > } > } > } > private void emitRow( > GenericRowData rootRow, GenericRowData physicalRow, > Collector<RowData> out) { > // shortcut in case no output projection is required > if (!hasMetadata) { > out.collect(physicalRow); > return; > } > final int physicalArity = physicalRow.getArity(); > final int metadataArity = metadataConverters.length; > final GenericRowData producedRow = > new GenericRowData(physicalRow.getRowKind(), physicalArity + > metadataArity); > for (int physicalPos = 0; physicalPos < physicalArity; physicalPos++) { > producedRow.setField(physicalPos, physicalRow.getField(physicalPos)); > } > for (int metadataPos = 0; metadataPos < metadataArity; metadataPos++) { > producedRow.setField( > physicalArity + metadataPos, > metadataConverters[metadataPos].convert(rootRow)); > } > out.collect(producedRow); > } > {code} > The fix is to reimplement the {{deserialize(byte[] message, Collector<T> out)}} > method of the corresponding DeserializationSchema in Debezium/Canal/Maxwell/Ogg > JSON, following the default implementation of {{deserialize(byte[] message, > Collector<T> out)}} in the {{DeserializationSchema}} interface. That is, first > deserialize the message into rows inside the try/catch, and only then emit the > rows to the {{Collector}} outside of it. This way {{ignore-parse-errors}} > suppresses parse errors only, and exceptions thrown by downstream operators > still propagate. -- This message was sent by Atlassian Jira (v8.20.10#820010)