Ran Tao created FLINK-30935:
-------------------------------

             Summary: Add KafkaSerializer deserialize check when using SimpleVersionedSerializer
                 Key: FLINK-30935
                 URL: https://issues.apache.org/jira/browse/FLINK-30935
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
    Affects Versions: 1.16.1
            Reporter: Ran Tao

{code:java}
@Override
public int getVersion() {
    return CURRENT_VERSION;
}

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long offset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), offset, stoppingOffset);
    }
}
{code}

The current Kafka serializers do not check the version passed to deserialize(). I think we can add such a check, as many other connectors do, to guard against incompatible or corrupt state.
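For illustration, the proposed check could look roughly like the sketch below. It is not the actual Flink change: `VersionCheckedSplitSerializer` and its nested `Split` class are hypothetical, dependency-free stand-ins for the Kafka split serializer and `KafkaPartitionSplit`, and the value of `CURRENT_VERSION` and the exception message are assumptions. The idea is only to reject any version the serializer does not recognize before reading the bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical stand-in for a versioned split serializer (not Flink code). */
class VersionCheckedSplitSerializer {

    // Assumed value; the real constant lives in the Kafka connector.
    static final int CURRENT_VERSION = 0;

    /** Simplified stand-in for KafkaPartitionSplit (illustrative only). */
    static final class Split {
        final String topic;
        final int partition;
        final long offset;
        final long stoppingOffset;

        Split(String topic, int partition, long offset, long stoppingOffset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.stoppingOffset = stoppingOffset;
        }
    }

    public int getVersion() {
        return CURRENT_VERSION;
    }

    /** Writes the fields in the same order that deserialize() reads them. */
    public byte[] serialize(Split split) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            out.writeUTF(split.topic);
            out.writeInt(split.partition);
            out.writeLong(split.offset);
            out.writeLong(split.stoppingOffset);
            out.flush();
            return baos.toByteArray();
        }
    }

    public Split deserialize(int version, byte[] serialized) throws IOException {
        // The proposed guard: fail fast on a version this serializer does not know,
        // instead of interpreting the bytes with the wrong layout.
        if (version != CURRENT_VERSION) {
            throw new IOException(
                    "Unrecognized serializer version " + version
                            + " (expected " + CURRENT_VERSION + ")");
        }
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                DataInputStream in = new DataInputStream(bais)) {
            String topic = in.readUTF();
            int partition = in.readInt();
            long offset = in.readLong();
            long stoppingOffset = in.readLong();
            return new Split(topic, partition, offset, stoppingOffset);
        }
    }
}
```

With this guard, `deserialize(getVersion(), bytes)` round-trips a split as before, while any other version number raises an `IOException` rather than silently producing a possibly wrong split. A real change would also need to decide how older versions, if any exist, should be migrated.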