No problem :) You motivated me to write a fix for this. I had stumbled across the same issue myself before, and it also took me some time in the debugger to find the cause.
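To illustrate the pitfall, here is a minimal, self-contained Java model of the harness lifecycle. It is not Flink code. HarnessModel and its methods are hypothetical stand-ins, and its "state" is just a list of pending timer timestamps. The failFast flag only assumes what a fix for FLINK-10159 might do (reject initializeState() after open()). The non-failFast path simply drops the late restore, which is a simplification of the symptom in the quoted test (restored timers never fire), not a description of Flink's internals.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical model of a test harness lifecycle (NOT Flink's API).
 * State is modelled as a list of pending timer timestamps.
 */
class HarnessModel {
    private final boolean failFast;
    private boolean initializeCalled = false;
    private final List<Long> pendingTimers = new ArrayList<>();

    HarnessModel(boolean failFast) {
        this.failFast = failFast;
    }

    /** Restores pending timers from a snapshot; must come before open(). */
    void initializeState(List<Long> snapshot) {
        if (initializeCalled) {
            if (failFast) {
                // Assumed fix: make the wrong call order an explicit error.
                throw new IllegalStateException(
                    "Harness already initialized; was open() called before initializeState()?");
            }
            // Simplified pre-fix symptom: the late restore has no effect.
            return;
        }
        pendingTimers.addAll(snapshot);
        initializeCalled = true;
    }

    /** Opens the harness, falling back to empty state if nothing was restored yet. */
    void open() {
        if (!initializeCalled) {
            // As described in the quoted reply: open() before initializeState()
            // initializes the harness without any state.
            initializeCalled = true;
        }
    }

    List<Long> pendingTimers() {
        return Collections.unmodifiableList(pendingTimers);
    }
}
```

Applied to the quoted makeTestHarness helper, the equivalent change is to call result.setup(), then result.initializeState(savedState) when savedState is non-null, and only then result.open().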
Piotrek

> On 16 Aug 2018, at 20:05, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>
> Hi Piotr,
>
> Thanks, and darn it, that’s something I should have noticed.
>
> — Ken
>
>> On Aug 16, 2018, at 4:37 AM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>>
>> Hi,
>>
>> You made a small mistake when restoring from state using the test harness,
>> one that I have also made myself in the past. The problem is the ordering of
>> these calls:
>>
>>     result.open();
>>     if (savedState != null) {
>>         result.initializeState(savedState);
>>     }
>>
>> open() is supposed to be called after initializeState(). If you look at the
>> code of AbstractStreamOperatorTestHarness#open, you'll see that when it is
>> called before initializeState(), it initializes the harness without any state.
>>
>> Unfortunately, this is implicit behaviour that doesn’t throw any error (the
>> test harness is not part of Flink’s public API). I will try to fix this:
>> https://issues.apache.org/jira/browse/FLINK-10159
>>
>> Piotrek
>>
>>> On 16 Aug 2018, at 00:24, Ken Krugler <kkrugler_li...@transpac.com> wrote:
>>>
>>> Hi all,
>>>
>>> It looks to me like the OperatorSubtaskState returned from
>>> OneInputStreamOperatorTestHarness.snapshot fails to include any timers that
>>> had been registered via registerProcessingTimeTimer but had not yet fired
>>> when the snapshot was saved.
>>>
>>> Is this a known limitation of OneInputStreamOperatorTestHarness?
>>>
>>> If not, is there anything special I need to do when setting up the test
>>> harness to ensure that timers are saved?
>>>
>>> Below is the unit test, which shows how the test harness is being set up
>>> and run.
>>>
>>> The TimerFunction used in this test does seem to be doing the right thing,
>>> as using it in a simple job on a local Flink cluster works as expected when
>>> creating & then restarting from a savepoint.
>>>
>>> Thanks,
>>>
>>> — Ken
>>>
>>> ==================================================================================================
>>> TimerTest.java
>>> ==================================================================================================
>>> package com.scaleunlimited.flinkcrawler.functions;
>>>
>>> import static org.junit.Assert.assertTrue;
>>>
>>> import java.util.ArrayList;
>>> import java.util.List;
>>>
>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>> import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
>>> import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
>>> import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
>>> import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
>>> import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
>>> import org.junit.Before;
>>> import org.junit.Test;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> import com.scaleunlimited.flinkcrawler.tools.TimerTool;
>>>
>>> public class TimerTest {
>>>     public static final Logger LOGGER = LoggerFactory.getLogger(TimerTest.class);
>>>
>>>     private List<Long> _firedTimers = new ArrayList<Long>();
>>>
>>>     @Before
>>>     public void setUp() throws Exception {
>>>     }
>>>
>>>     @Test
>>>     public void testTimerSaving() throws Throwable {
>>>
>>>         // This operator doesn't really do much at all, but the first element
>>>         // it processes will create a timer for (timestamp+1).
>>>         // Whenever that timer fires, it will create another timer for
>>>         // (timestamp+1).
>>>         KeyedProcessOperator<Integer, Integer, Integer> operator =
>>>             new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
>>>
>>>         // Create a test harness from scratch
>>>         OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
>>>             makeTestHarness(operator, null);
>>>
>>>         // We begin at time zero
>>>         testHarness.setProcessingTime(0);
>>>
>>>         // Process some elements, which should also create a timer for time 1.
>>>         int inputs[] = new int[] {1, 2, 3};
>>>         for (int input : inputs) {
>>>             testHarness.processElement(new StreamRecord<>(input));
>>>         }
>>>
>>>         // Force some time to pass, which should keep moving the timer ahead,
>>>         // finally leaving it set for time 10.
>>>         for (long i = 1; i < 10; i++) {
>>>             testHarness.setProcessingTime(i);
>>>         }
>>>
>>>         // Save the state, which we assume should include the timer we set for
>>>         // time 10.
>>>         OperatorSubtaskState savedState =
>>>             testHarness.snapshot(0L, testHarness.getProcessingTime());
>>>
>>>         // Close the first test harness
>>>         testHarness.close();
>>>
>>>         // Create a new test harness using the saved state (which we assume
>>>         // includes the timer for time 10).
>>>         testHarness = makeTestHarness(operator, savedState);
>>>
>>>         // Force more time to pass, which should keep moving the timer ahead.
>>>         for (long i = 10; i < 20; i++) {
>>>             testHarness.setProcessingTime(i);
>>>         }
>>>
>>>         // Close the second test harness and make sure all the timers we expect
>>>         // actually fired.
>>>         testHarness.close();
>>>         for (long i = 1; i < 20; i++) {
>>>
>>>             // TODO This expectation currently fails, since Timers don't
>>>             // seem to be included in the snapshot, at least the one produced by
>>>             // the test harness.
>>>             assertTrue(_firedTimers.contains(i));
>>>         }
>>>     }
>>>
>>>     private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
>>>             KeyedProcessOperator<Integer, Integer, Integer> operator,
>>>             OperatorSubtaskState savedState)
>>>             throws Exception {
>>>         OneInputStreamOperatorTestHarness<Integer, Integer> result;
>>>         result =
>>>             new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Integer>(
>>>                 operator,
>>>                 new TimerTool.IdentityKeySelector<Integer>(),
>>>                 BasicTypeInfo.INT_TYPE_INFO);
>>>         result.setup();
>>>         result.open();
>>>         if (savedState != null) {
>>>             result.initializeState(savedState);
>>>         }
>>>         return result;
>>>     }
>>> }
>>>
>>>
>>> ==================================================================================================
>>> TimerFunction.java
>>> ==================================================================================================
>>> package com.scaleunlimited.flinkcrawler.functions;
>>>
>>> import java.util.List;
>>>
>>> import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
>>> import org.apache.flink.util.Collector;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>>
>>> @SuppressWarnings("serial")
>>> public class TimerFunction
>>>     extends KeyedProcessFunction<Integer, Integer, Integer> {
>>>     static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
>>>
>>>     List<Long> _firedTimers;
>>>     long _period;
>>>
>>>     public TimerFunction(List<Long> firedTimers) {
>>>         this(firedTimers, 1);
>>>     }
>>>
>>>     public TimerFunction(List<Long> firedTimers, long period) {
>>>         super();
>>>         _firedTimers = firedTimers;
>>>         _period = period;
>>>     }
>>>
>>>     @Override
>>>     public void onTimer(long timestamp,
>>>             KeyedProcessFunction<Integer, Integer, Integer>.OnTimerContext context,
>>>             Collector<Integer> out) throws Exception {
>>>         super.onTimer(timestamp, context, out);
>>>         _firedTimers.add(timestamp);
>>>         long nextTimestamp = timestamp + _period;
>>>         LOGGER.info("Firing at {}; Setting new timer for {}", timestamp, nextTimestamp);
>>>         context.timerService().registerProcessingTimeTimer(nextTimestamp);
>>>     }
>>>
>>>     @Override
>>>     public void processElement(Integer input,
>>>             KeyedProcessFunction<Integer, Integer, Integer>.Context context,
>>>             Collector<Integer> out)
>>>             throws Exception {
>>>
>>>         LOGGER.info("Processing input {}", input);
>>>         if (_firedTimers.isEmpty()) {
>>>             long firstTimestamp = context.timerService().currentProcessingTime() + _period;
>>>             LOGGER.info("Setting initial timer for {}", firstTimestamp);
>>>             context.timerService().registerProcessingTimeTimer(firstTimestamp);
>>>         }
>>>
>>>         out.collect(input);
>>>     }
>>> }
>>>
>>>
>>>
>>> --------------------------
>>> Ken Krugler
>>> +1 530-210-6378
>>> http://www.scaleunlimited.com
>>> Custom big data solutions & training
>>> Flink, Solr, Hadoop, Cascading & Cassandra
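For the expectation at the end of the quoted TimerTest, here is a small, self-contained Java simulation of the timer chain for a single key. Again this is not Flink code: TimerChainModel, advanceTo, snapshot, restore and runScenario are hypothetical names. Like TimerFunction.onTimer, every firing re-registers a timer at timestamp + period, and pending timers are kept in a set because Flink keeps at most one timer per key and timestamp. The sketch shows that timestamps 1 through 19 all fire only if the pending timer for time 10 survives the snapshot. If the second phase starts from empty state, the chain stops after 9.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Hypothetical single-key model of a self-rescheduling processing-time timer (NOT Flink's API). */
class TimerChainModel {
    private final long period;
    // Pending timer timestamps; a set, so the same timestamp is only registered once.
    private final TreeSet<Long> pending = new TreeSet<>();
    final List<Long> fired = new ArrayList<>();

    TimerChainModel(long period) {
        this.period = period;
    }

    void register(long timestamp) {
        pending.add(timestamp);
    }

    /** Fires every pending timer with timestamp <= time, in order; each firing schedules the next one. */
    void advanceTo(long time) {
        while (!pending.isEmpty() && pending.first() <= time) {
            long t = pending.pollFirst();
            fired.add(t);
            register(t + period);
        }
    }

    /** Copies the pending timers, standing in for a snapshot. */
    List<Long> snapshot() {
        return new ArrayList<>(pending);
    }

    void restore(List<Long> timers) {
        pending.addAll(timers);
    }

    /** Mirrors the two phases of the quoted test; returns every timestamp that fired. */
    static List<Long> runScenario(boolean restoreTimers) {
        TimerChainModel first = new TimerChainModel(1);
        first.register(1); // the first element at time 0 registers a timer for 0 + period
        for (long i = 1; i < 10; i++) {
            first.advanceTo(i);
        }
        List<Long> saved = first.snapshot(); // holds the pending timer for time 10

        TimerChainModel second = new TimerChainModel(1);
        if (restoreTimers) {
            second.restore(saved);
        }
        for (long i = 10; i < 20; i++) {
            second.advanceTo(i);
        }

        List<Long> allFired = new ArrayList<>(first.fired);
        allFired.addAll(second.fired);
        return allFired;
    }
}
```

In the real test there are three keys (1, 2 and 3 via the identity key selector), so each timestamp fires once per key; the contains() checks are unaffected by that.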