Hi,

I'm not sure that what you want is possible. You say you want more windows when there are more events in a given time frame, that is, when the events are denser in time?

Also, using the event timestamp as the gap doesn't look correct. The gap basically specifies the timeout for a session (and I now realize that maybe "gap" is not a good word for that), so the function should return a duration in milliseconds, not a point in time. If your timeout increases as time goes on, your successive sessions will just get bigger and bigger.
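
To make the timeout semantics concrete, here is a small plain-Java sketch (no Flink dependency) of how a per-element gap turns into sessions. It is only a simplified model, not Flink's actual implementation: the class name SessionGapSketch and the method sessionize are made up for illustration, the timestamps are invented, and it assumes events for a single key arrive sorted by event time. Each event opens a window [ts, ts + gap], and windows that touch or overlap are merged.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongUnaryOperator;

public class SessionGapSketch {

    /**
     * Hypothetical helper: groups sorted event timestamps (millis) into sessions.
     * gapFor returns the session timeout, in millis, for a given event timestamp.
     * Each session is returned as {start, end}.
     */
    static List<long[]> sessionize(long[] sortedTimestamps, LongUnaryOperator gapFor) {
        List<long[]> sessions = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            long end = ts + gapFor.applyAsLong(ts);
            if (!sessions.isEmpty()) {
                long[] last = sessions.get(sessions.size() - 1);
                if (ts <= last[1]) {
                    // Event falls inside the open session: extend it.
                    last[1] = Math.max(last[1], end);
                    continue;
                }
            }
            // Timeout of the previous session has passed: start a new one.
            sessions.add(new long[] {ts, end});
        }
        return sessions;
    }

    public static void main(String[] args) {
        long base = 1_605_000_000_000L; // an epoch-millis value from Nov 2020
        long[] events = {base + 1_000, base + 2_000, base + 10_000, base + 11_000, base + 60_000};

        // Fixed 5 s timeout.
        System.out.println(sessionize(events, ts -> 5_000L).size()); // prints 3
        // Shorter timeout: the same data yields more sessions.
        System.out.println(sessionize(events, ts -> 500L).size());   // prints 5
        // Gap = event timestamp: the timeout is roughly 50 years, so everything merges.
        System.out.println(sessionize(events, ts -> ts).size());     // prints 1
    }
}
```

So the only knob for getting more or fewer windows from the same data is how long a timeout the function returns per element: shorter timeouts split the stream into more sessions, longer ones merge it into fewer.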

Best,
Aljoscha

On 12.11.20 15:56, Simone Cavallarin wrote:
Hi All,

I'm trying to use EventTimeSessionWindows.withDynamicGap in my application. I 
have understood that the gap is computed dynamically by a function on each 
element. What I should be able to obtain is a Flink application that can 
automatically manage the windows based on the frequency of the data. (if I have 
understood correctly)

But I'm wondering if there is any parameter to adjust the computation so that it 
produces more or fewer windows for the same data.

My events provide "millis", which I would like to pass to the function, but I 
don't understand how. For the moment I'm trying the code below, with no 
luck. Can you please give me some help? Thanks!


         FlinkKafkaConsumer<Event> kafkaData =
                 new FlinkKafkaConsumer<>("CorID_1", new EventDeserializationSchema(), p);
         WatermarkStrategy<Event> wmStrategy =
                 WatermarkStrategy
                         .<Event>forMonotonousTimestamps()
                         .withIdleness(Duration.ofMinutes(1))
                         .withTimestampAssigner((event, timestamp) -> event.get_Time());

         DataStream<Event> stream = env.addSource(
                 kafkaData.assignTimestampsAndWatermarks(wmStrategy));


         DataStream<Event> Data = stream
                 .keyBy((Event ride) -> ride.CorrID)
                 .window(EventTimeSessionWindows.withDynamicGap((event)->{
                     return event.get_Time();}));



Here, from the payload of the message I receive from Kafka, I convert the 
date-time to millis:

     public long get_Time() {
         long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
         this.millis = tn;
         return millis;
     }
     public void set_a_t_rt(String a_t_rt) {
         this.a_t_rt = a_t_rt;
     }





