sjwiesman commented on a change in pull request #8437: [FLINK-12508] Expand Testing Documentation
URL: https://github.com/apache/flink/pull/8437#discussion_r283961380
 
 

 ##########
 File path: docs/dev/stream/testing.md
 ##########
 @@ -181,85 +430,76 @@ public class ExampleIntegrationTest extends AbstractTestBase {
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-class ExampleIntegrationTest extends AbstractTestBase {
+class StreamingJobIntegrationTest extends FlatSpec with Matchers with BeforeAndAfter {
 
-    @Test
-    def testMultiply(): Unit = {
-        val env = StreamExecutionEnvironment.getExecutionEnvironment
+  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
+    .setNumberSlotsPerTaskManager(1)
+    .setNumberTaskManagers(1)
+    .build)
 
-        // configure your test environment
-        env.setParallelism(1)
+  before {
+    flinkCluster.before()
+  }
 
-        // values are collected in a static variable
-        CollectSink.values.clear()
+  after {
+    flinkCluster.after()
+  }
 
-        // create a stream of custom elements and apply transformations
-        env
-            .fromElements(1L, 21L, 22L)
-            .map(new MultiplyByTwo())
-            .addSink(new CollectSink())
 
-        // execute
-        env.execute()
+  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
 
-        // verify your results
-        assertEquals(Lists.newArrayList(2L, 42L, 44L), CollectSink.values)
-    }
-}    
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    // configure your test environment
+    env.setParallelism(2)
 
+    // values are collected in a static variable
+    CollectSink.values.clear()
+
+    // create a stream of custom elements and apply transformations
+    env.fromElements(1L, 21L, 22L)
+       .map(new IncrementMapFunction())
+       .addSink(new CollectSink())
+
+    // execute
+    env.execute()
+
+    // verify your results
+    CollectSink.values should contain allOf (2, 22, 23)
+  }
+}
 // create a testing sink
 class CollectSink extends SinkFunction[Long] {
 
-    override def invoke(value: java.lang.Long): Unit = {
-        synchronized {
-            values.add(value)
-        }
+  override def invoke(value: Long): Unit = {
+    synchronized {
+      CollectSink.values.add(value)
     }
+  }
 }
 
 object CollectSink {
-
     // must be static
-    val values: List[Long] = new ArrayList()
+    val values: util.List[Long] = new util.ArrayList()
 }
 {% endhighlight %}
 </div>
 </div>
 
-The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
-Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
-Alternatively, you could for example write the data to files in a temporary directory with your test sink.
-You can also implement your own custom sources for emitting watermarks.
-
-## Testing checkpointing and state handling
+A few remarks on integration testing with `MiniClusterWithClientResource`:
 
-One way to test state handling is to enable checkpointing in integration tests.
+* To avoid copying your whole pipeline code from production to test, make sources and sinks pluggable in your production code and inject special test sources and sinks in your tests (a sketch of this pattern follows the diff).
 
-You can do that by configuring your `StreamExecutionEnvironment` in the test:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-env.enableCheckpointing(500);
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100));
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-env.enableCheckpointing(500)
-env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100))
-{% endhighlight %}
-</div>
-</div>
+* The static variable in `CollectSink` is used here because Flink serializes all operators before distributing them across a cluster.
+Communicating with operators instantiated by a local Flink mini cluster via static variables is one way around this issue.
+Alternatively, you could, for example, write the data to files in a temporary directory with your test sink.
 
-And for example adding to your Flink application an identity mapper operator that will throw an exception
-once every `1000ms`. However writing such test could be tricky because of time dependencies between the actions.
+* You can also implement your own custom *parallel* source function for emitting watermarks (see the source sketch after the diff).
 
-Another approach is to write a unit test using the Flink internal testing utility `AbstractStreamOperatorTestHarness` from the `flink-streaming-java` module.
+* It is recommended to always test your pipelines locally with a parallelism > 1 in order to identify bugs which only surface when the pipeline is executed in parallel.
 
-For an example of how to do that please have a look at the `org.apache.flink.streaming.runtime.operators.windowing.WindowOperatorTest` also in the `flink-streaming-java` module.
+* If you use `@Rule` instead of `@ClassRule`, a new local Flink cluster will be brought up for each individual test method, increasing the overall execution time of your tests.
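
To illustrate the first remark, here is a minimal sketch of the pluggable-source/sink pattern; the `IncrementPipeline` object and its `build` method are illustrative names, not part of this PR:

```scala
// Sketch only: factor the pipeline logic so it is independent of any
// concrete source or sink. Tests can then feed it from fromElements()
// and attach a CollectSink, while production wires in real connectors.
import org.apache.flink.streaming.api.scala._

object IncrementPipeline {
  // pure transformation logic, reusable from both production and tests
  def build(input: DataStream[Long]): DataStream[Long] =
    input.map(_ + 1)
}

// usage in a test:
//   val env = StreamExecutionEnvironment.getExecutionEnvironment
//   IncrementPipeline.build(env.fromElements(1L, 21L, 22L)).addSink(new CollectSink())
//   env.execute()
```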
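And for the remark on custom parallel sources, a sketch of a source function that attaches timestamps and emits watermarks; `TimestampedSource` and its constructor argument are hypothetical:

```scala
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.watermark.Watermark

// Sketch only: emits (value, timestamp) pairs and a watermark after each
// record, which makes event-time behaviour deterministic in tests.
class TimestampedSource(events: Seq[(Long, Long)])
    extends RichParallelSourceFunction[Long] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    for ((value, timestamp) <- events if running) {
      ctx.collectWithTimestamp(value, timestamp)
      ctx.emitWatermark(new Watermark(timestamp))
    }
    // final watermark so that all pending event-time timers still fire
    ctx.emitWatermark(new Watermark(Long.MaxValue))
  }

  override def cancel(): Unit = running = false
}
```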
 
 Review comment:
   ```suggestion
  * Prefer `@ClassRule` over `@Rule` so that multiple tests can share the same Flink cluster. Doing so saves a significant amount of time since the startup and shutdown of Flink clusters usually dominate the execution time of the actual tests.
   ```
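
The Scala example uses ScalaTest rather than JUnit rules, so the equivalent of this advice there would be starting the cluster once per suite via `BeforeAndAfterAll`; a sketch under that assumption (`SharedClusterSpec` is an illustrative name):

```scala
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}

class SharedClusterSpec extends FlatSpec with Matchers with BeforeAndAfterAll {

  val flinkCluster = new MiniClusterWithClientResource(
    new MiniClusterResourceConfiguration.Builder()
      .setNumberSlotsPerTaskManager(1)
      .setNumberTaskManagers(1)
      .build)

  // start the mini cluster once for the whole suite instead of per test
  override def beforeAll(): Unit = flinkCluster.before()

  override def afterAll(): Unit = flinkCluster.after()

  // all test cases in this suite now share one running Flink cluster
}
```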

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
