Hi Jungtaek,

I've faced a similar problem in the past: we needed to calculate an aggregate upon receiving an end message from each user.
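To make that requirement concrete, here is a minimal sketch in plain Java. It is not the Flink API and not the code we actually ran: the class `EndMessageAggregator`, its `onEvent` method, and the choice of a running sum as the aggregate are all made up for illustration. In a Flink job the map would roughly correspond to keyed state, and the end-message check is the part a window or trigger would have to handle.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical illustration only: keeps a running sum per user and
// emits it when that user's "end" message arrives.
final class EndMessageAggregator {

    // Running sum per user (stands in for per-key state).
    private final Map<String, Long> sums = new HashMap<>();

    // Folds the value into the user's sum. Returns the final sum only when
    // isEnd is true; otherwise returns an empty OptionalLong.
    OptionalLong onEvent(String user, long value, boolean isEnd) {
        long sum = sums.merge(user, value, Long::sum);
        if (!isEnd) {
            return OptionalLong.empty();
        }
        sums.remove(user); // purge, so the user's next session starts from zero
        return OptionalLong.of(sum);
    }

    public static void main(String[] args) {
        EndMessageAggregator agg = new EndMessageAggregator();
        agg.onEvent("alice", 3, false);                      // nothing emitted yet
        agg.onEvent("bob", 10, false);                       // bob's session is independent
        OptionalLong result = agg.onEvent("alice", 5, true); // end message for alice
        System.out.println(result.getAsLong());              // prints 8
    }
}
```

The sketch ignores event time, out-of-order events, and session gaps entirely; it only shows the "aggregate, then emit and clear on the end message" behaviour.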
While you're trying to solve the problem by defining a custom window assigner, I took a different approach by implementing a custom trigger. You can see my implementation at the following link (though I'm not sure whether it would solve your case):
https://github.com/eastcirclek/flink-examples/blob/master/src/main/scala/com/github/eastcirclek/examples/customtrigger/trigger/TrackingEarlyResultEventTimeTrigger.scala

Best,
Dongwon

p.s. FYI, I presented the background of the problem and the general idea last year at FlinkForward 2017 Berlin. I hope the presentation helps:
https://www.youtube.com/watch?v=wPQWFy5JENw

On Sun, Aug 4, 2019 at 10:57 PM Jungtaek Lim <kabh...@gmail.com> wrote:

> Hi Flink users,
>
> I've been spending time to learn and play with Flink DataStream API, not
> an expert level but as a beginner. :)
>
> To play with custom window API, I just created a small example, session
> window based on fixed time gap, but indicate the type of event which may
> contain "end of session". I guess it's not unusual to see this kind of
> things (like manual logout and login) though I don't have concrete real use
> case.
>
> This is an implementation based on Flink DataStream API:
> https://gist.github.com/HeartSaVioR/1d865b1a444af1ef7cae201bbdb196b0
>
> Custom window works pretty well and I could leverage side output very
> easily. One thing leading the code a bit longer was new definition of
> TimeWindow (to deal with event type of "close session"). Even though I
> tried to leverage TimeWindow via inheritance, the code goes a bit verbose
> as I need to implement a new Serializer as well.
> (Actually it required to implement a new Trigger as well, but took
> workaround to leverage existing EventTimeTrigger.)
>
> Assuming this pattern is not unusual (it would be pretty OK if it's
> unusual), could someone point out some points to improve or simplify the
> code? That would be really nice if there's something I could contribute in
> this case.
>
> Thanks,
> Jungtaek Lim (HeartSaVioR)
>
> ps. This is an implementation based on Spark Structured Streaming (no
> custom window API, so had to put everything in state function of
> flatMapGroupsWithState):
> https://gist.github.com/HeartSaVioR/133c3bdc163f1fd5332397c5cd4b8b29