Thanks, I'll check it out. I got a bit confused with the Ingesting time
equals to null in tests but all is ok now , I appreciate that

On Mon, Mar 25, 2019 at 1:01 PM Kostas Kloudas <kklou...@gmail.com> wrote:

> Hi Avi,
>
> Just to verify your ITCase, I wrote the following dummy example and it
> seems to be "working" (ie. I can see non null timestamps and timers firing).
>
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> env.setParallelism(1);
>
> env
>       .addSource(new LongSource())
>       .keyBy(elmnt -> elmnt)
>       .process(new KeyedProcessFunction<Long, Long, Long>() {
>
>          @Override
>          public void processElement(Long value, Context ctx, Collector<Long> 
> out) throws Exception {
>
>
>             long timestamp = ctx.timestamp();
>             long timerTimestamp = timestamp + 
> Time.seconds(10).toMilliseconds();
>
>             System.out.println(ctx.timestamp() + " " + timerTimestamp);
>
>             ctx.timerService().registerProcessingTimeTimer(timerTimestamp);
>          }
>
>          @Override
>          public void onTimer(long timestamp, OnTimerContext ctx, 
> Collector<Long> out) throws Exception {
>             System.out.println("TIMER: " + timestamp +" "+ ctx.timeDomain());
>          }
>       }).print();
> env.execute();
>
> The source is:
>
> private static final class LongSource implements SourceFunction<Long> {
>
>    private volatile boolean running = true;
>
>    private long element = 0L;
>
>    @Override
>    public void run(SourceContext<java.lang.Long> ctx) throws Exception {
>       while (running) {
>          ctx.collect(element++ % 10);
>          Thread.sleep(10L);
>       }
>    }
>
>    @Override
>    public void cancel() {
>
>    }
> }
>
>
> Could you provide more details on how your usecase differs from the above
> dummy example so that we can pin down the problem?
>
> As a side-note, Ingestion time is essentially event time, with the only
> difference that the timestamp assigner in the beginning gives each element
> the timestamp System.currentTimeMillis. So in this case, maybe you could
> also consider setting event time timers but keep in mind then your
> Watermark emission interval.
>
> In addition, if you want to simply check processing time processing of you
> operator (not the whole pipeline), then you could make use of the
> OneInputStreamTaskTestHarness or its keyed variant. This allows you to
> provide your own processing time provider thus allow you to
> deterministically
> test processing time behaviour.
>
> Cheers,
> Kostas
>
>
>
> On Sat, Mar 23, 2019 at 9:32 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>
>> Any idea what should I do to overcome this?
>>
>> On Wed, Mar 20, 2019 at 7:17 PM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>
>>> Hi Andrey,
>>> I am testing a Filter operator that receives a key from the stream and
>>> checks if it is a new one or not. if it is new it keeps it in state and
>>> fire a timer all that is done using the ProcessFunction.
>>> The testing is using some CollectSink as described here
>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing>
>>>  and
>>> the source is implementation of the SourceFunction that accepts a
>>> collection of values and adds it to ctx.collect .
>>> The ctx.timestamp() is null, BUT even if I set the timer to sometime in
>>> the future ctx.timerService.registerProcessingTimeTimer(currenttimestamp +
>>> x) the timer is fired immediately.
>>>
>>>
>>> On Wed, Mar 20, 2019 at 10:39 AM Andrey Zagrebin <and...@ververica.com>
>>> wrote:
>>>
>>>> Hi Avi,
>>>>
>>>> what is the structure of your unit test? do you create some source and
>>>> then apply function or you test only ProcessFunction methods in isolation?
>>>> does ctx.timestamp() return zero or which value?
>>>>
>>>> Best,
>>>> Andrey
>>>>
>>>>
>>>> On Tue, Mar 19, 2019 at 9:19 PM Avi Levi <avi.l...@bluevoyant.com>
>>>> wrote:
>>>>
>>>>> Hi Andrey ,
>>>>> I'm using IngestionTime
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>>>
>>>>> This is my timer in the processElement:
>>>>>    val nextTime: Long = ctx.timestamp()  + daysInMilliseconds(14)
>>>>>    ctx.timerService.registerProcessingTimeTimer(nextTim)
>>>>>
>>>>> The problem is how do I use it in my unit tests ? since there is no
>>>>> IngestionTime and timers are fired immediately so the timers actions (such
>>>>> as state cleanup) are fired before time and causing the tests to fail .
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Mar 19, 2019 at 7:47 PM Andrey Zagrebin <and...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Avi,
>>>>>>
>>>>>> do you use processing time timer
>>>>>> (timerService().registerProcessingTimeTimer)?
>>>>>> why do you need ingestion time? do you
>>>>>> set TimeCharacteristic.IngestionTime?
>>>>>>
>>>>>> Best,
>>>>>> Andrey
>>>>>>
>>>>>> On Tue, Mar 19, 2019 at 1:11 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> Our stream is not based on time sequence and we do not use time
>>>>>>> based operations. we do want to clean the state after x days hence we 
>>>>>>> fire
>>>>>>> timer event. My problem is that our unit test fires the event 
>>>>>>> immediately
>>>>>>> (there is no ingestion time) how can I inject ingestion time ?
>>>>>>>
>>>>>>> Cheers
>>>>>>> Avi
>>>>>>>
>>>>>>>

Reply via email to