Thanks Dongwon to provide feedback and share your approach!

I'm not sure it could be possible (not an expert), but if we could reset
intermediate result (aggregated) after processing "fire event", I guess it
would work as expected, as window would still expand even after "session
end", but it will provide same effect in point of "outputs". Nice approach!
I'll play with this approach too.

Thanks again,
Jungtaek Lim (HeartSaVioR)

On Mon, Aug 5, 2019 at 12:01 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi Jungtaek,
>
> I've faced a similar problem in the past; we need to calculate an
> aggregate upon receiving an end message from each user.
>
> While you're trying to solve problem by defining a custom window assigner,
> I took a different approach to the problem by implementing a custom trigger.
>
> You can see my implementation in the following link (but I'm not quite
> sure if my implementation could solve your case):
>
> https://github.com/eastcirclek/flink-examples/blob/master/src/main/scala/com/github/eastcirclek/examples/customtrigger/trigger/TrackingEarlyResultEventTimeTrigger.scala
>
> Best,
> Dongwon
>
> p.s. FYI, I presented the background of the problem and the general idea
> last year at FlinkForward 2017 Berlin. Hope this presentation helps you:
> https://www.youtube.com/watch?v=wPQWFy5JENw
>
>
>
> On Sun, Aug 4, 2019 at 10:57 PM Jungtaek Lim <kabh...@gmail.com> wrote:
>
>> Hi Flink users,
>>
>> I've been spending time to learn and play with Flink DataStream API, not
>> an expert level but as a beginner. :)
>>
>> To play with custom window API, I just created a small example, session
>> window based on fixed time gap, but indicate the type of event which may
>> contain "end of session". I guess it's not unusual to see this kind of
>> things (like manual logout and login) though I don't have concrete real use
>> case.
>>
>> This is an implementation based on Flink DataStream API:
>> https://gist.github.com/HeartSaVioR/1d865b1a444af1ef7cae201bbdb196b0
>>
>> Custom window works pretty well and I could leverage side output very
>> easily. One thing leading the code a bit longer was new definition of
>> TimeWindow (to deal with event type of "close session"). Even though I
>> tried to leverage TimeWindow via inheritance, the code goes a bit verbose
>> as I need to implement a new Serializer as well.
>> (Actually it required to implement a new Trigger as well, but took
>> workaround to leverage existing EventTimeTrigger.)
>>
>> Assuming this pattern is not unusual (it would be pretty OK if it's
>> unusual), could someone point out some points to improve or simplify the
>> code? That would be really nice if there's something I could contribute in
>> this case.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> ps. This is an implementation based on Spark Structured Streaming (no
>> custom window API, so had to put everything in state function of
>> flatMapGroupsWithState):
>> https://gist.github.com/HeartSaVioR/133c3bdc163f1fd5332397c5cd4b8b29
>>
>

-- 
Name : Jungtaek Lim
Blog : http://medium.com/@heartsavior
Twitter : http://twitter.com/heartsavior
LinkedIn : http://www.linkedin.com/in/heartsavior

Reply via email to