No problem :) You motivated me to do a fix for that, since I stumbled across 
this bug/issue myself before, and it also took me some time in the debugger to 
find the cause.

Piotrek

> On 16 Aug 2018, at 20:05, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> Hi Piotr,
> 
> Thanks, and darn it that’s something I should have noticed.
> 
> — Ken
> 
> 
>> On Aug 16, 2018, at 4:37 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>> 
>> Hi,
>> 
>> You made a small mistake when restoring from state using the test harness, 
>> one that I have also made myself in the past. The problem is the ordering 
>> of these calls:
>> 
>>         result.open();
>>         if (savedState != null) {
>>             result.initializeState(savedState);
>>         }
>> 
>> open() is supposed to be called after initializeState(). If you look into 
>> the code of AbstractStreamOperatorTestHarness#open, you will see that when 
>> it is called before initializeState(), it initializes the harness without 
>> any state.
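>> 
>> For example, the restore path in your makeTestHarness() below could look 
>> like this (just a sketch with the two calls swapped, otherwise unchanged):
>> 
>>         result.setup();
>>         if (savedState != null) {
>>             result.initializeState(savedState);
>>         }
>>         result.open();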
>> 
>> The unfortunate part is that this is implicit behaviour that doesn't throw 
>> any error (the test harness is not part of Flink's public API). I will try 
>> to fix this: https://issues.apache.org/jira/browse/FLINK-10159
>> 
>> Piotrek
>> 
>>> On 16 Aug 2018, at 00:24, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>> 
>>> Hi all,
>>> 
>>> It looks to me like the OperatorSubtaskState returned from 
>>> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that 
>>> had been registered via registerProcessingTimeTimer but had not yet fired 
>>> when the snapshot was saved.
>>> 
>>> Is this a known limitation of OneInputStreamOperatorTestHarness?
>>> 
>>> If not, is there anything special I need to do when setting up the test 
>>> harness to ensure that timers are saved?
>>> 
>>> Below is the unit test, which shows how the test harness is being set up 
>>> and run.
>>> 
>>> The TimerFunction used in this test does seem to be doing the right thing, 
>>> as using it in a simple job on a local Flink cluster works as expected when 
>>> creating & then restarting from a savepoint.
>>> 
>>> Thanks,
>>> 
>>> — Ken
>>> 
>>> ==================================================================================================
>>> TimerTest.java
>>> ==================================================================================================
>>> package com.scaleunlimited.flinkcrawler.functions;
>>> 
>>> import static org.junit.Assert.assertTrue;
>>> 
>>> import java.util.ArrayList;
>>> import java.util.List;
>>> 
>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
>>> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
>>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>>> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
>>> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
>>> import org.junit.Before;
>>> import org.junit.Test;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>> 
>>> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
>>> 
>>> public class TimerTest {
>>>     public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);
>>> 
>>>     private List<Long> _firedTimers = new ArrayList<Long>();
>>> 
>>>     @Before
>>>     public void setUp() throws Exception {
>>>     }
>>>     
>>>     @Test
>>>     public void testTimerSaving() throws Throwable {
>>>         
>>>         // This operator doesn't really do much at all, but the first element
>>>         // it processes will create a timer for (timestamp+1).
>>>         // Whenever that timer fires, it will create another timer for 
>>>         // (timestamp+1).
>>>         KeyedProcessOperator<Integer, Integer, Integer> operator = 
>>>             new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>>>         
>>>         // Create a test harness from scratch
>>>         OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
>>>             makeTestHarness(operator, null);
>>>         
>>>         // We begin at time zero
>>>         testHarness.setProcessingTime(0);
>>> 
>>>         // Process some elements, which should also create a timer for time 1.
>>>         int inputs[] = new int[] {1, 2, 3};
>>>         for (int input : inputs) {
>>>             testHarness.processElement(new StreamRecord<>(input));
>>>         }
>>>         
>>>         // Force some time to pass, which should keep moving the timer ahead,
>>>         // finally leaving it set for time 10.
>>>         for (long i = 1; i < 10; i++) {
>>>             testHarness.setProcessingTime(i);
>>>         }
>>>         
>>>         // Save the state, which we assume should include the timer we set for
>>>         // time 10.
>>>         OperatorSubtaskState savedState = 
>>>             testHarness.snapshot(0L, testHarness.getProcessingTime());
>>>         
>>>         // Close the first test harness
>>>         testHarness.close();
>>>         
>>>         // Create a new test harness using the saved state (which we assume
>>>         // includes the timer for time 10).
>>>         testHarness = makeTestHarness(operator, savedState);
>>>         
>>>         // Force more time to pass, which should keep moving the timer ahead.
>>>         for (long i = 10; i < 20; i++) {
>>>             testHarness.setProcessingTime(i);
>>>         }
>>>         
>>>         // Close the second test harness and make sure all the timers we expect
>>>         // actually fired.
>>>         testHarness.close();
>>>         for (long i = 1; i < 20; i++) {
>>>             
>>>             // TODO This expectation currently fails, since Timers don't
>>>             // seem to be included in the snapshot, at least the one produced by
>>>             // the test harness.
>>>             assertTrue(_firedTimers.contains(i));
>>>         }
>>>     }
>>> 
>>>     private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
>>>             KeyedProcessOperator<Integer, Integer, Integer> operator,
>>>             OperatorSubtaskState savedState) 
>>>             throws Exception {
>>>         OneInputStreamOperatorTestHarness<Integer, Integer> result;
>>>         result = 
>>>             new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
>>>                     operator,
>>>                     new TimerTool.IdentityKeySelector<Integer>(),
>>>                     BasicTypeInfo.INT_TYPE_INFO);
>>>         result.setup();
>>>         result.open();
>>>         if (savedState != null) {
>>>             result.initializeState(savedState);
>>>         }
>>>         return result;
>>>     }
>>> }
>>> 
>>> 
>>> ==================================================================================================
>>> TimerFunction.java
>>> ==================================================================================================
>>> package com.scaleunlimited.flinkcrawler.functions;
>>> 
>>> import java.util.List;
>>> 
>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>> import org.apache.flink.util.Collector;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>> 
>>> @SuppressWarnings("serial")
>>> public class TimerFunction
>>>     extends KeyedProcessFunction<Integer, Integer, Integer> {
>>>     static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
>>>    
>>>     List<Long> _firedTimers;
>>>     long _period;
>>> 
>>>     public TimerFunction(List<Long> firedTimers) {
>>>         this(firedTimers, 1);
>>>     }
>>> 
>>>     public TimerFunction(List<Long> firedTimers, long period) {
>>>         super();
>>>         _firedTimers = firedTimers;
>>>         _period = period;
>>>     }
>>> 
>>>     @Override
>>>     public void onTimer(long timestamp,
>>>                         KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
>>>                         Collector<Integer> out) throws Exception {
>>>         super.onTimer(timestamp, context, out);
>>>         _firedTimers.add(timestamp);
>>>         long nextTimestamp = timestamp + _period;
>>>         LOGGER.info("Firing at {}; Setting new timer for {}",
>>>                     timestamp,
>>>                     nextTimestamp);
>>>         context.timerService().registerProcessingTimeTimer(nextTimestamp);
>>>     }
>>> 
>>>     @Override
>>>     public void processElement( Integer input,
>>>                                 KeyedProcessFunction<Integer, Integer, Integer>.Context context,
>>>                                 Collector<Integer> out)
>>>         throws Exception {
>>>         
>>>         LOGGER.info("Processing input {}", input);
>>>         if (_firedTimers.isEmpty()) {
>>>             long firstTimestamp = 
>>>                 context.timerService().currentProcessingTime() + _period;
>>>             LOGGER.info("Setting initial timer for {}",
>>>                         firstTimestamp);
>>>             context.timerService().registerProcessingTimeTimer(firstTimestamp);
>>>         }
>>>         
>>>         out.collect(input);
>>>     }
>>> }
>>> 
>>> 
>>> 
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
>>> 
>> 
> 
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 
