Hello,

I am a PyFlink user. The pseudocode of my pipeline is as follows:

```
if not map_state.contains(data["key"]):
    processing(data)
map_state.put(data["key"], 0)
```
However, it raises a KeyError:
  File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 929, in contains
    return self.get_internal_state().contains(key)
  File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 813, in contains
    if self.get(map_key) is None:
  File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 775, in get
    self._state_key, map_key, self._map_key_encoder, self._map_value_decoder)
  File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 420, in blocking_get
    cached_map_state.put(map_key, (exists, value))
  File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 321, in put
    super(CachedMapState, self).put(key, exists_and_value)
  File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 68, in put
    self._on_evict(name, value)
  File "/opt/venv/lib64/python3.7/site-packages/pyflink/fn_execution/state_impl.py", line 306, in on_evict
    self._cached_keys.remove(key)
KeyError: 'xxxxxx'

My environment is Python 3.7.15, OpenJDK 11.0.16, and apache-flink 1.16.0.

I guess the cause of this error is a missing check of the value at 
https://github.com/apache/flink/blob/release-1.16.0/flink-python/pyflink/fn_execution/state_impl.py#L305
At line 
https://github.com/apache/flink/blob/release-1.16.0/flink-python/pyflink/fn_execution/state_impl.py#L317,
the key is added to the set only when the first entry of the value (the 
`exists` flag) is true. But the value is not checked when 
`_cached_keys.remove` is called at line 305. So `_cached_keys.remove` can be 
called with a key that does not exist in the set, which raises the KeyError.
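
To illustrate my guess, here is a minimal standalone sketch that does not use PyFlink at all. `TinyLRUCache` and `TinyCachedMapState` are hypothetical stand-ins that I wrote for this mail; they only mimic the pattern I described above (conditional add on put, unconditional remove on evict) and are not the actual classes in `state_impl.py`.

```python
from collections import OrderedDict


class TinyLRUCache:
    """Hypothetical stand-in for an LRU cache with an eviction callback."""

    def __init__(self, max_entries):
        self._max_entries = max_entries
        self._cache = OrderedDict()
        self._on_evict = None

    def put(self, key, value):
        self._cache[key] = value
        self._cache.move_to_end(key)
        # Evict the least recently used entries once the cache is over capacity.
        while len(self._cache) > self._max_entries:
            old_key, old_value = self._cache.popitem(last=False)
            if self._on_evict is not None:
                self._on_evict(old_key, old_value)


class TinyCachedMapState(TinyLRUCache):
    """Mimics the suspected pattern: conditional add, unconditional remove."""

    def __init__(self, max_entries):
        super().__init__(max_entries)
        self._cached_keys = set()

        def on_evict(key, value):
            # No check of value[0] here, so the key may be absent from the set.
            self._cached_keys.remove(key)

        self._on_evict = on_evict

    def put(self, key, exists_and_value):
        # The key is tracked only when the entry actually exists.
        if exists_and_value[0]:
            self._cached_keys.add(key)
        super().put(key, exists_and_value)


def reproduce():
    """Return the key that triggers KeyError, or None if nothing is raised."""
    state = TinyCachedMapState(max_entries=1)
    state.put("a", (False, None))  # a cached "miss": never added to _cached_keys
    try:
        state.put("b", (True, 0))  # evicts "a", which is not in _cached_keys
    except KeyError as err:
        return err.args[0]
    return None


if __name__ == "__main__":
    print("KeyError raised for key:", reproduce())
```

In this sketch the error appears as soon as a cached miss is evicted, which matches my pipeline: `contains` is called for a key that is not yet in the map state.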

I think this can be fixed by adding a check before the removal at line 305:
```
if value[0]:
    self._cached_keys.remove(key)
```
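
As a rough check of this idea, the following standalone sketch applies the same guard to a simplified cache. `GuardedCachedMapState` is a hypothetical class written only for this mail (it does not use PyFlink and is not the real implementation); it assumes that each cached value is an `(exists, value)` tuple, as in the traceback.

```python
from collections import OrderedDict


class GuardedCachedMapState:
    """Hypothetical simplified cache whose eviction checks the exists flag."""

    def __init__(self, max_entries):
        self._max_entries = max_entries
        self._cache = OrderedDict()
        self._cached_keys = set()

    def _on_evict(self, key, exists_and_value):
        # Only keys that were added in put() are removed here.
        if exists_and_value[0]:
            self._cached_keys.remove(key)

    def put(self, key, exists_and_value):
        if exists_and_value[0]:
            self._cached_keys.add(key)
        self._cache[key] = exists_and_value
        self._cache.move_to_end(key)
        # Evict the least recently used entries once the cache is over capacity.
        while len(self._cache) > self._max_entries:
            old_key, old_value = self._cache.popitem(last=False)
            self._on_evict(old_key, old_value)


def demo():
    """Evict one cached miss and one existing entry; return the tracked keys."""
    state = GuardedCachedMapState(max_entries=1)
    state.put("a", (False, None))  # cached miss, not tracked
    state.put("b", (True, 0))      # evicts "a" without touching the set
    state.put("c", (True, 1))      # evicts "b" and removes it from the set
    return sorted(state._cached_keys)


if __name__ == "__main__":
    print(demo())
```

An alternative I considered is `set.discard`, which does not raise when the key is missing, but I am not sure which of the two fits the intended semantics of `_cached_keys` better.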

Could you please verify if my guess is correct?

Thank you!

Ru Zhang
