Hello, I am an user of pyflink. The pseudo code of my pipeline is as follows:
``` if not map_state.contains(data[“key”]): processing(data) map_state.put(data[“key”], 0) ``` However, it raises a KeyError: File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 929, in contains return self.get_internal_state().contains(key) File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 813, in contains if self.get(map_key) is None: File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 775, in get self._state_key, map_key, self._map_key_encoder, self._map_value_decoder) File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 420, in blocking_get cached_map_state.put(map_key, (exists, value)) File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 321, in put super(CachedMapState, self).put(key, exists_and_value) File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 68, in put self._on_evict(name, value) File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 306, in on_evict self._cached_keys.remove(key) KeyError: ‘xxxxxx' The environment I am working on is Python-3.7.15, openjdk-11.0.16, apache-flink 1.16.0. I guess the reason of this error is the checking of the value is missing at https://github.com/apache/flink/blob/release-1.16.0/flink-python/pyflink/fn_execution/state_impl.py#L305 Because at line https://github.com/apache/flink/blob/release-1.16.0/flink-python/pyflink/fn_execution/state_impl.py#L317, the key is added to this set based on the first entry of the value. But there is no checking of the value when `_cached_keys.remove` is called at line 305. So `_cached_keys.remove` is called with keys do not exist in this set and it raises KeyError. I think to fix this issue, you can add one line before line 305: ``` if value[0]: self._cached_keys.remove(key) ``` Could you please verify if my guess is correct? Thank you! Ru Zhang