Hi, Your observations are correct. It is expected that the result of `KafkaDeserializationSchema#isEndOfStream` triggers a single subtask to escape its fetch loop. Therefore, if a subtask is assigned multiple partitions, as soon as one record (regardless of which partition it came from) signals end of stream, then the subtask ends.
I'm afraid there is probably no good solution to this given the ill-defined semantics of the `isEndOfStream` method. All reasonable approaches that come to mind require some sort of external trigger to manually shut down the job. For now, I've filed a JIRA to propose a possible solution to the semantics of the method: https://issues.apache.org/jira/browse/FLINK-16112 -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/