Many thanks for the help!

Simone
________________________________
From: Aljoscha Krettek <aljos...@apache.org>
Sent: 19 November 2020 11:46
To: user@flink.apache.org <user@flink.apache.org>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

On 17.11.20 17:37, Simone Cavallarin wrote:
> Hi,
>
> I have been working on the suggestion that you gave me, thanks! The first
> part is to add the gap to the message: 1) I receive the event, 2) I take that
> event and map it using StatefulSessionCalculator, which is where I put
> together "the message" and a "long" that is my gap in millis.
>
> DataStream<Event> source = <Kafka Source>
>
> Operation in front of the window that keeps track of session gaps
>
> DataStream<Tuple2<MyMessageType, Long>> enriched = source
>     .keyBy(<key extractor>)
>     .map(new StatefulSessionCalculator()); // or process()
>
> This is my StatefulSessionCalculator():
>
> Tuple2<MyMessageType, Long> map(MyMessageType input) {
>     ValueState<MyState> valueState = getState(myModelStateDescriptor);
>     MyState state = valueState.value();
>     state.update(input);
>     long suggestedGap = state.getSuggestedGap();
>     valueState.update(state);
>     return Tuple2.of(input, suggestedGap);
> }
>
> If the "gap" calculated is "1234".
> The result would be: [Tom, 1.70, 50, 1605612588995], [1234]>?

That looks correct, yes.

> The second step is to use the gap calculated through
> DynamicWindowGapExtractor().
>
> DataStream<...> result = enriched
>     .keyBy(new MyKeySelector())
>     .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
>
> The DynamicWindowGapExtractor() extracts the gap from the message and feeds it
> back to Flink.
> Could you please give me an example also for this one?

This would just be a class that implements
SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> and returns the gap
from the extract() method.

> One thing that I don't understand is that after enriching the message my
> event that contains a POJO is nested inside a tuple. How can I access it?

You would just read the first field of the tuple, i.e. tuple.f0.

> The last point when you said: "I think, however, that it might be easier at
> this point to just use a stateful ProcessFunction", you meant a completely
> different approach, would be better?

That's what I meant, yes. It seems too complicated to split the logic
into one part that determines the dynamic gap and another part that does
the computation per session. It seems easier to just roll that into one
operator that does everything. And with state and timers you should have
enough flexibility.

Best,
Aljoscha
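
For reference, the gap extractor described above could look roughly like the sketch below. This is only an illustration under some assumptions: MyEvent and its fields are made up, and so that the snippet compiles without Flink on the classpath it declares minimal local stand-ins for Tuple2 and SessionWindowTimeGapExtractor. In a real job you would delete those two stand-ins and import org.apache.flink.api.java.tuple.Tuple2 and org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor instead.

```java
import java.io.Serializable;

// Stand-in for org.apache.flink.api.java.tuple.Tuple2 (assumption: only the
// parts used here are reproduced; the real class exposes public f0 and f1).
class Tuple2<T0, T1> {
    public T0 f0;
    public T1 f1;

    Tuple2(T0 f0, T1 f1) {
        this.f0 = f0;
        this.f1 = f1;
    }

    static <T0, T1> Tuple2<T0, T1> of(T0 f0, T1 f1) {
        return new Tuple2<>(f0, f1);
    }
}

// Stand-in for Flink's SessionWindowTimeGapExtractor interface, which
// declares a single method: long extract(T element).
interface SessionWindowTimeGapExtractor<T> extends Serializable {
    long extract(T element);
}

// Hypothetical POJO standing in for the event type from the thread.
class MyEvent {
    public String name;
    public long timestamp;

    MyEvent(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }
}

// The extractor itself: the gap was attached as the second tuple field by the
// upstream stateful map, so extract() only has to hand it back.
class DynamicWindowGapExtractor
        implements SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> {

    @Override
    public long extract(Tuple2<MyEvent, Long> element) {
        // element.f0 is the nested POJO; element.f1 is the gap in milliseconds.
        return element.f1;
    }
}
```

With the enriched element from the example in the thread, `new DynamicWindowGapExtractor().extract(Tuple2.of(event, 1234L))` returns the attached gap, and the original POJO stays reachable as `tuple.f0` in whatever function runs on the window.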