Hi Avi, Good to hear that!
Cheers, Kostas On Mon, Mar 25, 2019 at 3:37 PM Avi Levi <avi.l...@bluevoyant.com> wrote: > Thanks, I'll check it out. I got a bit confused with the Ingesting time > equals to null in tests but all is ok now , I appreciate that > > On Mon, Mar 25, 2019 at 1:01 PM Kostas Kloudas <kklou...@gmail.com> wrote: > >> Hi Avi, >> >> Just to verify your ITCase, I wrote the following dummy example and it >> seems to be "working" (ie. I can see non null timestamps and timers firing). >> >> >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); >> env.setParallelism(1); >> >> env >> .addSource(new LongSource()) >> .keyBy(elmnt -> elmnt) >> .process(new KeyedProcessFunction<Long, Long, Long>() { >> >> @Override >> public void processElement(Long value, Context ctx, Collector<Long> >> out) throws Exception { >> >> >> long timestamp = ctx.timestamp(); >> long timerTimestamp = timestamp + >> Time.seconds(10).toMilliseconds(); >> >> System.out.println(ctx.timestamp() + " " + timerTimestamp); >> >> ctx.timerService().registerProcessingTimeTimer(timerTimestamp); >> } >> >> @Override >> public void onTimer(long timestamp, OnTimerContext ctx, >> Collector<Long> out) throws Exception { >> System.out.println("TIMER: " + timestamp +" "+ ctx.timeDomain()); >> } >> }).print(); >> env.execute(); >> >> The source is: >> >> private static final class LongSource implements SourceFunction<Long> { >> >> private volatile boolean running = true; >> >> private long element = 0L; >> >> @Override >> public void run(SourceContext<java.lang.Long> ctx) throws Exception { >> while (running) { >> ctx.collect(element++ % 10); >> Thread.sleep(10L); >> } >> } >> >> @Override >> public void cancel() { >> >> } >> } >> >> >> Could you provide more details on how your usecase differs from the above >> dummy example so that we can pin down the problem? >> >> As a side-note, Ingestion time is essentially event time, with the only >> difference that the timestamp assigner in the beginning gives each element >> the timestamp System.currentTimeMillis. So in this case, maybe you could >> also consider setting event time timers but keep in mind then your >> Watermark emission interval. >> >> In addition, if you want to simply check processing time processing of >> you operator (not the whole pipeline), then you could make use of the >> OneInputStreamTaskTestHarness or its keyed variant. This allows you to >> provide your own processing time provider thus allow you to >> deterministically >> test processing time behaviour. >> >> Cheers, >> Kostas >> >> >> >> On Sat, Mar 23, 2019 at 9:32 AM Avi Levi <avi.l...@bluevoyant.com> wrote: >> >>> Any idea what should I do to overcome this? >>> >>> On Wed, Mar 20, 2019 at 7:17 PM Avi Levi <avi.l...@bluevoyant.com> >>> wrote: >>> >>>> Hi Andrey, >>>> I am testing a Filter operator that receives a key from the stream and >>>> checks if it is a new one or not. if it is new it keeps it in state and >>>> fire a timer all that is done using the ProcessFunction. >>>> The testing is using some CollectSink as described here >>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing> >>>> and >>>> the source is implementation of the SourceFunction that accepts a >>>> collection of values and adds it to ctx.collect . >>>> The ctx.timestamp() is null, BUT even if I set the timer to sometime in >>>> the future ctx.timerService.registerProcessingTimeTimer(currenttimestamp + >>>> x) the timer is fired immediately. >>>> >>>> >>>> On Wed, Mar 20, 2019 at 10:39 AM Andrey Zagrebin <and...@ververica.com> >>>> wrote: >>>> >>>>> Hi Avi, >>>>> >>>>> what is the structure of your unit test? do you create some source and >>>>> then apply function or you test only ProcessFunction methods in isolation? >>>>> does ctx.timestamp() return zero or which value? >>>>> >>>>> Best, >>>>> Andrey >>>>> >>>>> >>>>> On Tue, Mar 19, 2019 at 9:19 PM Avi Levi <avi.l...@bluevoyant.com> >>>>> wrote: >>>>> >>>>>> Hi Andrey , >>>>>> I'm using IngestionTime >>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) >>>>>> >>>>>> This is my timer in the processElement: >>>>>> val nextTime: Long = ctx.timestamp() + daysInMilliseconds(14) >>>>>> ctx.timerService.registerProcessingTimeTimer(nextTim) >>>>>> >>>>>> The problem is how do I use it in my unit tests ? since there is no >>>>>> IngestionTime and timers are fired immediately so the timers actions >>>>>> (such >>>>>> as state cleanup) are fired before time and causing the tests to fail . >>>>>> >>>>>> >>>>>> >>>>>> >>>>>> On Tue, Mar 19, 2019 at 7:47 PM Andrey Zagrebin <and...@ververica.com> >>>>>> wrote: >>>>>> >>>>>>> Hi Avi, >>>>>>> >>>>>>> do you use processing time timer >>>>>>> (timerService().registerProcessingTimeTimer)? >>>>>>> why do you need ingestion time? do you >>>>>>> set TimeCharacteristic.IngestionTime? >>>>>>> >>>>>>> Best, >>>>>>> Andrey >>>>>>> >>>>>>> On Tue, Mar 19, 2019 at 1:11 PM Avi Levi <avi.l...@bluevoyant.com> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi, >>>>>>>> Our stream is not based on time sequence and we do not use time >>>>>>>> based operations. we do want to clean the state after x days hence we >>>>>>>> fire >>>>>>>> timer event. My problem is that our unit test fires the event >>>>>>>> immediately >>>>>>>> (there is no ingestion time) how can I inject ingestion time ? >>>>>>>> >>>>>>>> Cheers >>>>>>>> Avi >>>>>>>> >>>>>>>>