With the legacy FlinkKafkaConsumer, overriding the isEndOfStream method of DeserializationSchema solves the problem. But the new KafkaSource ignores that method (it is never called), and it seems the setUnbounded method only accepts a stopping condition based on offsets or a timestamp.