Many thanks for the help!

Simone

________________________________
From: Aljoscha Krettek <aljos...@apache.org>
Sent: 19 November 2020 11:46
To: user@flink.apache.org <user@flink.apache.org>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

On 17.11.20 17:37, Simone Cavallarin wrote:
> Hi,
>
> I have been working on the suggestion that you gave me, thanks! The first
> part is to add the gap to the message: 1) I receive the event, 2) I take that
> event and map it using StatefulSessionCalculator, which is where I put
> together "the message" and a "long" that is my gap in millis.
>
> DataStream<Event> source = <Kafka Source>
>
> Operation in front of the window that keeps track of session gaps
>
> DataStream<Tuple2<MyMessageType, Long>> enriched = source
>     .keyBy(<key extractor>)
>     .map(new StatefulSessionCalculator()); // or process()
>
> This is my StatefulSessionCalculator():
>
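> Tuple2<MyMessageType, Long> map(MyMessageType input) {
>     ValueState<MyState> valueState = getState(myModelStateDescriptor);
>     MyState state = valueState.value();
>     // value() returns null for the first element of a key; this assumes
>     // MyState has a no-arg constructor
>     if (state == null) {
>         state = new MyState();
>     }
>     state.update(input);
>     long suggestedGap = state.getSuggestedGap();
>     valueState.update(state);
>     return Tuple2.of(input, suggestedGap);
> }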
>
> If the calculated "gap" is "1234", would the result be:
> <[Tom, 1.70, 50, 1605612588995], [1234]>?

That looks correct, yes.

> The second step is to use the calculated gap via DynamicWindowGapExtractor().
>
> DataStream<...> result = enriched
>     .keyBy(new MyKeySelector())
>     .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
>
>
> DynamicWindowGapExtractor() extracts the gap from the message and feeds it
> back to Flink.
> Could you please give me an example for this one as well?

This would just be a class that implements
SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> and returns the gap
from the extract() method.
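
A minimal sketch of such an extractor might look like the following. So that
it compiles on its own, Tuple2 and SessionWindowTimeGapExtractor are declared
here as small stand-ins for the Flink types of the same name; in a real job
you would import them from Flink instead. MyEvent and its fields are
hypothetical placeholders for your POJO.

```java
import java.io.Serializable;

// Stand-in for org.apache.flink.api.java.tuple.Tuple2 (assumption: only f0, f1 and of() are needed here).
class Tuple2<A, B> {
    public A f0;
    public B f1;

    Tuple2(A f0, B f1) {
        this.f0 = f0;
        this.f1 = f1;
    }

    static <A, B> Tuple2<A, B> of(A f0, B f1) {
        return new Tuple2<>(f0, f1);
    }
}

// Stand-in mirroring Flink's SessionWindowTimeGapExtractor interface.
interface SessionWindowTimeGapExtractor<T> extends Serializable {
    long extract(T element);
}

// Hypothetical POJO; the real event type will have its own fields.
class MyEvent {
    public String name;
    public long timestamp;

    MyEvent(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }
}

// Returns the gap that the upstream operator attached to each element.
class DynamicWindowGapExtractor
        implements SessionWindowTimeGapExtractor<Tuple2<MyEvent, Long>> {

    @Override
    public long extract(Tuple2<MyEvent, Long> element) {
        // f0 is the original event, f1 is the session gap in milliseconds.
        return element.f1;
    }
}
```

The same field access applies downstream: element.f0 gives back the original
event, and element.f1 the gap that was attached to it.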

> One thing that I don't understand is that, after enriching the message, my
> event, which contains a POJO, is nested inside a tuple. How can I access it?

You would just read the first field of the tuple, i.e. tuple.f0.


> On the last point, when you said "I think, however, that it might be easier at
> this point to just use a stateful ProcessFunction", did you mean that a
> completely different approach would be better?

That's what I meant, yes. It seems too complicated to split the
logic into one part that determines the dynamic gap and another
part that does the computation per session. It seems easier to just roll
that into one operator that does everything. And with state and timers
you should have enough flexibility.
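
To make the idea concrete, here is a rough plain-Java model of the logic such
an operator could contain. It is not Flink code: the map stands in for keyed
state (ValueState in a KeyedProcessFunction), the timerAt field stands in for
a registered event-time timer, and advanceWatermark() plays the role of
onTimer(). The class name, the count aggregate, and the output format are all
hypothetical, and out-of-order events are ignored for simplicity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class SessionOperatorSketch {

    // Per-key session state; in Flink this would be kept in keyed state.
    static class SessionState {
        int count;     // hypothetical per-session aggregate
        long timerAt;  // event time at which the current session closes
    }

    private final Map<String, SessionState> stateByKey = new HashMap<>();

    // Collected results, one "key:count" string per closed session.
    final List<String> output = new ArrayList<>();

    // Analogue of processElement(): update the session and move its deadline.
    void processElement(String key, long timestamp, long gapMillis) {
        SessionState state = stateByKey.computeIfAbsent(key, k -> new SessionState());
        state.count++;
        // Overwriting the deadline models deleting the old event-time timer
        // and registering a new one at timestamp + gap.
        state.timerAt = timestamp + gapMillis;
    }

    // Analogue of onTimer(): close every session whose deadline has been reached.
    void advanceWatermark(long watermark) {
        Iterator<Map.Entry<String, SessionState>> it = stateByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, SessionState> entry = it.next();
            if (entry.getValue().timerAt <= watermark) {
                output.add(entry.getKey() + ":" + entry.getValue().count);
                it.remove(); // clear the state so the next element starts a new session
            }
        }
    }
}
```

In an actual job the gap would be computed inside processElement() from the
state itself rather than passed in, and the timers would be managed through
ctx.timerService().registerEventTimeTimer() and deleteEventTimeTimer().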

Best,
Aljoscha
