Github user tragicjun commented on the issue: https://github.com/apache/flink/pull/6026 Thanks @suez1224 for reviewing. I've committed a new patch as per your suggestions. As for the unit test, the issue was exposed in my integration tests, which require a Kafka setup. Any advice on how it could be covered in unit tests?
---