Hi,

You made a small mistake when restoring from state with the test harness, one that I have also made myself in the past. The problem is the ordering of these calls:

    result.open();
    if (savedState != null) {
        result.initializeState(savedState);
    }

open() is supposed to be called after initializeState(). If you look at the code of AbstractStreamOperatorTestHarness#open, you will see that if it is called before initializeState(), it initializes the harness without any state. Unfortunately, this happens implicitly and no error is thrown (the test harness is not part of Flink's public API). I will try to fix this:

https://issues.apache.org/jira/browse/FLINK-10159

Piotrek

> On 16 Aug 2018, at 00:24, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi all,
>
> It looks to me like the OperatorSubtaskState returned from
> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that
> had been registered via registerProcessingTimeTimer but had not yet fired
> when the snapshot was saved.
>
> Is this a known limitation of OneInputStreamOperatorTestHarness?
>
> If not, is there anything special I need to do when setting up the test
> harness to ensure that timers are saved?
>
> Below is the unit test, which shows how the test harness is being set up and
> run.
>
> The TimerFunction used in this test does seem to be doing the right thing, as
> using it in a simple job on a local Flink cluster works as expected when
> creating & then restarting from a savepoint.
>
> Thanks,
>
> -- Ken
>
> ==================================================================================================
> TimerTest.java
> ==================================================================================================
> package com.scaleunlimited.flinkcrawler.functions;
>
> import static org.junit.Assert.assertTrue;
>
> import java.util.ArrayList;
> import java.util.List;
>
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
> import org.junit.Before;
> import org.junit.Test;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
>
> public class TimerTest {
>     public static final Logger LOGGER =
>         LoggerFactory.getLogger(TimerTest.class);
>
>     private List<Long> _firedTimers = new ArrayList<Long>();
>
>     @Before
>     public void setUp() throws Exception {
>     }
>
>     @Test
>     public void testTimerSaving() throws Throwable {
>
>         // This operator doesn't really do much at all, but the first element
>         // it processes will create a timer for (timestamp+1).
>         // Whenever that timer fires, it will create another timer for
>         // (timestamp+1).
>         KeyedProcessOperator<Integer, Integer, Integer> operator =
>             new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>
>         // Create a test harness from scratch
>         OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
>             makeTestHarness(operator, null);
>
>         // We begin at time zero
>         testHarness.setProcessingTime(0);
>
>         // Process some elements, which should also create a timer for time 1.
>         int inputs[] = new int[] {1, 2, 3};
>         for (int input : inputs) {
>             testHarness.processElement(new StreamRecord<>(input));
>         }
>
>         // Force some time to pass, which should keep moving the timer ahead,
>         // finally leaving it set for time 10.
>         for (long i = 1; i < 10; i++) {
>             testHarness.setProcessingTime(i);
>         }
>
>         // Save the state, which we assume should include the timer we set for
>         // time 10.
>         OperatorSubtaskState savedState =
>             testHarness.snapshot(0L, testHarness.getProcessingTime());
>
>         // Close the first test harness
>         testHarness.close();
>
>         // Create a new test harness using the saved state (which we assume
>         // includes the timer for time 10).
>         testHarness = makeTestHarness(operator, savedState);
>
>         // Force more time to pass, which should keep moving the timer ahead.
>         for (long i = 10; i < 20; i++) {
>             testHarness.setProcessingTime(i);
>         }
>
>         // Close the second test harness and make sure all the timers we expect
>         // actually fired.
>         testHarness.close();
>         for (long i = 1; i < 20; i++) {
>
>             // TODO This expectation currently fails, since Timers don't
>             // seem to be included in the snapshot, at least the one produced by
>             // the test harness.
>             assertTrue(_firedTimers.contains(i));
>         }
>     }
>
>     private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
>             KeyedProcessOperator<Integer, Integer, Integer> operator,
>             OperatorSubtaskState savedState)
>             throws Exception {
>         OneInputStreamOperatorTestHarness<Integer, Integer> result;
>         result =
>             new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
>                 operator,
>                 new TimerTool.IdentityKeySelector<Integer>(),
>                 BasicTypeInfo.INT_TYPE_INFO);
>         result.setup();
>         result.open();
>         if (savedState != null) {
>             result.initializeState(savedState);
>         }
>         return result;
>     }
> }
>
>
> ==================================================================================================
> TimerFunction.java
> ==================================================================================================
> package com.scaleunlimited.flinkcrawler.functions;
>
> import java.util.List;
>
> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
> import org.apache.flink.util.Collector;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> @SuppressWarnings("serial")
> public class TimerFunction
>         extends KeyedProcessFunction<Integer, Integer, Integer> {
>     static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
>
>     List<Long> _firedTimers;
>     long _period;
>
>     public TimerFunction(List<Long> firedTimers) {
>         this(firedTimers, 1);
>     }
>
>     public TimerFunction(List<Long> firedTimers, long period) {
>         super();
>         _firedTimers = firedTimers;
>         _period = period;
>     }
>
>     @Override
>     public void onTimer(long timestamp,
>             KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
>             Collector<Integer> out) throws Exception {
>         super.onTimer(timestamp, context, out);
>         _firedTimers.add(timestamp);
>         long nextTimestamp = timestamp + _period;
>         LOGGER.info("Firing at {}; Setting new timer for {}",
>             timestamp,
>             nextTimestamp);
>         context.timerService().registerProcessingTimeTimer(nextTimestamp);
>     }
>
>     @Override
>     public void processElement(Integer input,
>             KeyedProcessFunction<Integer, Integer, Integer>.Context context,
>             Collector<Integer> out)
>             throws Exception {
>
>         LOGGER.info("Processing input {}", input);
>         if (_firedTimers.isEmpty()) {
>             long firstTimestamp =
>                 context.timerService().currentProcessingTime() + _period;
>             LOGGER.info("Setting initial timer for {}",
>                 firstTimestamp);
>             context.timerService().registerProcessingTimeTimer(firstTimestamp);
>         }
>
>         out.collect(input);
>     }
> }
>
> --------------------------
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
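To make the ordering pitfall from the top of this reply concrete, here is a deliberately simplified, hypothetical model of a harness lifecycle. ToyHarness, restore() and HarnessOrderingDemo are made up for this sketch and are not Flink's AbstractStreamOperatorTestHarness. The model only assumes what the reply states: if open() runs before initializeState(), the operator starts from empty state and nothing reports an error.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified model of a test-harness lifecycle. This is NOT
// Flink's AbstractStreamOperatorTestHarness; it only mimics the behaviour
// described in the reply: open() before initializeState() means the
// operator runs on empty state, and no error is raised.
public class HarnessOrderingDemo {

    static final class ToyHarness {
        private boolean opened = false;
        // Timers the "operator" actually runs with after open().
        private final List<Long> activeTimers = new ArrayList<>();

        void initializeState(List<Long> savedTimers) {
            if (opened) {
                // Too late: the operator already started from empty state.
                // Silently ignored, mirroring the lack of any error.
                return;
            }
            activeTimers.addAll(savedTimers);
        }

        void open() {
            // Whatever state has been restored so far (possibly none) is
            // what the operator runs with from here on.
            opened = true;
        }

        List<Long> activeTimers() {
            return activeTimers;
        }
    }

    // Restores the saved timers using either the wrong or the right call order.
    static List<Long> restore(List<Long> savedTimers, boolean openFirst) {
        ToyHarness harness = new ToyHarness();
        if (openFirst) {
            harness.open();                        // wrong order, as in the quoted test
            harness.initializeState(savedTimers);
        } else {
            harness.initializeState(savedTimers);  // restore first...
            harness.open();                        // ...then open
        }
        return harness.activeTimers();
    }

    public static void main(String[] args) {
        List<Long> saved = Collections.singletonList(10L);
        System.out.println("open() first:            " + restore(saved, true));
        System.out.println("initializeState() first: " + restore(saved, false));
    }
}
```

Running main() prints an empty list for the open()-first order and [10] for the initializeState()-first order. In terms of the quoted makeTestHarness, the corresponding change is to call result.setup(), then result.initializeState(savedState) when savedState is non-null, and only then result.open().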