Hi All,

I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I 
have understood that the gap is computed dynamically by a function on each 
element. What I should be able to obtain is a Flink application that can 
automatically manage the windows based on the frequency of the data. (if I have 
understood correctly)

But I'm wondering if there is any parameter to adjust the computation to do 
more windows or less windows considering the same data.

I have my event that provide "millis" of which I would like to pass to the 
function but I don't understand how, for the moment I'm trying with the code 
below but no luck.. Can you please give me some help? Thanks!


        FlinkKafkaConsumer<Event> kafkaData =
                new FlinkKafkaConsumer("CorID_1", new 
EventDeserializationSchema(), p);
        WatermarkStrategy<Event> wmStrategy =
                WatermarkStrategy
                        .<Event>forMonotonousTimestamps()
                        .withIdleness(Duration.ofMinutes(1))
                        .withTimestampAssigner((event, timestamp) -> { return 
event.get_Time();

                        });

        DataStream<Event> stream = env.addSource(
                kafkaData.assignTimestampsAndWatermarks(wmStrategy));


        DataStream<Event> Data = stream
                .keyBy((Event ride) -> ride.CorrID)
                .window(EventTimeSessionWindows.withDynamicGap((event)->{
                    return event.get_Time();}));



Where from the load of the message which i receive from Kafka i convert the 
date time in millis.

 public long get_Time() {
        long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
        this.millis = tn;
        return millis;
    }
    public void set_a_t_rt(String a_t_rt) {
        this.a_t_rt = a_t_rt;
    }




Reply via email to