sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283961380
##########
File path: docs/dev/stream/testing.md
##########
@@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends AbstractTestBase {
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {

-    @Test
-    def testMultiply(): Unit = {
-        val env = StreamExecutionEnvironment.getExecutionEnvironment
+  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
+    .setNumberSlotsPerTaskManager(1)
+    .setNumberTaskManagers(1)
+    .build)

-        // configure your test environment
-        env.setParallelism(1)
+  before {
+    flinkCluster.before()
+  }

-        // values are collected in a static variable
-        CollectSink.values.clear()
+  after {
+    flinkCluster.after()
+  }

-        // create a stream of custom elements and apply transformations
-        env
-            .fromElements(1L, 21L, 22L)
-            .map(new MultiplyByTwo())
-            .addSink(new CollectSink())

-        // execute
-        env.execute()
+  "IncrementFlatMapFunction pipeline" should "incrementValues" in {

-        // verify your results
-        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
-    }
-}
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // configure your test environment
+    env.setParallelism(2)

+    // values are collected in a static variable
+    CollectSink.values.clear()
+
+    // create a stream of custom elements and apply transformations
+    env.fromElements(1, 21, 22)
+       .map(new IncrementMapFunction())
+       .addSink(new CollectSink())
+
+    // execute
+    env.execute()
+
+    // verify your results
+    CollectSink.values should contain allOf (1,22,23)
+    }
+}
 // create a testing sink
 class CollectSink extends SinkFunction[Long] {

-    override def invoke(value: java.lang.Long): Unit = {
-        synchronized {
-            values.add(value)
-        }
+  override def invoke(value: Long): Unit = {
+    synchronized {
+      CollectSink.values.add(value)
     }
+  }
 }

 object CollectSink {
-
-    // must be static
-    val values: List[Long] = new ArrayList()
+    val values: util.List[Long] = new util.ArrayList()
 }
 {% endhighlight %}
 </div>
 </div>

-The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
+A few remarks on integration testing with `MiniClusterWithClientResource`:

-One way to test state handling is to enable checkpointing in integration tests.
+* In order not to copy your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and test sinks in your tests.

-You can do that by configuring your `StreamExecutionEnvironment` in the test:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-</div>
-</div>
+* The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
+Alternatively, you could for example write the data to files in a temporary directory with your test sink.

-And for example adding to your Flink application an identity mapper operator that will throw an exception
-once every `1000ms`. However writing such test could be tricky because of time dependencies between the actions.
+* You can also implement your own custom *parallel* source function for emitting watermarks.

-Another approach is to write a unit test using the Flink internal testing utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` module.
+* It is recommended to always test your pipelines locally with a parallelism > 1 in order to identify bugs which only surface, when the pipelines is executed in parallel.

-For an example of how to do that please have a look at the `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` also in the `flink-streaming-java` module.
+* If you use `@Rule` instead of `@ClassRule` a new local Flink cluster will be brought up for each individual test method increasing the overall execution time of your tests.

Review comment:
```suggestion
* Prefer `@ClassRule` over `@Rule` so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time since the startup and shutdown of Flink clusters usually dominate the execution time of the actual tests.
```
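
To illustrate the lifecycle difference behind the `@ClassRule` suggestion, here is a minimal, dependency-free sketch. It uses neither Flink nor JUnit: `FakeCluster` and `ClusterLifecycleSketch` are hypothetical names standing in for an expensive resource such as `MiniClusterWithClientResource`. The first method models a JUnit 4 `@Rule` (a non-static field, set up and torn down around every test method); the second models a `@ClassRule` (a static field, set up once for the whole test class).

```java
// Hypothetical stand-in for an expensive test resource such as a local Flink cluster.
// This is an illustration only; it is not a Flink or JUnit class.
final class FakeCluster {
    // Counts how many times a cluster has been started.
    static int startups = 0;

    void before() { startups++; }  // models cluster startup
    void after() { }               // models cluster shutdown
}

final class ClusterLifecycleSketch {

    // Models a JUnit 4 @Rule: the resource is started and stopped around every test method.
    static int startupsWithPerTestRule(int testCount) {
        FakeCluster.startups = 0;
        for (int i = 0; i < testCount; i++) {
            FakeCluster cluster = new FakeCluster();
            cluster.before();
            try {
                // the body of one test method would run here
            } finally {
                cluster.after();
            }
        }
        return FakeCluster.startups;
    }

    // Models a JUnit 4 @ClassRule: the resource is started once and shared by all test methods.
    static int startupsWithClassRule(int testCount) {
        FakeCluster.startups = 0;
        FakeCluster cluster = new FakeCluster();
        cluster.before();
        try {
            for (int i = 0; i < testCount; i++) {
                // the body of one test method would run here
            }
        } finally {
            cluster.after();
        }
        return FakeCluster.startups;
    }

    public static void main(String[] args) {
        int tests = 5;
        System.out.println("@Rule-style startups:      " + startupsWithPerTestRule(tests));
        System.out.println("@ClassRule-style startups: " + startupsWithClassRule(tests));
    }
}
```

With the per-test model, the number of cluster startups grows with the number of test methods, whereas the shared model pays the startup cost once. That is the saving the suggested wording describes.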