Yun Tang created FLINK-24105: -------------------------------- Summary: state TTL might not take effect for pyflink Key: FLINK-24105 URL: https://issues.apache.org/jira/browse/FLINK-24105 Project: Flink Issue Type: Bug Components: API / Python, Runtime / State Backends Reporter: Yun Tang Fix For: 1.14.0
Since pyflink has its own data cache on the Python side, it might still read the data from the Python side even after the TTL has expired.

The script below can reproduce this:

{code:python}
from pyflink.common.time import Time
from pyflink.datastream.state import ValueStateDescriptor, StateTtlConfig, ListStateDescriptor, MapStateDescriptor
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, RuntimeContext, KeyedProcessFunction, \
    EmbeddedRocksDBStateBackend
import time
from datetime import datetime


def test_keyed_process_function_with_state():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.get_config().set_auto_watermark_interval(2000)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env.set_state_backend(EmbeddedRocksDBStateBackend())
    data_stream = env.from_collection([(1, 'hi', '1603708211000'),
                                       (3, 'hi', '1603708226000'),
                                       (10, 'hi', '1603708226000'),
                                       (6, 'hello', '1603708293000')],
                                      type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

    class MyProcessFunction(KeyedProcessFunction):

        def __init__(self):
            self.value_state = None
            self.list_state = None
            self.map_state = None

        def open(self, runtime_context: RuntimeContext):
            state_ttl_config = StateTtlConfig \
                .new_builder(Time.seconds(1)) \
                .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
                .never_return_expired() \
                .build()
            value_state_descriptor = ValueStateDescriptor('value_state', Types.INT())
            value_state_descriptor.enable_time_to_live(state_ttl_config)
            self.value_state = runtime_context.get_state(value_state_descriptor)
            list_state_descriptor = ListStateDescriptor('list_state', Types.INT())
            list_state_descriptor.enable_time_to_live(state_ttl_config)
            self.list_state = runtime_context.get_list_state(list_state_descriptor)
            map_state_descriptor = MapStateDescriptor('map_state', Types.INT(), Types.STRING())
            map_state_descriptor.enable_time_to_live(state_ttl_config)
            self.map_state = runtime_context.get_map_state(map_state_descriptor)

        def process_element(self, value, ctx):
            time.sleep(20)
            current_value = self.value_state.value()
            self.value_state.update(value[0])
            current_list = [_ for _ in self.list_state.get()]
            self.list_state.add(value[0])
            map_entries_string = []
            for k, v in self.map_state.items():
                map_entries_string.append(str(k) + ': ' + str(v))
            map_entries_string = '{' + ', '.join(map_entries_string) + '}'
            self.map_state.put(value[0], value[1])
            current_key = ctx.get_current_key()
            yield "time: {}, current key: {}, current value state: {}, current list state: {}, " \
                  "current map state: {}, current value: {}".format(str(datetime.now().time()),
                                                                    str(current_key),
                                                                    str(current_value),
                                                                    str(current_list),
                                                                    map_entries_string,
                                                                    str(value))

        def on_timer(self, timestamp, ctx):
            pass

    data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
        .process(MyProcessFunction(), output_type=Types.STRING()) \
        .print()
    env.execute('test time stamp assigner with keyed process function')


if __name__ == '__main__':
    test_keyed_process_function_with_state()
{code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)