WeichengJi created FLINK-33794:
----------------------------------

             Summary: After Flink Avro deserialization fails, subsequent correct data cannot be deserialized correctly
                 Key: FLINK-33794
                 URL: https://issues.apache.org/jira/browse/FLINK-33794
             Project: Flink
          Issue Type: Improvement
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.18.0, 1.17.0
         Environment: Flink 1.17.1 & Flink 1.18.0
            Reporter: WeichengJi
Excuse me, this is my first time submitting a Flink Jira issue. I found that when using the official AvroDeserializationSchema for Avro deserialization, once one message fails to deserialize, the next correct message cannot be deserialized either: the bad message leaves the decoder's inputStream partially read and never closed, so stale bytes remain buffered inside the decoder. I think we should modify the deserialize method as in the following code block. The version of Flink I am using is 1.17.1.

{code:java}
/** Avro decoder that decodes binary data. */
private transient BinaryDecoder decoder;

@Override
public T deserialize(@Nullable byte[] message) throws IOException {
    if (message == null) {
        return null;
    }
    // read record
    checkAvroInitialized();
    checkAvroDecoder();
    inputStream.setBuffer(message);
    Schema readerSchema = getReaderSchema();
    GenericDatumReader<T> datumReader = getDatumReader();
    datumReader.setSchema(readerSchema);
    return datumReader.read(null, decoder);
}

/**
 * If the previous message failed to deserialize, the decoder may still hold
 * unread bytes from it. Recreate the decoder so the stale bytes are discarded.
 */
void checkAvroDecoder() throws IOException {
    if (!decoder.isEnd()) {
        this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    }
}
{code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
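The failure mode can be demonstrated without Avro. The sketch below is a toy model only (none of these classes are Flink or Avro API): a decoder that eagerly pre-fetches from a reusable input stream, the way BinaryDecoder buffers ahead of the mutable byte-array stream inside the deserialization schema. After a corrupt record throws, the stale buffered bytes are served before anything from the next message, so a valid record fails too; rebuilding the decoder when it is not at its end, which is what the proposed checkAvroDecoder() does, discards the stale bytes.

{code:java}
import java.io.IOException;

// Toy model of the bug. All classes here are illustrative, not Flink/Avro API.
public class StaleDecoderDemo {

    /** Stream whose backing buffer is swapped for every incoming message. */
    static class ReusableStream {
        private byte[] data = new byte[0];
        private int pos = 0;
        void setBuffer(byte[] b) { data = b; pos = 0; }
        int available() { return data.length - pos; }
        int read() { return pos < data.length ? (data[pos++] & 0xFF) : -1; }
    }

    /** Decoder that eagerly buffers stream bytes, like BinaryDecoder does. */
    static class ToyDecoder {
        private final ReusableStream in;
        private final int[] buffered = new int[64];
        private int head = 0, tail = 0;
        ToyDecoder(ReusableStream in) { this.in = in; }

        /** True only if no bytes remain, neither buffered nor in the stream. */
        boolean isEnd() { return head == tail && in.available() == 0; }

        /** Record format: a tag byte (must be 0), a length byte, payload bytes. */
        int readRecord() throws IOException {
            if (next() != 0) throw new IOException("bad record tag");
            int len = next();
            int sum = 0;
            for (int i = 0; i < len; i++) sum += next();
            return sum;
        }

        private int next() throws IOException {
            if (head == tail) {
                int b;
                while (tail < buffered.length && (b = in.read()) != -1) {
                    buffered[tail++] = b; // pre-fetch everything available
                }
                if (head == tail) throw new IOException("ran out of input");
            }
            return buffered[head++];
        }
    }

    public static void main(String[] args) throws IOException {
        ReusableStream stream = new ReusableStream();
        ToyDecoder decoder = new ToyDecoder(stream);

        // 1) Corrupt message: tag byte is 9, so readRecord() throws, leaving
        //    the pre-fetched bytes {1, 2, 3} stuck inside the decoder.
        stream.setBuffer(new byte[]{9, 1, 2, 3});
        try {
            decoder.readRecord();
        } catch (IOException e) {
            System.out.println("corrupt record rejected");
        }

        // 2) A valid message now fails too, because a stale byte is served
        //    before anything from the new buffer -- this is the bug.
        stream.setBuffer(new byte[]{0, 2, 10, 20});
        try {
            decoder.readRecord();
        } catch (IOException e) {
            System.out.println("valid record fails on the stale decoder");
        }

        // 3) The proposed fix: if the decoder is not at its end, rebuild it
        //    so the stale bytes are discarded. Now the valid message parses.
        if (!decoder.isEnd()) {
            decoder = new ToyDecoder(stream);
        }
        stream.setBuffer(new byte[]{0, 2, 10, 20});
        System.out.println("after reset: " + decoder.readRecord());
    }
}
{code}

In the real schema the same check is available directly: BinaryDecoder#isEnd() reports whether all input has been consumed, and DecoderFactory.get().binaryDecoder(inputStream, null) returns a fresh decoder bound to the reused stream.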