Hi Avi,

Good to hear that!

Cheers,
Kostas

On Mon, Mar 25, 2019 at 3:37 PM Avi Levi <avi.l...@bluevoyant.com> wrote:

> Thanks, I'll check it out. I got a bit confused by the ingestion time
> being null in tests, but all is OK now. I appreciate that.
>
> On Mon, Mar 25, 2019 at 1:01 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>
>> Hi Avi,
>>
>> Just to verify your ITCase, I wrote the following dummy example and it
>> seems to be "working" (i.e., I can see non-null timestamps and timers firing).
>>
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>> env.setParallelism(1);
>>
>> env
>>       .addSource(new LongSource())
>>       .keyBy(elmnt -> elmnt)
>>       .process(new KeyedProcessFunction<Long, Long, Long>() {
>>
>>          @Override
>>          public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
>>
>>             long timestamp = ctx.timestamp();
>>             long timerTimestamp = timestamp + Time.seconds(10).toMilliseconds();
>>
>>             System.out.println(ctx.timestamp() + " " + timerTimestamp);
>>
>>             ctx.timerService().registerProcessingTimeTimer(timerTimestamp);
>>          }
>>
>>          @Override
>>          public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
>>             System.out.println("TIMER: " + timestamp + " " + ctx.timeDomain());
>>          }
>>       }).print();
>> env.execute();
>>
>> The source is:
>>
>> private static final class LongSource implements SourceFunction<Long> {
>>
>>    private volatile boolean running = true;
>>
>>    private long element = 0L;
>>
>>    @Override
>>    public void run(SourceContext<java.lang.Long> ctx) throws Exception {
>>       while (running) {
>>          ctx.collect(element++ % 10);
>>          Thread.sleep(10L);
>>       }
>>    }
>>
>>    @Override
>>    public void cancel() {
>>       // Stop the emit loop in run(); without this the source never terminates.
>>       running = false;
>>    }
>> }
>>
>>
>> Could you provide more details on how your use case differs from the above
>> dummy example so that we can pin down the problem?
>>
>> As a side note, ingestion time is essentially event time; the only
>> difference is that the timestamp assigner at the beginning gives each
>> element the timestamp System.currentTimeMillis(). So in this case you could
>> also consider setting event-time timers, but then keep your watermark
>> emission interval in mind.
>>
>> In addition, if you simply want to check the processing-time behaviour of
>> your operator (not the whole pipeline), you could make use of the
>> OneInputStreamTaskTestHarness or its keyed variant. This allows you to
>> provide your own processing time provider and thus to test
>> processing-time behaviour deterministically.
>>
>> Cheers,
>> Kostas
>>
>>
>>
>> On Sat, Mar 23, 2019 at 9:32 AM Avi Levi <avi.l...@bluevoyant.com> wrote:
>>
>>> Any idea what I should do to overcome this?
>>>
>>> On Wed, Mar 20, 2019 at 7:17 PM Avi Levi <avi.l...@bluevoyant.com>
>>> wrote:
>>>
>>>> Hi Andrey,
>>>> I am testing a Filter operator that receives a key from the stream and
>>>> checks whether it is a new one. If it is new, it keeps it in state and
>>>> sets a timer; all of that is done using the ProcessFunction.
>>>> The test uses a CollectSink as described here
>>>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#integration-testing>
>>>> and the source is an implementation of SourceFunction that accepts a
>>>> collection of values and passes each one to ctx.collect.
>>>> ctx.timestamp() is null, BUT even if I set the timer to some time in
>>>> the future with
>>>> ctx.timerService.registerProcessingTimeTimer(currenttimestamp + x),
>>>> the timer fires immediately.
>>>>
>>>>
>>>> On Wed, Mar 20, 2019 at 10:39 AM Andrey Zagrebin <and...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Avi,
>>>>>
>>>>> What is the structure of your unit test? Do you create some source and
>>>>> then apply the function, or do you test only the ProcessFunction methods
>>>>> in isolation? Does ctx.timestamp() return zero, or which value?
>>>>>
>>>>> Best,
>>>>> Andrey
>>>>>
>>>>>
>>>>> On Tue, Mar 19, 2019 at 9:19 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Andrey,
>>>>>> I'm using IngestionTime
>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
>>>>>>
>>>>>> This is my timer in the processElement:
>>>>>>    val nextTime: Long = ctx.timestamp()  + daysInMilliseconds(14)
>>>>>>    ctx.timerService.registerProcessingTimeTimer(nextTime)
>>>>>>
>>>>>> The problem is: how do I use it in my unit tests? There is no
>>>>>> ingestion time there, so the timers fire immediately, and the timer
>>>>>> actions (such as state cleanup) run too early and cause the tests to fail.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Mar 19, 2019 at 7:47 PM Andrey Zagrebin <and...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Avi,
>>>>>>>
>>>>>>> Do you use a processing-time timer
>>>>>>> (timerService().registerProcessingTimeTimer)?
>>>>>>> Why do you need ingestion time? Do you
>>>>>>> set TimeCharacteristic.IngestionTime?
>>>>>>>
>>>>>>> Best,
>>>>>>> Andrey
>>>>>>>
>>>>>>> On Tue, Mar 19, 2019 at 1:11 PM Avi Levi <avi.l...@bluevoyant.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> Our stream is not based on a time sequence and we do not use
>>>>>>>> time-based operations. We do want to clean up the state after x days,
>>>>>>>> so we set a timer. My problem is that our unit test fires the timer
>>>>>>>> immediately (there is no ingestion time). How can I inject ingestion
>>>>>>>> time?
>>>>>>>>
>>>>>>>> Cheers
>>>>>>>> Avi
>>>>>>>>
>>>>>>>>
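
A note on the suggestion quoted above about providing your own processing time provider so that processing-time behaviour can be tested deterministically. The idea can be sketched in plain Java without any Flink dependency. Everything below is an assumption made for illustration: ManualClock, ManualTimerService and ManualClockTimerSketch are hypothetical names, not Flink classes, and the sketch only mimics the general shape of "register a processing-time timer, then advance the clock by hand". It is not how the Flink test harnesses are implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for a processing time provider; the test moves time by hand. */
final class ManualClock {
    private long nowMillis;

    ManualClock(long startMillis) {
        this.nowMillis = startMillis;
    }

    long currentTimeMillis() {
        return nowMillis;
    }

    void advanceTo(long millis) {
        if (millis < nowMillis) {
            throw new IllegalArgumentException("time must not go backwards");
        }
        nowMillis = millis;
    }
}

/** Hypothetical timer service: a timer is due once its timestamp is <= the clock reading. */
final class ManualTimerService {
    private final ManualClock clock;
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private final List<Long> fired = new ArrayList<>();

    ManualTimerService(ManualClock clock) {
        this.clock = clock;
    }

    void registerProcessingTimeTimer(long timestamp) {
        pending.add(timestamp);
        fireDueTimers(); // a timestamp that is already in the past is due right away
    }

    void advanceTo(long millis) {
        clock.advanceTo(millis);
        fireDueTimers();
    }

    /** Returns a copy of the timestamps fired so far, in firing order. */
    List<Long> firedTimers() {
        return new ArrayList<>(fired);
    }

    private void fireDueTimers() {
        while (!pending.isEmpty() && pending.peek() <= clock.currentTimeMillis()) {
            fired.add(pending.poll());
        }
    }
}

final class ManualClockTimerSketch {
    public static void main(String[] args) {
        ManualClock clock = new ManualClock(1_000L);
        ManualTimerService timers = new ManualTimerService(clock);

        // Stamp the element with the clock reading and set a timer 10 seconds later.
        long ingestionTimestamp = clock.currentTimeMillis();
        timers.registerProcessingTimeTimer(ingestionTimestamp + 10_000L);
        System.out.println("before advancing: " + timers.firedTimers());

        // The test decides when time passes, so the firing point is deterministic.
        timers.advanceTo(11_000L);
        System.out.println("after advancing:  " + timers.firedTimers());
    }
}
```

In this sketch the timer cannot fire until the test advances the clock to at least the registered timestamp. Conversely, a timer registered with a timestamp at or before the current clock reading is due at once, which is one way a test can end up seeing timers fire immediately.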
