Hengtai Nie created FLINK-26643:
-----------------------------------
Summary: Exception occurs when using
TypeInformationKeyValueSerializationSchema to deserialize data from Kafka
Key: FLINK-26643
URL: https://issues.apache.org/jira/browse/FLINK-26643
Project: Flink
Issue Type: Bug
Components: API / Core
Affects Versions: 1.14.3, 1.13.2
Reporter: Hengtai Nie
Recently I used TypeInformationKeyValueSerializationSchema to deserialize
data from Kafka, and it threw java.io.EOFException.
Usage in code:
setDeserializer(KafkaRecordDeserializationSchema.of(new
TypeInformationKeyValueSerializationSchema<>(String.class, String.class, new
ExecutionConfig())));
The data in Kafka looks like this:
key (string): 2 22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50
value (JSON string): {
"timestamp":"2022-3-11 17:9",
"deviceNumber":"22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50 ",
"eventId":"830500"
}
After troubleshooting, I think the following API is the root cause:
org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte()
{code:java}
public int readUnsignedByte() throws IOException {
    if (this.position < this.end) {
        return (this.buffer[this.position++] & 0xff);
    } else {
        throw new EOFException();
    }
}
{code}
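To make the boundary condition concrete, here is a minimal sketch that mirrors only the bounds check quoted above. MiniDeserializer and countReadableBytes are hypothetical names invented for this illustration; this is not Flink's DataInputDeserializer, and it assumes the buffer spans the whole array (position starts at 0, end equals the array length).

```java
import java.io.EOFException;
import java.io.IOException;

// Hypothetical stand-in, NOT Flink's class: it only mirrors the quoted bounds check.
final class MiniDeserializer {
    private final byte[] buffer;
    private int position;
    private final int end;

    MiniDeserializer(byte[] buffer) {
        this.buffer = buffer;
        this.position = 0;          // assumption: reading starts at index 0
        this.end = buffer.length;   // assumption: the whole array is readable
    }

    int readUnsignedByte() throws IOException {
        if (position < end) {
            return buffer[position++] & 0xff;
        }
        // Reached only when position == end, i.e. a read is requested with no bytes left.
        throw new EOFException();
    }

    // Counts how many bytes can be read before the EOF branch is taken.
    static int countReadableBytes(byte[] data) {
        MiniDeserializer in = new MiniDeserializer(data);
        int count = 0;
        try {
            while (true) {
                in.readUnsignedByte();
                count++;
            }
        } catch (EOFException e) {
            return count;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] twoBytes = {(byte) 0x7B, (byte) 0xF0};
        System.out.println(countReadableBytes(twoBytes)); // prints 2
    }
}
```

With a two-byte buffer, two reads succeed and the third one takes the EOFException branch, which is the situation described in this report.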
I believe it is wrong to throw an exception when this.position == this.end.
Reaching that state just means that there are no more unsigned bytes to read,
so nothing further needs to be done.
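As a point of comparison for anyone evaluating this report, the sketch below checks what the JDK's own java.io.DataInputStream does when readUnsignedByte() is called after every byte has been consumed. It uses only the standard library and says nothing about what Flink intends; JdkEofComparison and throwsAtEnd are names made up for this example.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustration only: uses the JDK's DataInputStream, not any Flink class.
final class JdkEofComparison {
    // Returns true if the JDK reader throws EOFException on a read past the last byte.
    static boolean throwsAtEnd(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            for (int i = 0; i < data.length; i++) {
                in.readUnsignedByte(); // consume every available byte
            }
            try {
                in.readUnsignedByte(); // one extra read with nothing left
                return false;
            } catch (EOFException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsAtEnd(new byte[] {0x22, 0x6F})); // prints true
    }
}
```

If the JDK behaviour is taken as the reference, it may also be worth checking whether the bytes stored in Kafka are shorter than what the configured deserializer expects to read.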
--
This message was sent by Atlassian Jira
(v8.20.1#820001)