Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5669#discussion_r177747320 --- Diff: flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java --- @@ -1063,23 +1078,27 @@ public void runCancelingOnEmptyInputTest() throws Exception { final AtomicReference<Throwable> error = new AtomicReference<>(); + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); --- End diff -- Please unify and extract the common code between `runCancelingOnFullInputTest` and `runCancelingOnEmptyInputTest`, for example into: ``` private void runCancelingTest(boolean emptyInput) throws Exception { final String topic = emptyInput ? "cancelingOnEmptyInputTopic" : "cancelingOnFullTopic"; final int parallelism = 3; createTestTopic(topic, parallelism, 1); // launch a producer thread DataGenerators.InfiniteStringsGenerator generator = new DataGenerators.InfiniteStringsGenerator(kafkaServer, topic); if (!emptyInput) { generator.start(); } // launch a consumer asynchronously final AtomicReference<Throwable> jobError = new AtomicReference<>(); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(parallelism); env.enableCheckpointing(100); env.getConfig().disableSysoutLogging(); Properties props = new Properties(); props.putAll(standardProps); props.putAll(secureProps); FlinkKafkaConsumerBase<String> source = kafkaServer.getConsumer(topic, new SimpleStringSchema(), props); env.addSource(source).addSink(new DiscardingSink<String>()); JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); final JobID jobId = jobGraph.getJobID(); final Runnable jobRunner = new Runnable() { @Override public void run() { try { client.setDetached(false); client.submitJob(jobGraph, KafkaConsumerTestBase.class.getClassLoader()); } catch (Throwable t) { jobError.set(t); } } }; Thread 
runnerThread = new Thread(jobRunner, "program runner thread"); runnerThread.start(); // wait a bit before canceling Thread.sleep(2000); Throwable failureCause = jobError.get(); if (failureCause != null) { failureCause.printStackTrace(); Assert.fail("Test failed prematurely with: " + failureCause.getMessage()); } // cancel client.cancel(jobId); // wait for the program to be done and validate that we failed with the right exception runnerThread.join(); assertEquals(JobStatus.CANCELED, client.getJobStatus(jobId).get()); if (generator.isAlive()) { generator.shutdown(); generator.join(); } else if (!emptyInput) { Throwable t = generator.getError(); if (t != null) { t.printStackTrace(); fail("Generator failed: " + t.getMessage()); } else { fail("Generator failed with no exception"); } } deleteTestTopic(topic); } ```
---