Yun Tang created FLINK-24105:
--------------------------------

             Summary: state TTL might not take effect for pyflink
                 Key: FLINK-24105
                 URL: https://issues.apache.org/jira/browse/FLINK-24105
             Project: Flink
          Issue Type: Bug
          Components: API / Python, Runtime / State Backends
            Reporter: Yun Tang
             Fix For: 1.14.0


Since PyFlink keeps its own data cache on the Python side, it might still read 
the data from that cache even after the TTL has expired.

The script below can reproduce this:
{code:python}
import time
from datetime import datetime

from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import (EmbeddedRocksDBStateBackend, KeyedProcessFunction,
                                RuntimeContext, StreamExecutionEnvironment,
                                TimeCharacteristic)
from pyflink.datastream.state import (ListStateDescriptor, MapStateDescriptor,
                                      StateTtlConfig, ValueStateDescriptor)

def test_keyed_process_function_with_state():
    """Reproduce FLINK-24105: state TTL might not take effect for PyFlink.

    Builds a small job on the embedded RocksDB state backend that keys a
    four-row collection by its second field and runs it through a
    ``KeyedProcessFunction`` holding a value, list and map state, each with a
    1-second TTL configured with ``never_return_expired()``. Every element is
    delayed by 20 seconds before the states are read, and the contents seen
    at that moment are printed so that stale (expired) values are visible in
    the job output.
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.get_config().set_auto_watermark_interval(2000)
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env.set_state_backend(EmbeddedRocksDBStateBackend())
    data_stream = env.from_collection(
        [(1, 'hi', '1603708211000'),
         (3, 'hi', '1603708226000'),
         (10, 'hi', '1603708226000'),
         (6, 'hello', '1603708293000')],
        type_info=Types.ROW([Types.INT(), Types.STRING(), Types.STRING()]))

    class MyProcessFunction(KeyedProcessFunction):
        """Reads, reports and then updates three TTL-enabled keyed states."""

        def __init__(self):
            # State handles are created in open(), once a runtime context exists.
            self.value_state = None
            self.list_state = None
            self.map_state = None

        def open(self, runtime_context: RuntimeContext):
            """Register the value, list and map states with a shared TTL config."""
            state_ttl_config = StateTtlConfig \
                .new_builder(Time.seconds(1)) \
                .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite) \
                .never_return_expired() \
                .build()

            value_state_descriptor = ValueStateDescriptor('value_state', Types.INT())
            value_state_descriptor.enable_time_to_live(state_ttl_config)
            self.value_state = runtime_context.get_state(value_state_descriptor)

            list_state_descriptor = ListStateDescriptor('list_state', Types.INT())
            list_state_descriptor.enable_time_to_live(state_ttl_config)
            self.list_state = runtime_context.get_list_state(list_state_descriptor)

            map_state_descriptor = MapStateDescriptor(
                'map_state', Types.INT(), Types.STRING())
            map_state_descriptor.enable_time_to_live(state_ttl_config)
            self.map_state = runtime_context.get_map_state(map_state_descriptor)

        def process_element(self, value, ctx):
            """Yield a report of the states as seen before this element updates them."""
            # Wait much longer than the 1-second TTL configured in open() so
            # that anything written for an earlier element is past its TTL
            # when it is read below.
            time.sleep(20)

            # Each state is read first and written afterwards, so the report
            # reflects only what earlier elements of the same key left behind.
            current_value = self.value_state.value()
            self.value_state.update(value[0])

            current_list = list(self.list_state.get())
            self.list_state.add(value[0])

            map_entries_string = '{' + ', '.join(
                str(k) + ': ' + str(v) for k, v in self.map_state.items()) + '}'
            self.map_state.put(value[0], value[1])

            current_key = ctx.get_current_key()
            yield ("time: {}, current key: {}, current value state: {}, "
                   "current list state: {}, current map state: {}, "
                   "current value: {}").format(str(datetime.now().time()),
                                               str(current_key),
                                               str(current_value),
                                               str(current_list),
                                               map_entries_string,
                                               str(value))

        def on_timer(self, timestamp, ctx):
            # No timers are registered by this function; nothing to do.
            pass

    data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
        .process(MyProcessFunction(), output_type=Types.STRING()) \
        .print()
    env.execute('test time stamp assigner with keyed process function')

# Run the reproduction job only when this file is executed as a script.
if __name__ == '__main__':
    test_keyed_process_function_with_state()
{code}






--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to