Hi! I've created a basic PyFlink pipeline that uses state TTL (time-to-live), but the TTL does not seem to take effect. I reproduced the same logic in Java, and there it works!
Is this a PyFlink bug? If so, how can I report it? If not, what else can I try?

- Flink: 1.18.0
- Docker image: `flink:1.18.0-scala_2.12-java11`

Code to reproduce is below. I expect it to print `<current_datetime> None` for every record. The state TTL is 1 second, and each record is processed 1.5 seconds after the previous one, so the stored value should always have expired by the time it is read. Instead, it prints `<current_datetime>` together with the previously stored state value, i.e. the state never appears to expire.

Python (PyFlink):

```python
import time
from datetime import datetime

from pyflink.common import Time, Types
from pyflink.datastream import (
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
)
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    """Print the stored state for each record and re-initialise it when it is empty."""

    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor(
            name="my_state",
            value_type_info=Types.STRING(),
        )
        # Entries expire 1 s after they are created or written, and an expired
        # value must never be returned to the caller.
        state_descriptor.enable_time_to_live(
            ttl_config=StateTtlConfig.new_builder(Time.seconds(1))
            .cleanup_incrementally(cleanup_size=10, run_cleanup_for_every_record=True)
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )
        self.state = runtime_context.get_state(state_descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        # NOTE(review): PyFlink may serve state reads from a cache in the Python
        # worker (see the `python.state.cache-size` option); worth checking whether
        # that cache hides TTL expiry, e.g. by setting it to 0 -- TODO confirm.
        current_state = self.state.value()
        print(datetime.now(), current_state)
        if current_state is None:
            self.state.update(str(datetime.now()))
        # Sleep longer than the 1 s TTL so the entry should have expired
        # before the next record is processed.
        time.sleep(1.5)


if __name__ == "__main__":
    # - Init environment
    environment = StreamExecutionEnvironment.get_execution_environment()
    environment.set_parallelism(1)

    # - Setup pipeline
    (
        environment.from_collection(collection=list(range(10)))
        .key_by(lambda value: 0)  # one constant key: every record shares the same state
        .process(Processor())
    )

    # - Execute pipeline
    environment.execute("ttl_test")
```

Java (equivalent logic, works as expected):

```java
import java.io.IOException;
import java.time.LocalDateTime;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class GameHistoryProcessor extends KeyedProcessFunction<Integer, String, String> {

    private transient ValueState<String> state;

    @Override
    public void open(Configuration parameters) {
        // Same TTL settings as the Python version: expire 1 s after create/write
        // and never return an expired value.
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
                // .cleanupFullSnapshot()
                .cleanupIncrementally(10, true)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();
        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);
        state = getRuntimeContext().getState(stateDescriptor);
    }

    @Override
    public void processElement(String event, Context context, Collector<String> collector)
            throws IOException, InterruptedException {
        // A distinct local name is required: `var state = state.value()` would
        // shadow the field, and `String` has no `update()` method.
        String currentState = state.value();
        System.out.println("State: " + currentState);
        if (currentState == null) {
            state.update(LocalDateTime.now().toString());
        }
        // Sleep longer than the 1 s TTL so the entry expires before the next record.
        Thread.sleep(1500);
    }
}
```