Hengtai Nie created FLINK-26643: ----------------------------------- Summary: Exception occues when use TypeInformationKeyValueSerializationSchema to deserialize data from kafka Key: FLINK-26643 URL: https://issues.apache.org/jira/browse/FLINK-26643 Project: Flink Issue Type: Bug Components: API / Core Affects Versions: 1.14.3, 1.13.2 Reporter: Hengtai Nie
Recently , I use TypeInformationKeyValueSerializationSchema to deserialize data from kafka, it throws java.io.EOFException . Useage in code: setDeserializer(KafkaRecordDeserializationSchema.of(new TypeInformationKeyValueSerializationSchema (String.class, String.class, new ExecutionConfig()))); Data in Kafka is like: key(string):2 22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50 value(json string):{ "timestamp":"2022-3-11 17:9", "deviceNumber":"22 6F F0 A8 8B 69 E 61 22 A5 E7 B5 50 ", "eventId":"830500" } After trouble shooting, I think follwing API is the root cause: org.apache.flink.core.memory.DataInputDeserializer.readUnsignedByte() {code:java} public int readUnsignedByte () throws IOException { if (this.position < this.end) { return (this.buffer[this.position++] & 0xff); } else { throw new EOFException(); } }{code} Obviously, it is wrong to throw an exception when (this.positon == this.end). It just means finishing to read unsigned byte when (this.positon == this.end), nothing need to do. -- This message was sent by Atlassian Jira (v8.20.1#820001)