Good to hear that!

Kostas

> On Apr 25, 2016, at 12:24 PM, Piyush Shrivastava <piyush...@yahoo.co.in> wrote:
>
> Thanks a lot Kostas. This solved my problem.
>
> Thanks and Regards,
> Piyush Shrivastava <piy...@webograffiti.com>
> http://webograffiti.com
>
>
> On Monday, 25 April 2016 3:27 PM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> Hi,
>
> Let me also add that you should override the clear() method in order to
> clear your state and delete the pending timers.
>
> Kostas
>
>> On Apr 25, 2016, at 11:52 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
>
> Hi Piyush,
>
> In the onElement function, you register a timer every time you receive an
> element.
>
> When the next watermark arrives, in the flag==false case, this will lead to
> every element adding a timer for its timestamp+60000ms. The same holds for
> the flag==true case, with a 20000ms interval.
>
> What you can try is to set the initial 60 sec timer only once, at the first
> element, and then set all the rest in onEventTime with 20 sec.
>
> For an example of a custom trigger, have a look here:
> https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/beam_comparison/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
>
> I hope this helped.
> Let me know if you need any help.
>
> Kostas
>
>> On Apr 25, 2016, at 11:22 AM, Piyush Shrivastava <piyush...@yahoo.co.in> wrote:
>>
>> Hi all,
>> I want to implement a custom Trigger which fires a GlobalWindow after 1 minute
>> for the first time and every 20 seconds after that.
>> I believe I cannot get this logic right in the implementation of my custom
>> Trigger. Please help me with this.
>>
>> Here is the code of my custom Trigger:
>>
>> public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
>>
>>     /**
>>      *
>>      */
>>     private static final long serialVersionUID = 1L;
>>
>>     private TradeTrigger() {
>>     }
>>
>>     @Override
>>     public TriggerResult onElement(
>>             Object element,
>>             long timestamp,
>>             W window,
>>             org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
>>             throws Exception {
>>
>>         ctx.registerEventTimeTimer(timestamp);
>>         return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public TriggerResult onEventTime(
>>             long timestamp,
>>             W window,
>>             org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
>>             throws Exception {
>>
>>         ValueState<Boolean> state = ctx.getPartitionedState(
>>                 new ValueStateDescriptor<Boolean>("flag", Boolean.TYPE, false));
>>
>>         if (state.value() == false) {
>>             ctx.registerEventTimeTimer(timestamp + 60000);
>>             state.update(true);
>>             return TriggerResult.FIRE;
>>         } else {
>>             System.out.println("" + state.value());
>>             ctx.registerEventTimeTimer(timestamp + 20000);
>>             return TriggerResult.FIRE;
>>         }
>>     }
>>
>>     @Override
>>     public TriggerResult onProcessingTime(
>>             long arg0,
>>             W arg1,
>>             org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg2)
>>             throws Exception {
>>         // TODO Auto-generated method stub
>>         return TriggerResult.CONTINUE;
>>     }
>>
>>     public static <W extends Window> TradeTrigger<W> of() {
>>         return new TradeTrigger<>();
>>     }
>> }
>>
>> Thanks and Regards,
>> Piyush Shrivastava <piy...@webograffiti.com>
>> http://webograffiti.com
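
For readers of the archive, here is a rough sketch of the scheduling Kostas suggests in the thread: set the 60 sec timer once at the first element, re-register a 20 sec timer from onEventTime, and drop state and pending timers in clear(). It is a plain-Java simulation, not code against the Flink API. Every name in it (TriggerTimerSketch, advanceWatermark, pendingTimers, fireTimes) is a hypothetical stand-in. The TreeSet plays the role of the timer service and the boolean plays the role of the "flag" state from the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, Flink-free model of the trigger's timer logic.
class TriggerTimerSketch {
    static final long FIRST_DELAY_MS = 60_000L;   // first firing, 1 minute after the first element
    static final long REPEAT_DELAY_MS = 20_000L;  // every later firing, 20 seconds apart

    // Stand-in for the event-time timer service (sorted, no duplicates).
    private final TreeSet<Long> timers = new TreeSet<>();
    // Stand-in for the "flag" state: has the initial timer been registered yet?
    private boolean firstTimerSet = false;
    // Records the timestamps at which the window would fire.
    private final List<Long> fireTimes = new ArrayList<>();

    // Called for every element; only the first one registers a timer.
    void onElement(long timestamp) {
        if (!firstTimerSet) {
            timers.add(timestamp + FIRST_DELAY_MS);
            firstTimerSet = true;
        }
    }

    // Called when a timer is reached: fire, then schedule the next firing.
    void onEventTime(long time) {
        fireTimes.add(time);
        timers.add(time + REPEAT_DELAY_MS);
    }

    // Simulates a watermark: runs every timer at or below it, in order.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onEventTime(timers.pollFirst());
        }
    }

    // Mirrors the advice to override clear(): reset state, drop pending timers.
    void clear() {
        timers.clear();
        firstTimerSet = false;
    }

    List<Long> fireTimes() {
        return fireTimes;
    }

    int pendingTimers() {
        return timers.size();
    }

    public static void main(String[] args) {
        TriggerTimerSketch trigger = new TriggerTimerSketch();
        trigger.onElement(1_000L);
        trigger.onElement(2_000L);   // no additional timer is registered
        trigger.onElement(3_000L);
        trigger.advanceWatermark(101_000L);
        System.out.println(trigger.fireTimes()); // prints [61000, 81000, 101000]
    }
}
```

With three elements the sketch still holds a single pending timer, whereas the original onElement registers one timer per element and each of those then spawns its own chain of firings. In a real Trigger the same shape would presumably map onto the partitioned state and event-time timer calls already used in the quoted code.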