Thanks, I'll check it out. I got a bit confused by the ingestion time being null in tests, but all is OK now. I appreciate that.
On Mon, Mar 25, 2019 at 1:01 PM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Avi,
>
> Just to verify your ITCase, I wrote the following dummy example and it
> seems to be "working" (i.e. I can see non-null timestamps and timers firing).
>
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>     env.setParallelism(1);
>
>     env
>         .addSource(new LongSource())
>         .keyBy(elmnt -> elmnt)
>         .process(new KeyedProcessFunction<Long, Long, Long>() {
>
>             @Override
>             public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
>                 long timestamp = ctx.timestamp();
>                 long timerTimestamp = timestamp + Time.seconds(10).toMilliseconds();
>
>                 System.out.println(ctx.timestamp() + " " + timerTimestamp);
>
>                 ctx.timerService().registerProcessingTimeTimer(timerTimestamp);
>             }
>
>             @Override
>             public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
>                 System.out.println("TIMER: " + timestamp + " " + ctx.timeDomain());
>             }
>         }).print();
>     env.execute();
>
> The source is:
>
>     private static final class LongSource implements SourceFunction<Long> {
>
>         private volatile boolean running = true;
>
>         private long element = 0L;
>
>         @Override
>         public void run(SourceContext<java.lang.Long> ctx) throws Exception {
>             while (running) {
>                 ctx.collect(element++ % 10);
>                 Thread.sleep(10L);
>             }
>         }
>
>         @Override
>         public void cancel() {
>             // stop the emit loop in run()
>             running = false;
>         }
>     }
>
> Could you provide more details on how your use case differs from the above
> dummy example so that we can pin down the problem?
>
> As a side note, ingestion time is essentially event time, with the only
> difference that the timestamp assigner at the beginning gives each element
> the timestamp System.currentTimeMillis. So in this case, maybe you could
> also consider setting event time timers, but then keep your watermark
> emission interval in mind.
>
> In addition, if you simply want to check the processing time behaviour of
> your operator (not the whole pipeline), then you could make use of the
> OneInputStreamTaskTestHarness or its keyed variant. This allows you to
> provide your own processing time provider, thus allowing you to
> deterministically test processing time behaviour.
>
> Cheers,
> Kostas
>
> On Sat, Mar 23, 2019 at 9:32 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Any idea what I should do to overcome this?
>>
>> On Wed, Mar 20, 2019 at 7:17 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>
>>> Hi Andrey,
>>> I am testing a Filter operator that receives a key from the stream and
>>> checks whether it is a new one or not. If it is new, it keeps it in state
>>> and fires a timer; all of that is done using the ProcessFunction.
>>> The test uses a CollectSink as described here
>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing>
>>> and the source is an implementation of SourceFunction that accepts a
>>> collection of values and passes them to ctx.collect.
>>> The ctx.timestamp() is null, BUT even if I set the timer to some time in
>>> the future with
>>> ctx.timerService.registerProcessingTimeTimer(currenttimestamp + x)
>>> the timer is fired immediately.
>>>
>>> On Wed, Mar 20, 2019 at 10:39 AM Andrey Zagrebin <and...@ververica.com> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> What is the structure of your unit test? Do you create some source and
>>>> then apply the function, or do you test only the ProcessFunction methods
>>>> in isolation? Does ctx.timestamp() return zero, or which value?
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>> On Tue, Mar 19, 2019 at 9:19 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>
>>>>> Hi Andrey,
>>>>> I'm using IngestionTime:
>>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>>>
>>>>> This is my timer in the processElement:
>>>>>     val nextTime: Long = ctx.timestamp() + daysInMilliseconds(14)
>>>>>     ctx.timerService.registerProcessingTimeTimer(nextTime)
>>>>>
>>>>> The problem is how to use it in my unit tests. Since there is no
>>>>> IngestionTime and timers are fired immediately, the timer actions (such
>>>>> as state cleanup) run ahead of time and cause the tests to fail.
>>>>>
>>>>> On Tue, Mar 19, 2019 at 7:47 PM Andrey Zagrebin <and...@ververica.com> wrote:
>>>>>
>>>>>> Hi Avi,
>>>>>>
>>>>>> Do you use a processing time timer
>>>>>> (timerService().registerProcessingTimeTimer)?
>>>>>> Why do you need ingestion time? Do you set
>>>>>> TimeCharacteristic.IngestionTime?
>>>>>>
>>>>>> Best,
>>>>>> Andrey
>>>>>>
>>>>>> On Tue, Mar 19, 2019 at 1:11 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> Our stream is not based on a time sequence and we do not use
>>>>>>> time-based operations. We do want to clean the state after x days,
>>>>>>> hence we fire a timer event. My problem is that our unit test fires
>>>>>>> the event immediately (there is no ingestion time). How can I inject
>>>>>>> ingestion time?
>>>>>>>
>>>>>>> Cheers
>>>>>>> Avi
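
A footnote on Kostas's suggestion of providing your own processing time provider: the idea can be shown without Flink at all. The sketch below is a toy model in plain Java, not Flink's test harness. ManualTimerService, advanceTo and ManualTimerDemo are hypothetical names made up for illustration. It only aims to show why a timer registered for "now + 10 s" stays pending until the test itself moves the clock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Toy stand-in (NOT a Flink class) for a processing-time service driven by the test. */
final class ManualTimerService {
    private long now = 0L;
    // Pending timers ordered by due time; each due time maps to its callbacks.
    private final TreeMap<Long, List<Runnable>> timers = new TreeMap<>();

    long currentProcessingTime() {
        return now;
    }

    void registerProcessingTimeTimer(long time, Runnable callback) {
        timers.computeIfAbsent(time, t -> new ArrayList<>()).add(callback);
    }

    /** Moves the clock to newTime and fires every timer due at or before it, in time order. */
    void advanceTo(long newTime) {
        if (newTime < now) {
            throw new IllegalArgumentException("clock cannot go backwards");
        }
        now = newTime;
        while (!timers.isEmpty() && timers.firstKey() <= now) {
            List<Runnable> due = timers.pollFirstEntry().getValue();
            due.forEach(Runnable::run);
        }
    }
}

class ManualTimerDemo {
    public static void main(String[] args) {
        ManualTimerService timers = new ManualTimerService();
        List<Long> fired = new ArrayList<>();

        // Register a cleanup timer 10 seconds after the (manual) current time.
        long cleanupAt = timers.currentProcessingTime() + 10_000L;
        timers.registerProcessingTimeTimer(cleanupAt, () -> fired.add(cleanupAt));

        timers.advanceTo(9_999L);
        System.out.println("fired before due time: " + fired); // prints []

        timers.advanceTo(10_000L);
        System.out.println("fired at due time: " + fired); // prints [10000]
    }
}
```

In a real operator test the harness would play the role of this class. The point is only that the test, not the wall clock, decides when a timer becomes due.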