Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5094

Hi @hequn8128, let me try to explain this.

1. In the current implementation, the join process relies only on the cached rows, not on the watermarks. Specifically, when it receives a record, the join function only checks whether the opposite cache contains qualifying rows, regardless of lateness. Thus, if the cache ***is not cleaned up in time***, outdated results will be emitted.

2. Strictly speaking, the value for holding back watermarks should be reported dynamically by the join function at runtime. The current implementation temporarily uses a static value (`MaxOutputDelay`) for that. In other words, the hold-back value should be determined by the cached rows; the cache size should not be determined by `MaxOutputDelay`.

Hope that helps.

Best,
Xingcan
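
To illustrate point 2, here is a minimal, self-contained sketch of the difference between a static and a dynamic hold-back value. It is not Flink's join implementation: the class `HoldBackSketch` and all of its methods are hypothetical names, and it assumes a join result may carry the timestamp of any cached row, so the emitted watermark must stay below the oldest cached timestamp.

```java
import java.util.TreeMap;

/** Illustrative only; all names here are hypothetical and not part of Flink. */
public class HoldBackSketch {
    // Cached row timestamps of one join side, mapped to the number of rows per timestamp.
    private final TreeMap<Long, Integer> cache = new TreeMap<>();

    /** Caches a row by its event-time timestamp. */
    public void addRow(long timestamp) {
        cache.merge(timestamp, 1, Integer::sum);
    }

    /** Drops every cached row whose timestamp is strictly below the given bound. */
    public void cleanUp(long expiredBefore) {
        cache.headMap(expiredBefore, false).clear();
    }

    /** Static variant: always hold the watermark back by a fixed delay. */
    public static long staticWatermark(long inputWatermark, long maxOutputDelay) {
        return inputWatermark - maxOutputDelay;
    }

    /** Dynamic variant: hold back only as far as the oldest cached row requires. */
    public long dynamicWatermark(long inputWatermark) {
        if (cache.isEmpty()) {
            return inputWatermark; // nothing cached, so nothing to hold back
        }
        // Assumption: an output row may still carry the oldest cached timestamp,
        // so the emitted watermark must stay strictly below it.
        return Math.min(inputWatermark, cache.firstKey() - 1);
    }
}
```

With rows at timestamps 40 and 70 cached and an input watermark of 100, `dynamicWatermark` returns 39; after `cleanUp(50)` removes the row at 40, it returns 69. In this sketch the cache contents drive the hold-back value, rather than a fixed delay dictating how long rows must stay cached.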
---