Hi! I’ve created a basic PyFlink pipeline with state TTL, and it does not seem to
work. I have reproduced the exact same code in Java, and there it works!

Is this a PyFlink bug? If so, how can I report it? If not, what else can I
try?

Flink: 1.18.0
image: flink:1.18.0-scala_2.12-java11

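Code to reproduce is below. I expect it to print `<current_datetime> None` for
every element: the state has a 1-second TTL, and the function sleeps 1.5 seconds
between elements, so the stored value should always have expired by the next
read. Instead, it prints `<current_datetime>` followed by the previously stored
state value.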

```python
import time
from datetime import datetime

from pyflink.common import Time, Types
from pyflink.datastream import (
    KeyedProcessFunction,
    RuntimeContext,
    StreamExecutionEnvironment,
    TimeCharacteristic,
)
from pyflink.datastream.state import StateTtlConfig, ValueStateDescriptor


class Processor(KeyedProcessFunction):
    """Keyed function that reports its TTL-guarded ``my_state`` for every element.

    For each element it prints the wall-clock time together with the current value
    of ``my_state``. When that value is missing, it stores the current time as a
    string. It then sleeps 1.5 s, which is longer than the 1 s TTL configured in
    ``open``, so each later read is expected to observe expired (``None``) state.

    NOTE(review): the reporter sees the stored value returned again in PyFlink,
    while the equivalent Java job reads null. One suspected (unconfirmed) cause is
    PyFlink's Python-side state cache (``python.state.cache-size``) serving reads
    without TTL checks. Verify before relying on this.
    """

    def open(self, runtime_context: RuntimeContext):
        # Entries expire 1 s after being created or written, and expired values
        # are never handed back to the caller.
        ttl_config = (
            StateTtlConfig.new_builder(Time.seconds(1))
            .cleanup_incrementally(cleanup_size=10, run_cleanup_for_every_record=True)
            .set_update_type(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .set_state_visibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
            .build()
        )

        descriptor = ValueStateDescriptor(name="my_state", value_type_info=Types.STRING())
        descriptor.enable_time_to_live(ttl_config=ttl_config)

        self.state = runtime_context.get_state(descriptor)

    def process_element(self, value: int, ctx: KeyedProcessFunction.Context):
        stored = self.state.value()
        print(datetime.now(), stored)

        if stored is None:
            self.state.update(str(datetime.now()))

        # Outlast the 1 s TTL before the next element is handled.
        time.sleep(1.5)


if __name__ == "__main__":
    # - Init environment

    environment = StreamExecutionEnvironment.get_execution_environment()
    # Parallelism 1 is applied once here; the pipeline below inherits it.
    environment.set_parallelism(1)

    # - Setup pipeline

    # Every element maps to key 0, so all elements read and write the same
    # keyed state entry.
    (
        environment.from_collection(collection=list(range(10)))
        .key_by(lambda value: 0)
        .process(Processor())
    )

    # - Execute pipeline

    environment.execute("ttl_test")



```

```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.IOException;
import java.time.LocalDateTime;

public class GameHistoryProcessor extends KeyedProcessFunction<Integer, String, 
String> {


    private transient ValueState<String> state;


    @Override
    public void open(Configuration parameters) {
        var stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(1))
//                .cleanupFullSnapshot()
                .cleanupIncrementally(10, true)
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        var stateDescriptor = new ValueStateDescriptor<>("state", String.class);
        stateDescriptor.enableTimeToLive(stateTtlConfig);

        state = getRuntimeContext().getState(stateDescriptor);

    }

    @Override
    public void processElement(String event, Context context, Collector<String> 
collector) throws IOException, InterruptedException {
        var state = state.value();
        System.out.println("State: " + state);

        if (state == null) {
            state = LocalDateTime.now().toString();
            state.update(state);
        }

        Thread.sleep(1500);
    }
}
```

Reply via email to