WeichengJi created FLINK-33794:
----------------------------------

             Summary: After Flink Avro deserialization fails, subsequent 
correct data cannot be deserialized correctly
                 Key: FLINK-33794
                 URL: https://issues.apache.org/jira/browse/FLINK-33794
             Project: Flink
          Issue Type: Improvement
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.18.0, 1.17.0
         Environment: Flink 1.17.1 & Flink 1.18.0
            Reporter: WeichengJi


Excuse me, this is my first time submitting a Flink Jira issue.

I found that when using the official AvroDeserializationSchema for Avro 
deserialization, once one record fails to deserialize, subsequent correct 
records cannot be deserialized either: the bad record leaves the decoder's 
inputStream not fully read, so the decoder's internal buffer still holds 
leftover bytes when the next record arrives. I think we should modify the 
deserialize method as in the following code block.

The version of Flink I am using is 1.17.1
{code:java}
/** Avro decoder that decodes binary data. */
private transient BinaryDecoder decoder;

@Override
public T deserialize(@Nullable byte[] message) throws IOException {
    if (message == null) {
        return null;
    }
    // read record
    checkAvroInitialized();
    checkAvroDecoder();
    inputStream.setBuffer(message);
    Schema readerSchema = getReaderSchema();
    GenericDatumReader<T> datumReader = getDatumReader();

    datumReader.setSchema(readerSchema);

    return datumReader.read(null, decoder);
}

void checkAvroDecoder() throws IOException {
    // If the previous record failed mid-read, the decoder still buffers its
    // leftover bytes; recreate it so the next record starts from a clean state.
    if (!decoder.isEnd()) {
        this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
    }
} {code}
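The failure mode can be illustrated without Flink or Avro. The following is a 
minimal, self-contained sketch; the {{StatefulDecoder}} class is hypothetical 
and only models the relevant behavior of Avro's BinaryDecoder, namely that it 
pre-buffers bytes from its input stream, so unconsumed bytes from a failed 
record survive into the next call unless the decoder is recreated.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.IOException;

/**
 * Hypothetical stand-in for a buffering binary decoder: it pre-buffers
 * bytes from the stream, so unconsumed bytes from a failed record
 * survive into the next call unless the decoder is recreated.
 */
class StatefulDecoder {
    private final byte[] buffer = new byte[16];
    private int pos = 0;
    private int limit = 0;
    private ByteArrayInputStream in;

    void setInput(ByteArrayInputStream in) {
        this.in = in; // note: pos/limit are NOT reset, mirroring the bug
    }

    /** Reads one length-prefixed record: [len][len payload bytes]. */
    byte[] readRecord() throws IOException {
        fill();
        int len = buffer[pos++] & 0xFF;
        if (len > limit - pos) {
            // throws with pos stuck mid-buffer, like a failed Avro decode
            throw new IOException("truncated record");
        }
        byte[] out = new byte[len];
        System.arraycopy(buffer, pos, out, 0, len);
        pos += len;
        return out;
    }

    /** True when all buffered bytes have been consumed. */
    boolean isEnd() {
        return pos == limit;
    }

    private void fill() throws IOException {
        if (pos == limit) {
            pos = 0;
            limit = Math.max(0, in.read(buffer));
        }
    }
}

public class DecoderStateDemo {
    public static void main(String[] args) throws IOException {
        StatefulDecoder decoder = new StatefulDecoder();

        // Bad record: claims 9 payload bytes but only carries 2.
        decoder.setInput(new ByteArrayInputStream(new byte[] {9, 1, 2}));
        try {
            decoder.readRecord();
        } catch (IOException e) {
            System.out.println("first record failed: " + e.getMessage());
        }

        // Good record: 2 payload bytes. The old decoder still buffers the
        // leftover bytes {1, 2}; the proposed fix recreates it first.
        byte[] good = new byte[] {2, 7, 8};
        decoder.setInput(new ByteArrayInputStream(good));
        if (!decoder.isEnd()) {
            decoder = new StatefulDecoder();
            decoder.setInput(new ByteArrayInputStream(good));
        }
        byte[] record = decoder.readRecord();
        System.out.println("second record length: " + record.length);
    }
}
{code}

Without the {{isEnd()}} check and recreation step, the second {{readRecord()}} 
would consume the stale bytes left over from the bad record instead of the 
fresh input, which is the same corruption observed in AvroDeserializationSchema.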



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
