Hi,

You made a small mistake when restoring from state using the test harness, one
that I have also made myself in the past. The problem is the ordering of these
calls:

        result.open();
        if (savedState != null) {
            result.initializeState(savedState);
        }

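open() is supposed to be called after initializeState(). If you look at the
code of AbstractStreamOperatorTestHarness#open, you will see that when it is
called before initializeState(), it initializes the harness without any state,
which is why the timers from your snapshot never show up in the second harness.
In makeTestHarness the order should be setup(), then
initializeState(savedState) if there is saved state, and only then open().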
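
To make the effect of the ordering concrete, here is a small toy model. It is
not Flink code: ToyHarness and its methods are hypothetical stand-ins, and it
assumes that an initializeState() call arriving after open() has no effect,
which matches the symptom you are seeing.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model only: ToyHarness is a hypothetical stand-in, not Flink code.
public class HarnessOrderSketch {

    static class ToyHarness {
        private List<Long> timers;          // null until state is initialized
        private boolean initializeCalled;

        // Restores timers from a snapshot; has no effect once state exists
        // (assumption made for this sketch).
        void initializeState(List<Long> snapshot) {
            if (!initializeCalled) {
                timers = new ArrayList<>(snapshot);
                initializeCalled = true;
            }
        }

        // Falls back to empty state if nothing was restored beforehand.
        void open() {
            if (!initializeCalled) {
                timers = new ArrayList<>();
                initializeCalled = true;
            }
        }

        List<Long> timers() {
            return timers;
        }
    }

    // The order used in the failing test: the snapshot is lost.
    static List<Long> openThenInitialize(List<Long> snapshot) {
        ToyHarness harness = new ToyHarness();
        harness.open();
        harness.initializeState(snapshot);
        return harness.timers();
    }

    // The intended order: the snapshot is restored before open().
    static List<Long> initializeThenOpen(List<Long> snapshot) {
        ToyHarness harness = new ToyHarness();
        harness.initializeState(snapshot);
        harness.open();
        return harness.timers();
    }

    public static void main(String[] args) {
        List<Long> snapshot = Collections.singletonList(10L);
        System.out.println("open, then initializeState: "
            + openThenInitialize(snapshot));
        System.out.println("initializeState, then open: "
            + initializeThenOpen(snapshot));
    }
}
```

In this model the first ordering ends up with an empty timer list and the
second one keeps the timer for time 10, without any error in either case.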

Unfortunately this is implicit behaviour that doesn’t throw any error (the test
harness is not part of Flink’s public API). I will try to fix this:
https://issues.apache.org/jira/browse/FLINK-10159
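
For illustration only, one way such misuse could be made to fail loudly is a
simple guard on the call order. This is a hypothetical sketch (StrictToyHarness
is an invented name), not the harness code and not necessarily what the change
for the ticket above will look like.

```java
// Hypothetical sketch: StrictToyHarness is not Flink code.
public class StrictHarnessSketch {

    static class StrictToyHarness {
        private boolean opened;

        // Rejects state restoration once the harness has been opened.
        void initializeState(Object snapshot) {
            if (opened) {
                throw new IllegalStateException(
                    "initializeState() must be called before open()");
            }
            // State restoration would happen here.
        }

        void open() {
            opened = true;
        }
    }

    // Returns true if open() followed by initializeState() is rejected.
    static boolean wrongOrderIsRejected() {
        StrictToyHarness harness = new StrictToyHarness();
        harness.open();
        try {
            harness.initializeState(new Object());
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // Returns true if initializeState() followed by open() passes silently.
    static boolean rightOrderIsAccepted() {
        StrictToyHarness harness = new StrictToyHarness();
        try {
            harness.initializeState(new Object());
            harness.open();
            return true;
        } catch (IllegalStateException unexpected) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("wrong order rejected: " + wrongOrderIsRejected());
        System.out.println("right order accepted: " + rightOrderIsAccepted());
    }
}
```

With a check like this, a test that calls open() first would fail immediately
instead of quietly running without the restored state.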

Piotrek

> On 16 Aug 2018, at 00:24, Ken Krugler <kkrugler_li...@transpac.com> wrote:
> 
> Hi all,
> 
> It looks to me like the OperatorSubtaskState returned from 
> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that 
> had been registered via registerProcessingTimeTimer but had not yet fired 
> when the snapshot was saved.
> 
> Is this a known limitation of OneInputStreamOperatorTestHarness?
> 
> If not, is there anything special I need to do when setting up the test 
> harness to ensure that timers are saved?
> 
> Below is the unit test, which shows how the test harness is being set up and 
> run.
> 
> The TimerFunction used in this test does seem to be doing the right thing, as 
> using it in a simple job on a local Flink cluster works as expected when 
> creating & then restarting from a savepoint.
> 
> Thanks,
> 
> — Ken
> 
> ==================================================================================================
> TimerTest.java
> ==================================================================================================
> package com.scaleunlimited.flinkcrawler.functions;
> 
> import static org.junit.Assert.assertTrue;
> 
> import java.util.ArrayList;
> import java.util.List;
> 
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
> import org.junit.Before;
> import org.junit.Test;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
> 
> public class TimerTest {
>     public static final Logger LOGGER =
>         LoggerFactory.getLogger(TimerTest.class);
> 
>     private List<Long> _firedTimers = new ArrayList<Long>();
> 
>     @Before
>     public void setUp() throws Exception {
>     }
>     
>     @Test
>     public void testTimerSaving() throws Throwable {
>         
>         // This operator doesn't really do much at all, but the first element
>         // it processes will create a timer for (timestamp+1).
>         // Whenever that timer fires, it will create another timer for 
>         // (timestamp+1).
>         KeyedProcessOperator<Integer, Integer, Integer> operator = 
>             new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>         
>         // Create a test harness from scratch
>         OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
>             makeTestHarness(operator, null);
>         
>         // We begin at time zero
>         testHarness.setProcessingTime(0);
> 
>         // Process some elements, which should also create a timer for time 1.
>         int inputs[] = new int[] {1, 2, 3};
>         for (int input : inputs) {
>             testHarness.processElement(new StreamRecord<>(input));
>         }
>         
>         // Force some time to pass, which should keep moving the timer ahead,
>         // finally leaving it set for time 10.
>         for (long i = 1; i < 10; i++) {
>             testHarness.setProcessingTime(i);
>         }
>         
>         // Save the state, which we assume should include the timer we set for
>         // time 10.
>         OperatorSubtaskState savedState = 
>             testHarness.snapshot(0L, testHarness.getProcessingTime());
>         
>         // Close the first test harness
>         testHarness.close();
>         
>         // Create a new test harness using the saved state (which we assume
>         // includes the timer for time 10).
>         testHarness = makeTestHarness(operator, savedState);
>         
>         // Force more time to pass, which should keep moving the timer ahead.
>         for (long i = 10; i < 20; i++) {
>             testHarness.setProcessingTime(i);
>         }
>         
>         // Close the second test harness and make sure all the timers we
>         // expect actually fired.
>         testHarness.close();
>         for (long i = 1; i < 20; i++) {
>             
>             // TODO This expectation currently fails, since Timers don't
>             // seem to be included in the snapshot, at least the one produced
>             // by the test harness.
>             assertTrue(_firedTimers.contains(i));
>         }
>     }
> 
>     private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
>             KeyedProcessOperator<Integer, Integer, Integer> operator,
>             OperatorSubtaskState savedState) 
>             throws Exception {
>         OneInputStreamOperatorTestHarness<Integer, Integer> result;
>         result = 
>             new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
>                     operator,
>                     new TimerTool.IdentityKeySelector<Integer>(),
>                     BasicTypeInfo.INT_TYPE_INFO);
>         result.setup();
>         result.open();
>         if (savedState != null) {
>             result.initializeState(savedState);
>         }
>         return result;
>     }
> }
> 
> 
> ==================================================================================================
> TimerFunction.java
> ==================================================================================================
> package com.scaleunlimited.flinkcrawler.functions;
> 
> import java.util.List;
> 
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> @SuppressWarnings("serial")
> public class TimerFunction
>     extends KeyedProcessFunction<Integer, Integer, Integer> {
>     static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
>    
>     List<Long> _firedTimers;
>     long _period;
> 
>     public TimerFunction(List<Long> firedTimers) {
>         this(firedTimers, 1);
>     }
> 
>     public TimerFunction(List<Long> firedTimers, long period) {
>         super();
>         _firedTimers = firedTimers;
>         _period = period;
>     }
> 
>     @Override
>     public void onTimer(long timestamp,
>                         KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
>                         Collector<Integer> out) throws Exception {
>         super.onTimer(timestamp, context, out);
>         _firedTimers.add(timestamp);
>         long nextTimestamp = timestamp + _period;
>         LOGGER.info("Firing at {}; Setting new timer for {}",
>                     timestamp,
>                     nextTimestamp);
>         context.timerService().registerProcessingTimeTimer(nextTimestamp);
>     }
> 
>     @Override
>     public void processElement( Integer input,
>                                 KeyedProcessFunction<Integer, Integer, Integer>.Context context,
>                                 Collector<Integer> out)
>         throws Exception {
>         
>         LOGGER.info("Processing input {}", input);
>         if (_firedTimers.isEmpty()) {
>             long firstTimestamp = 
>                 context.timerService().currentProcessingTime() + _period;
>             LOGGER.info("Setting initial timer for {}",
>                         firstTimestamp);
>             context.timerService().registerProcessingTimeTimer(firstTimestamp);
>         }
>         
>         out.collect(input);
>     }
> }
> 
> 
> 
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
> 
