Hi hequan Weird behaviour when i m calling ctx.timeservice() function is getting exited even not throwing error
On Tuesday, January 8, 2019, Hequn Cheng <chenghe...@gmail.com> wrote: > Hi puneet, > > Could you print `parseLong + 5000` and > `ctx.timerService().currentProcessingTime()` > out and check the value? > I know it is a streaming program. What I mean is the timer you have > registered is not within the interval of your job, so the timer has not > been triggered. For example, parseLong + 5000 = 5000 or parseLong + 5000 = > 100000000000(very big). > > Best, Hequn > > > On Tue, Jan 8, 2019 at 1:38 AM Puneet Kinra <puneet.kinra@customercentria. > com> wrote: > >> I checked the same the function is getting exited when i am calling >> ctx.getTimeservice () function. >> >> On Mon, Jan 7, 2019 at 10:27 PM Timo Walther <twal...@apache.org> wrote: >> >>> Hi Puneet, >>> >>> maybe you can show or explain us a bit more about your pipeline. From >>> what I see your ProcessFunction looks correct. Are you sure the registering >>> takes place? >>> >>> Regards, >>> Timo >>> >>> Am 07.01.19 um 14:15 schrieb Puneet Kinra: >>> >>> Hi Hequn >>> >>> Its a streaming job . >>> >>> On Mon, Jan 7, 2019 at 5:51 PM Hequn Cheng <chenghe...@gmail.com> wrote: >>> >>>> Hi Puneet, >>>> >>>> The value of the registered timer should within startTime and endTime >>>> of your job. For example, job starts at processing time t1 and stops at >>>> processing time t2. You have to make sure t1< `parseLong + 5000` < t2. >>>> >>>> Best, Hequn >>>> >>>> On Mon, Jan 7, 2019 at 5:50 PM Puneet Kinra < >>>> puneet.ki...@customercentria.com> wrote: >>>> >>>>> Hi All >>>>> >>>>> Facing some issue with context to onTimer method in processfunction >>>>> >>>>> class TimerTest extends ProcessFunction<Tuple2<String,String>,String>{ >>>>> >>>>> /** >>>>> * >>>>> */ >>>>> private static final long serialVersionUID = 1L; >>>>> >>>>> @Override >>>>> public void processElement(Tuple2<String, String> arg0, >>>>> ProcessFunction<Tuple2<String, String>, String>.Context ctx, >>>>> Collector<String> arg2) throws Exception { >>>>> // TODO Auto-generated method stub >>>>> long parseLong = Long.parseLong(arg0.f1); >>>>> TimerService timerService = ctx.timerService(); >>>>> ctx.timerService().registerProcessingTimeTimer(parseLong + 5000); >>>>> } >>>>> >>>>> @Override >>>>> public void onTimer(long timestamp, ProcessFunction<Tuple2<String, >>>>> String>, String>.OnTimerContext ctx, >>>>> Collector<String> out) throws Exception { >>>>> // TODO Auto-generated method stub >>>>> super.onTimer(timestamp, ctx, out); >>>>> System.out.println("Executing timmer"+timestamp); >>>>> out.collect("Timer Testing.."); >>>>> } >>>>> } >>>>> >>>>> -- >>>>> *Cheers * >>>>> >>>>> *Puneet Kinra* >>>>> >>>>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com >>>>> <puneet.ki...@customercentria.com>* >>>>> >>>>> *e-mail :puneet.ki...@customercentria.com >>>>> <puneet.ki...@customercentria.com>* >>>>> >>>>> >>>>> >>> >>> -- >>> *Cheers * >>> >>> *Puneet Kinra* >>> >>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com >>> <puneet.ki...@customercentria.com>* >>> >>> *e-mail :puneet.ki...@customercentria.com >>> <puneet.ki...@customercentria.com>* >>> >>> >>> >>> >> >> -- >> *Cheers * >> >> *Puneet Kinra* >> >> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com >> <puneet.ki...@customercentria.com>* >> >> *e-mail :puneet.ki...@customercentria.com >> <puneet.ki...@customercentria.com>* >> >> >> -- *Cheers * *Puneet Kinra* *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>* *e-mail :puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>*