Hi all,

I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. As I understand it, the gap is computed dynamically by a function applied to each element, so the result should be a Flink application that automatically sizes its windows based on the frequency of the data (if I have understood correctly).
But I'm wondering whether there is any parameter to tune the computation so that it produces more or fewer windows for the same data. My events carry a "millis" value that I would like to pass to the function, but I don't understand how. For the moment I'm trying the code below, with no luck. Can you please give me some help?

Thanks!

    FlinkKafkaConsumer<Event> kafkaData =
        new FlinkKafkaConsumer<>("CorID_1", new EventDeserializationSchema(), p);

    WatermarkStrategy<Event> wmStrategy = WatermarkStrategy
        .<Event>forMonotonousTimestamps()
        .withIdleness(Duration.ofMinutes(1))
        .withTimestampAssigner((event, timestamp) -> {
            return event.get_Time();
        });

    DataStream<Event> stream = env.addSource(
        kafkaData.assignTimestampsAndWatermarks(wmStrategy));

    DataStream<Event> Data = stream
        .keyBy((Event ride) -> ride.CorrID)
        .window(EventTimeSessionWindows.withDynamicGap((event) -> {
            return event.get_Time();
        }));

In the Event class, I convert the date-time from the payload of the message I receive from Kafka into milliseconds:

    public long get_Time() {
        long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
        this.millis = tn;
        return millis;
    }

    public void set_a_t_rt(String a_t_rt) {
        this.a_t_rt = a_t_rt;
    }
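To make the question concrete, here is a minimal plain-Java sketch (no Flink dependency) of how I understand session windows with a per-element gap to be formed. Everything in it is hypothetical and only for illustration: the class name DynamicGapSketch, the sessionize helper, the sample timestamps, and the simplification that an element is just its timestamp. The merge rule (each element opens a window from its timestamp to timestamp + gap, and a window that overlaps or touches the previous one is merged into it) is my assumption about what Flink does, not something I have verified.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.LongUnaryOperator;

public class DynamicGapSketch {

    /** A session window [start, end) in milliseconds. */
    public static final class Session {
        public long start;
        public long end;

        Session(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Groups event timestamps into sessions. gapFor returns the gap
     * (a duration in ms, not an absolute time) for a given element.
     */
    public static List<Session> sessionize(long[] timestamps, LongUnaryOperator gapFor) {
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted);

        List<Session> sessions = new ArrayList<>();
        for (long ts : sorted) {
            long gap = gapFor.applyAsLong(ts);
            if (gap <= 0) {
                throw new IllegalArgumentException("gap must be positive, got " + gap);
            }
            long end = ts + gap;
            Session last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && ts <= last.end) {
                // Element falls inside (or touches) the open session: extend it.
                last.end = Math.max(last.end, end);
            } else {
                // Element arrived after the gap expired: start a new session.
                sessions.add(new Session(ts, end));
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        long[] eventTimes = {0, 1_000, 2_500, 10_000, 10_400, 30_000};

        System.out.println(sessionize(eventTimes, ts -> 500L));    // 5 sessions
        System.out.println(sessionize(eventTimes, ts -> 2_000L));  // 3 sessions
        System.out.println(sessionize(eventTimes, ts -> 60_000L)); // 1 session
    }
}
```

In this sketch the value returned by the gap function is a length of inactivity in milliseconds, so a larger gap gives fewer windows and a smaller gap gives more windows for the same data. If Flink works the same way, returning an epoch timestamp such as get_Time() would act as an extremely large gap, which may be why my windows do not behave as expected.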