[ https://issues.apache.org/jira/browse/FLINK-26643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hengtai Nie updated FLINK-26643:
--------------------------------
    Summary: Exception occurs when use TypeInformationKeyValueSerializationSchema to deserialize data from kafka  (was: Exception occues when use TypeInformationKeyValueSerializationSchema to deserialize data from kafka)

> Exception occurs when use TypeInformationKeyValueSerializationSchema to deserialize data from kafka
> ---------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26643
>                 URL: https://issues.apache.org/jira/browse/FLINK-26643
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.13.2, 1.14.3
>            Reporter: Hengtai Nie
>            Priority: Major
>
> Recently I used TypeInformationKeyValueSerializationSchema to deserialize data from Kafka, and it throws java.io.EOFException.
>
> Usage in code:
> setDeserializer(KafkaRecordDeserializationSchema.of(new TypeInformationKeyValueSerializationSchema(String.class, String.class, new ExecutionConfig())));
>
> Data in Kafka looks like:
> key (string): 2 22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50
> value (JSON string): {
> "timestamp":"2022-3-11 17:9",
> "deviceNumber":"22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50 ",
> "eventId":"830500"
> }
>
> After troubleshooting, I think the following API is the root cause:
> org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte()
> {code:java}
> public int readUnsignedByte() throws IOException {
>     if (this.position < this.end) {
>         return (this.buffer[this.position++] & 0xff);
>     } else {
>         throw new EOFException();
>     }
> }{code}
>
> Obviously, it is wrong to throw an exception when (this.position == this.end).
> Reaching (this.position == this.end) just means that reading the unsigned bytes has finished; nothing more needs to be done.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
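
To make the bounds check quoted in the report easier to experiment with, here is a minimal, self-contained sketch. ByteReader and ByteReaderSketch are hypothetical names used only for illustration; the class is not Flink's DataInputDeserializer and copies only the position/end comparison from the snippet in the description. The two sample bytes are arbitrary.

```java
import java.io.EOFException;
import java.io.IOException;

public class ByteReaderSketch {

    /** Hypothetical stand-in that mirrors only the position/end check from the report. */
    static final class ByteReader {
        private final byte[] buffer;
        private final int end;
        private int position;

        ByteReader(byte[] buffer) {
            this.buffer = buffer;
            this.end = buffer.length;
            this.position = 0;
        }

        /** Returns the next byte as a value in 0..255, or throws once the buffer is exhausted. */
        int readUnsignedByte() throws IOException {
            if (position < end) {
                return buffer[position++] & 0xff;
            } else {
                throw new EOFException();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        ByteReader reader = new ByteReader(new byte[] {(byte) 0xF0, 0x22});
        System.out.println(reader.readUnsignedByte()); // 240
        System.out.println(reader.readUnsignedByte()); // 34
        try {
            reader.readUnsignedByte(); // a third read is requested from a two-byte buffer
        } catch (EOFException e) {
            System.out.println("EOFException on third read");
        }
    }
}
```

In this sketch, reading exactly as many bytes as the buffer holds does not throw. The exception appears only when the caller asks for one more byte after position has reached end.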