Thanks Eron I have tried to read an EOF symbol and invoke FlinkKafkaConsumer's cancel method, it doesn't work. But I invoke the method in a FlatMap operator which is next to source operator, I guess that is the problem. I will try your answer, thanks for your suggestion.
-- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/