Hi all,

It looks to me like the OperatorSubtaskState returned from 
OneInputStreamOperatorTestHarness.snapshot fails to include any timers that had 
been registered via registerProcessingTimeTimer but had not yet fired when the 
snapshot was saved.

Is this a known limitation of OneInputStreamOperatorTestHarness?

If not, is there anything special I need to do when setting up the test harness 
to ensure that timers are saved?

Below is the unit test, which shows how the test harness is being set up and 
run.

The TimerFunction used in this test does seem to be doing the right thing, as 
using it in a simple job on a local Flink cluster works as expected when 
creating & then restarting from a savepoint.

Thanks,

— Ken

==================================================================================================
TimerTest.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.scaleunlimited.flinkcrawler.tools.TimerTool;

public class TimerTest {
    public static final Logger LOGGER = 
LoggerFactory.getLogger(TimerTest.class);

    private List<Long> _firedTimers = new ArrayList<Long>();

    @Before
    public void setUp() throws Exception {
    }
    
    @Test
    public void testTimerSaving() throws Throwable {
        
        // This operator doesn't really do much at all, but the first element
        // it processes will create a timer for (timestamp+1).
        // Whenever that timer fires, it will create another timer for 
        // (timestamp+1).
        KeyedProcessOperator<Integer, Integer, Integer> operator = 
            new KeyedProcessOperator<>(new TimerFunction(_firedTimers));
        
        // Create a test harness from scratch
        OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = 
            makeTestHarness(operator, null);
        
        // We begin at time zero
        testHarness.setProcessingTime(0);

        // Process some elements, which should also create a timer for time 1.
        int inputs[] = new int[] {1, 2, 3};
        for (int input : inputs) {
            testHarness.processElement(new StreamRecord<>(input));
        }
        
        // Force some time to pass, which should keep moving the timer ahead,
        // finally leaving it set for time 10.
        for (long i = 1; i < 10; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Save the state, which we assume should include the timer we set for
        // time 10.
        OperatorSubtaskState savedState = 
            testHarness.snapshot(0L, testHarness.getProcessingTime());
        
        // Close the first test harness
        testHarness.close();
        
        // Create a new test harness using the saved state (which we assume
        // includes the timer for time 10).
        testHarness = makeTestHarness(operator, savedState);
        
        // Force more time to pass, which should keep moving the timer ahead.
        for (long i = 10; i < 20; i++) {
            testHarness.setProcessingTime(i);
        }
        
        // Close the second test harness and make sure all the timers we expect
        // actually fired.
        testHarness.close();
        for (long i = 1; i < 20; i++) {
            
            // TODO This expectation currently fails, since Timers don't
            // seem to be included in the snapshot, at least the one produced by
            // the test harness.
            assertTrue(_firedTimers.contains(i));
        }
    }

    private OneInputStreamOperatorTestHarness<Integer, Integer> makeTestHarness(
            KeyedProcessOperator<Integer, Integer, Integer> operator,
            OperatorSubtaskState savedState) 
            throws Exception {
        OneInputStreamOperatorTestHarness<Integer, Integer> result;
        result = 
            new KeyedOneInputStreamOperatorTestHarness<Integer, Integer, 
Integer>(
                    operator,
                    new TimerTool.IdentityKeySelector<Integer>(),
                    BasicTypeInfo.INT_TYPE_INFO);
        result.setup();
        result.open();
        if (savedState != null) {
            result.initializeState(savedState);
        }
        return result;
    }
}


==================================================================================================
TimerFunction.java
==================================================================================================
package com.scaleunlimited.flinkcrawler.functions;

import java.util.List;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("serial")
public class TimerFunction
    extends KeyedProcessFunction<Integer, Integer, Integer> {
    static final Logger LOGGER = LoggerFactory.getLogger(TimerFunction.class);
   
    List<Long> _firedTimers;
    long _period;

    public TimerFunction(List<Long> firedTimers) {
        this(firedTimers, 1);
    }

    public TimerFunction(List<Long> firedTimers, long period) {
        super();
        _firedTimers = firedTimers;
        _period = period;
    }

    @Override
    public void onTimer(long timestamp,
                        KeyedProcessFunction<Integer, Integer, 
Integer>.OnTimerContext context,
                        Collector<Integer> out) throws Exception {
        super.onTimer(timestamp, context, out);
        _firedTimers.add(timestamp);
        long nextTimestamp = timestamp + _period;
        LOGGER.info("Firing at {}; Setting new timer for {}",
                    timestamp,
                    nextTimestamp);
        context.timerService().registerProcessingTimeTimer(nextTimestamp);
    }

    @Override
    public void processElement( Integer input,
                                KeyedProcessFunction<Integer, Integer, 
Integer>.Context context,
                                Collector<Integer> out)
        throws Exception {
        
        LOGGER.info("Processing input {}", input);
        if (_firedTimers.isEmpty()) {
            long firstTimestamp = 
                context.timerService().currentProcessingTime() + _period;
            LOGGER.info("Setting initial timer for {}",
                        firstTimestamp);
            context.timerService().registerProcessingTimeTimer(firstTimestamp);
        }
        
        out.collect(input);
    }
}



--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra

Reply via email to