Any idea what should I do to overcome this? On Wed, Mar 20, 2019 at 7:17 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
> Hi Andrey, > I am testing a Filter operator that receives a key from the stream and > checks if it is a new one or not. if it is new it keeps it in state and > fire a timer all that is done using the ProcessFunction. > The testing is using some CollectSink as described here > <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing> > and > the source is implementation of the SourceFunction that accepts a > collection of values and adds it to ctx.collect . > The ctx.timestamp() is null, BUT even if I set the timer to sometime in > the future ctx.timerService.registerProcessingTimeTimer(currenttimestamp + > x) the timer is fired immediately. > > > On Wed, Mar 20, 2019 at 10:39 AM Andrey Zagrebin <and...@ververica.com> > wrote: > >> Hi Avi, >> >> what is the structure of your unit test? do you create some source and >> then apply function or you test only ProcessFunction methods in isolation? >> does ctx.timestamp() return zero or which value? >> >> Best, >> Andrey >> >> >> On Tue, Mar 19, 2019 at 9:19 PM Avi Levi <avi.l...@bluevoyant.com> wrote: >> >>> Hi Andrey , >>> I'm using IngestionTime >>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime) >>> >>> This is my timer in the processElement: >>> val nextTime: Long = ctx.timestamp() + daysInMilliseconds(14) >>> ctx.timerService.registerProcessingTimeTimer(nextTim) >>> >>> The problem is how do I use it in my unit tests ? since there is no >>> IngestionTime and timers are fired immediately so the timers actions (such >>> as state cleanup) are fired before time and causing the tests to fail . >>> >>> >>> >>> >>> On Tue, Mar 19, 2019 at 7:47 PM Andrey Zagrebin <and...@ververica.com> >>> wrote: >>> >>>> Hi Avi, >>>> >>>> do you use processing time timer >>>> (timerService().registerProcessingTimeTimer)? >>>> why do you need ingestion time? do you >>>> set TimeCharacteristic.IngestionTime? >>>> >>>> Best, >>>> Andrey >>>> >>>> On Tue, Mar 19, 2019 at 1:11 PM Avi Levi <avi.l...@bluevoyant.com> >>>> wrote: >>>> >>>>> Hi, >>>>> Our stream is not based on time sequence and we do not use time based >>>>> operations. we do want to clean the state after x days hence we fire timer >>>>> event. My problem is that our unit test fires the event immediately (there >>>>> is no ingestion time) how can I inject ingestion time ? >>>>> >>>>> Cheers >>>>> Avi >>>>> >>>>>