Sure, my pleasure!

Aljoscha

On 19.11.20 16:12, Simone Cavallarin wrote:
Many thanks for the help!!

Simone

________________________________
From: Aljoscha Krettek <aljos...@apache.org>
Sent: 19 November 2020 11:46
To: user@flink.apache.org <user@flink.apache.org>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

On 17.11.20 17:37, Simone Cavallarin wrote:
Hi,

I have been working on the suggestion that you gave me, thanks! The first part is to add the
gap to the message: 1) I receive the event; 2) I map it using StatefulSessionCalculator,
which is where I combine "the message" with a "long" that is my gap in millis.

DataStream<MyMessageType> source = <Kafka Source>;

// Operation in front of the window that keeps track of session gaps
DataStream<Tuple2<MyMessageType, Long>> enriched = source
     .keyBy(<key extractor>)
     .map(new StatefulSessionCalculator()); // or process()

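This is my StatefulSessionCalculator:

public class StatefulSessionCalculator
        extends RichMapFunction<MyMessageType, Tuple2<MyMessageType, Long>> {
    @Override
    public Tuple2<MyMessageType, Long> map(MyMessageType input) throws Exception {
        // keyed state: one MyState per key, because this runs after keyBy()
        ValueState<MyState> valueState = getRuntimeContext().getState(myModelStateDescriptor);
        MyState state = valueState.value();
        if (state == null) {
            state = new MyState(); // first event for this key (assumes a no-arg constructor)
        }
        state.update(input);
        long suggestedGap = state.getSuggestedGap();
        valueState.update(state);
        return Tuple2.of(input, suggestedGap);
    }
}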
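Since getSuggestedGap() is not shown, here is a minimal plain-Java sketch of what the keyed gap calculation could look like. Everything in it is a hypothetical assumption: GapState, SessionGapCalculator, the "twice the mean inter-arrival time" heuristic and the default gap. A HashMap keyed by the event key stands in for Flink's keyed ValueState so that it runs without Flink.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for MyState: remembers the last timestamp and the
// running sum/count of inter-arrival times for one key.
final class GapState {
    long lastTimestamp = -1; // -1 means "no event seen yet for this key"
    long intervalSum = 0;
    long intervalCount = 0;

    void update(long timestamp) {
        if (lastTimestamp >= 0) {
            intervalSum += timestamp - lastTimestamp;
            intervalCount++;
        }
        lastTimestamp = timestamp;
    }

    // Hypothetical heuristic: twice the mean inter-arrival time, or the default
    // until at least one interval has been observed.
    long suggestedGap(long defaultGap) {
        return intervalCount == 0 ? defaultGap : 2 * intervalSum / intervalCount;
    }
}

// Plain-Java simulation of the keyed map() step: one GapState per key,
// playing the role that ValueState plays after keyBy() in Flink.
final class SessionGapCalculator {
    private final Map<String, GapState> stateByKey = new HashMap<>();
    private final long defaultGap;

    SessionGapCalculator(long defaultGap) {
        this.defaultGap = defaultGap;
    }

    // Returns the gap to attach to the event, i.e. the Long in Tuple2<MyMessageType, Long>.
    long enrich(String key, long timestamp) {
        GapState state = stateByKey.computeIfAbsent(key, k -> new GapState());
        state.update(timestamp);
        return state.suggestedGap(defaultGap);
    }
}
```

With a default of 5000 ms, events for "Tom" at 1000, 2000 and 4000 get gaps of 5000, 2000 and 3000, while a first event for another key again gets the default.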

If the calculated gap is 1234, would the result be:
[Tom, 1.70, 50, 1605612588995], [1234]?

That looks correct, yes.

The second step is to use the calculated gap through DynamicWindowGapExtractor().

DataStream<...> result = enriched
     .keyBy(new MyKeySelector())
     .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
     .process(...); // a window function must still be applied to the WindowedStream
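To make the semantics of withDynamicGap() concrete, here is a simplified plain-Java model for a single key: each event opens a window [timestamp, timestamp + gap) and overlapping windows merge into one session. This is not Flink code. It assumes events arrive in timestamp order, and merging windows that merely touch is how I read Flink's TimeWindow merging, so treat that boundary case as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// One merged session window: [start, end) plus how many events it contains.
final class SessionWindow {
    final long start;
    long end;
    int count;

    SessionWindow(long start, long end) {
        this.start = start;
        this.end = end;
        this.count = 1;
    }
}

final class DynamicGapSessions {
    // Simplified model of session windows with a per-element gap for ONE key:
    // each event opens [timestamp, timestamp + gap) and overlapping windows merge.
    // Assumes events are sorted by timestamp (no out-of-order handling).
    static List<SessionWindow> assign(long[] timestamps, long[] gaps) {
        List<SessionWindow> sessions = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            long start = timestamps[i];
            long end = timestamps[i] + gaps[i];
            SessionWindow last = sessions.isEmpty() ? null : sessions.get(sessions.size() - 1);
            if (last != null && start <= last.end) {
                // The new window overlaps (or touches) the current session: merge.
                last.end = Math.max(last.end, end);
                last.count++;
            } else {
                sessions.add(new SessionWindow(start, end));
            }
        }
        return sessions;
    }
}
```

With timestamps 0, 1000, 5000, 5500 and gaps 2000, 2000, 1000, 1000 this yields two sessions, [0,3000) and [5000,6500); raising the second event's gap to 4500 merges all four events into a single session [0,6500).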


DynamicWindowGapExtractor() extracts the gap from the message and feeds it
back to Flink.
Could you please give me an example for this one as well?

This would just be a class that implements
SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> (it is an
interface) and returns the gap, i.e. the tuple's f1 field, from its
extract() method.
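A minimal sketch of such an extractor. So that it compiles without Flink, GapExtractor and Pair are local stand-ins for SessionWindowTimeGapExtractor and Tuple2, and the MyMessageType fields are hypothetical. In a Flink job you would implement SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> directly and keep the same extract() body.

```java
import java.io.Serializable;

// Stand-in declaring the same single method as Flink's
// org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor<T>;
// declared here only so the sketch compiles without Flink on the classpath.
interface GapExtractor<T> extends Serializable {
    long extract(T element);
}

// Stand-in for Flink's Tuple2<T0, T1>, which also exposes public fields f0 and f1.
final class Pair<A, B> {
    public final A f0;
    public final B f1;

    Pair(A f0, B f1) {
        this.f0 = f0;
        this.f1 = f1;
    }
}

// Hypothetical POJO; only two of the fields from the example event are modelled.
final class MyMessageType {
    final String name;
    final long timestamp;

    MyMessageType(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }
}

// The extractor itself: return the gap that the upstream operator stored in f1.
final class DynamicWindowGapExtractor implements GapExtractor<Pair<MyMessageType, Long>> {
    @Override
    public long extract(Pair<MyMessageType, Long> element) {
        return element.f1; // auto-unboxed Long -> long
    }
}
```

The nested POJO is read the same way through the first field, e.g. enriched.f0.name.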

One thing I don't understand: after enriching the message, my event (a POJO)
is nested inside a tuple. How can I access it?

You would just read the first field of the tuple, i.e. tuple.f0.


Regarding your last point, "I think, however, that it might be easier at this
point to just use a stateful ProcessFunction": did you mean that a completely
different approach would be better?

Yes, that's what I meant. It seems too complicated to split the logic
into one part that determines the dynamic gap and another part that does
the per-session computation. It seems easier to roll everything into one
operator, and with state and timers you should have enough flexibility.
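A plain-Java sketch of that single-operator idea, assuming in-order events: per-key state tracks the session, and each element pushes the session's end (its "timer") out to timestamp + gap. In Flink this would roughly correspond to a KeyedProcessFunction that keeps the session in ValueState, calls ctx.timerService().registerEventTimeTimer(end) in processElement(), and emits the session in onTimer() when the firing time matches the stored end. SessionProcessor and its string output format are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of the single-operator approach: per-key state plus
// event-time "timers", with no window operator involved.
final class SessionProcessor {
    // Per-key session state (the role ValueState would play in Flink).
    private static final class Session {
        long start;
        long end; // latest timestamp + gap seen so far; this is where the timer sits
        int count;
    }

    private final Map<String, Session> sessions = new HashMap<>();
    private final List<String> emitted = new ArrayList<>();

    // Analogue of processElement(): gap handling and session bookkeeping in one place.
    void processElement(String key, long timestamp, long gap) {
        Session s = sessions.get(key);
        if (s != null && timestamp >= s.end) {
            // The previous session already expired; close it before starting a new one.
            emit(key, s);
            s = null;
        }
        if (s == null) {
            s = new Session();
            s.start = timestamp;
            s.end = timestamp;
            sessions.put(key, s);
        }
        s.count++;
        s.end = Math.max(s.end, timestamp + gap); // "re-register" the timer further out
    }

    // Analogue of event-time timers firing once the watermark reaches a session's end.
    void advanceWatermark(long watermark) {
        Iterator<Map.Entry<String, Session>> it = sessions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Session> entry = it.next();
            if (entry.getValue().end <= watermark) {
                emitted.add(describe(entry.getKey(), entry.getValue()));
                it.remove();
            }
        }
    }

    List<String> emitted() {
        return emitted;
    }

    private void emit(String key, Session s) {
        emitted.add(describe(key, s));
        sessions.remove(key);
    }

    private static String describe(String key, Session s) {
        return key + ":[" + s.start + "," + s.end + ") count=" + s.count;
    }
}
```

Feeding (0, 2000), (1000, 2000) and (5000, 1000) for one key, then a final watermark, emits two sessions: [0,3000) with two events and [5000,6000) with one. In this sketch an event at exactly the session end starts a new session; that boundary is a design choice you control when you own the operator.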

Best,
Aljoscha


