In my specific case I can determine more precisely when the next check
would make sense.
For all subsequent events I then only record a little bit of state about
the event stream (i.e. the first and last event time).
A new timer is set only from within the timer handler, which dramatically
limits the number of distinct timers that get set.
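
As a rough sketch of this idea in plain Java, deliberately not tied to the
Flink Trigger API: the class SessionTimerState, its method names and the
timeout value are made up for illustration. The state keeps the first and
last event time plus at most one pending timer, and only the timer handler
decides whether another timer is needed.

```java
// Hypothetical helper, not part of Flink: per-session state that keeps
// at most one pending timer, however many events arrive.
class SessionTimerState {
    static final long NO_TIMER = -1L;

    private final long timeoutMillis;
    private long firstEventTime = Long.MAX_VALUE;
    private long lastEventTime = Long.MIN_VALUE;
    private long pendingTimer = NO_TIMER;

    SessionTimerState(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Records an event. Returns the time of a timer to register, or
    // NO_TIMER when a timer is already pending for this session.
    long onEvent(long timestamp) {
        firstEventTime = Math.min(firstEventTime, timestamp);
        lastEventTime = Math.max(lastEventTime, timestamp);
        if (pendingTimer != NO_TIMER) {
            return NO_TIMER; // only update state, do not set another timer
        }
        pendingTimer = lastEventTime + timeoutMillis;
        return pendingTimer;
    }

    // Called when the pending timer fires. Returns the time of the next
    // timer to register, or NO_TIMER when the session has been idle for
    // the full timeout and can be emitted.
    long onTimer(long time) {
        long deadline = lastEventTime + timeoutMillis;
        if (time >= deadline) {
            pendingTimer = NO_TIMER;
            return NO_TIMER;
        }
        pendingTimer = deadline; // events arrived meanwhile: check again later
        return deadline;
    }

    long firstEventTime() { return firstEventTime; }
    long lastEventTime() { return lastEventTime; }
}
```

With a timeout of 1000 ms and events at 100, 500 and 900, only the first
event asks for a timer (at 1100); the handler then reschedules once for
1900, so three events cause two timers instead of three.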

Niels


On Fri, Nov 27, 2015 at 4:11 PM, Aljoscha Krettek <aljoscha.kret...@gmail.com> wrote:

> Hi,
> yes, you are right in your analysis. Did you try running it with always
> setting the timer? Maybe it’s not the bottleneck of the computation. I
> would be very interested in seeing how this behaves since I only did tests
> with regular time windows, where the first if statement almost always
> directly returns, which is very cheap.
>
> Cheers,
> Aljoscha
> > On 27 Nov 2015, at 13:59, Niels Basjes <ni...@basjes.nl> wrote:
> >
> > Hi,
> >
> > Thanks for all this input.
> > I didn't know about the
> >       // a trigger can only have 1 timer so we remove the old trigger when setting the new one
> >
> > This insight is to me of major importance.
> > Let me explain:
> > I found in the WindowOperator this code below.
> >
> > @Override
> > public void registerEventTimeTimer(long time) {
> >    if (watermarkTimer == time) {
> >       // we already have set a trigger for that time
> >       return;
> >    }
> >    Set<Context> triggers = watermarkTimers.get(time);
> >    if (triggers == null) {
> >       triggers = new HashSet<>();
> >       watermarkTimers.put(time, triggers);
> >    }
> >    this.watermarkTimer = time;
> >    triggers.add(this);
> > }
> >
> > and
> >
> > if (time == watermarkTimer) {
> >    watermarkTimer = -1;
> >    Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);
> >
> > Effectively every new value is stored and processed, yet when the timer fires the call is not forwarded into the application unless it matches the most recently set time.
> > So if I did it as you show in your example, I would have as many trigger entries in watermarkTimers as events I have seen.
> > My application will (in total) handle about 50K events/sec, resulting in thousands of 'onEventTime' calls per second.
> >
> > So thank you. I now understand I have to be more careful with these timers!
> >
> > Niels Basjes
> >
> >
> >
> > On Fri, Nov 27, 2015 at 11:28 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> > Hi Niels,
> > do the records that arrive from Kafka already have the session ID or do you want to assign them inside your Flink job based on the idle timeout?
> >
> > For the rest of your problems you should be able to get by with what Flink provides:
> >
> > The triggering can be done using a custom Trigger that fires after we haven’t seen an element for 30 minutes.
> > public class TimeoutTrigger implements Trigger<Object, Window> {
> >    private static final long serialVersionUID = 1L;
> >
> >    @Override
> >    public TriggerResult onElement(Object element, long timestamp, Window window, TriggerContext ctx) throws Exception {
> >       // on every element it will set a timer for 30 seconds in the future
> >       // a trigger can only have 1 timer so we remove the old trigger when setting the new one
> >       ctx.registerProcessingTimeTimer(System.currentTimeMillis() + 30000); // this is 30 seconds but you can change it
> >       return TriggerResult.CONTINUE;
> >    }
> >
> >    @Override
> >    public TriggerResult onEventTime(long time, Window window, TriggerContext ctx) {
> >       return TriggerResult.CONTINUE;
> >    }
> >
> >    @Override
> >    public TriggerResult onProcessingTime(long time, Window window, TriggerContext ctx) throws Exception {
> >       return TriggerResult.FIRE_AND_PURGE;
> >    }
> >
> >    @Override
> >    public String toString() {
> >       return "TimeoutTrigger()";
> >    }
> > }
> >
> > you would use it like this:
> > stream.keyBy(…).window(…).trigger(new TimeoutTrigger())
> >
> > For writing to files you could use the RollingSink (https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#hadoop-filesystem). I think this does pretty much what you want. You can specify how large the files that it writes are, and it can also roll to new files on a specified time interval.
> >
> > Please let us know if you need more information.
> >
> > Cheers,
> > Aljoscha
> > > On 26 Nov 2015, at 22:13, Niels Basjes <ni...@basjes.nl> wrote:
> > >
> > > Hi,
> > >
> > > I'm trying to build something in Flink that relies heavily on the Windowing features.
> > >
> > > In essence what I want to build:
> > > I have clickstream data coming in via Kafka. Each record (click) has a sessionid and a timestamp.
> > > I want to create a window for each session and after 30 minutes idle I want all events for that session (visit) to be written to disk.
> > > This should result in the effect that a specific visit exists in exactly one file.
> > > Since HDFS does not like 'small files' I want to create a (set of) files every 15 minutes that contains several complete visits.
> > > So I need to buffer the 'completed visits' and flush them to disk in 15 minute batches.
> > >
> > > What I think I need to get this is:
> > > 1) A map function that assigns the visit-id (i.e. new id after 30 minutes idle)
> > > 2) A window per visit-id (close the window 30 minutes after the last click)
> > > 3) A window per 15 minutes that only contains windows of visits that are complete
> > >
> > > Today I've been trying to get this set up and I think I have some parts that are in the right direction.
> > >
> > > I have some questions and I'm hoping you guys can help me:
> > >
> > > 1) I have trouble understanding the way a windowed stream works "exactly".
> > > As a consequence I'm having a hard time verifying if my code does what I understand it should do.
> > > I guess what would really help me is a very simple example on how to unittest such a window.
> > >
> > > 2) Has what I describe above perhaps already been done before? If so, any pointers are really appreciated.
> > >
> > > 3) Am I working in the right direction for what I'm trying to achieve, or should I use a different API or a different approach?
> > >
> > > Thanks
> > >
> > > --
> > > Best regards / Met vriendelijke groeten,
> > >
> > > Niels Basjes
> > >
> > >
> >
> >
> >
> >
> > --
> > Best regards / Met vriendelijke groeten,
> >
> > Niels Basjes
>
>
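
To make the effect discussed in the quoted messages concrete, here is a
small plain-Java model of that timer bookkeeping. It is only a simplified
stand-in for the quoted snippet, not the real WindowOperator: the class
TimerBookkeepingModel, the fire method and the removal of map entries on
firing are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of the quoted bookkeeping, for a single trigger context.
class TimerBookkeepingModel {
    private final Map<Long, Set<Object>> watermarkTimers = new HashMap<>();
    private long watermarkTimer = -1;
    private int forwardedCalls = 0;

    // Mirrors the quoted registerEventTimeTimer: every distinct time gets
    // its own map entry, and only the latest time is remembered.
    void registerEventTimeTimer(long time) {
        if (watermarkTimer == time) {
            return; // a timer for exactly this time is already set
        }
        watermarkTimers.computeIfAbsent(time, t -> new HashSet<>()).add(this);
        watermarkTimer = time;
    }

    // Models a timer firing: the entry is dropped (assumed cleanup), but
    // the call is only "forwarded" when it is the most recently set time.
    void fire(long time) {
        if (watermarkTimers.remove(time) == null) {
            return;
        }
        if (time == watermarkTimer) {
            watermarkTimer = -1;
            forwardedCalls++;
        }
    }

    int pendingEntries() { return watermarkTimers.size(); }
    int forwardedCalls() { return forwardedCalls; }
}
```

Registering 1000 distinct times in this model leaves 1000 pending entries,
and firing all of them forwards exactly one call, which is why setting a
timer on every event is worth avoiding at high event rates.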


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
