Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5669#discussion_r177747320
  
    --- Diff: 
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
 ---
    @@ -1063,23 +1078,27 @@ public void runCancelingOnEmptyInputTest() throws 
Exception {
     
                final AtomicReference<Throwable> error = new 
AtomicReference<>();
     
    +           final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    --- End diff --
    
    Please unify and extract common code between `runCancelingOnFullInputTest` 
and `runCancelingOnEmptyInputTest` for example into:
    
    ```
        private void runCancelingTest(boolean emptyInput) throws Exception {
                final String topic = emptyInput ? "cancelingOnEmptyInputTopic" 
: "cancelingOnFullTopic";
    
                final int parallelism = 3;
                createTestTopic(topic, parallelism, 1);
    
                // launch a producer thread
                DataGenerators.InfiniteStringsGenerator generator =
                        new 
DataGenerators.InfiniteStringsGenerator(kafkaServer, topic);
                if (!emptyInput) {
                        generator.start();
                }
    
                // launch a consumer asynchronously
    
                final AtomicReference<Throwable> jobError = new 
AtomicReference<>();
    
                final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(parallelism);
                env.enableCheckpointing(100);
                env.getConfig().disableSysoutLogging();
    
                Properties props = new Properties();
                props.putAll(standardProps);
                props.putAll(secureProps);
                FlinkKafkaConsumerBase<String> source = 
kafkaServer.getConsumer(topic, new SimpleStringSchema(), props);
    
                env.addSource(source).addSink(new DiscardingSink<String>());
    
                JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
                final JobID jobId = jobGraph.getJobID();
    
                final Runnable jobRunner = new Runnable() {
                        @Override
                        public void run() {
                                try {
                                        client.setDetached(false);
                                        client.submitJob(jobGraph, 
KafkaConsumerTestBase.class.getClassLoader());
                                }
                                catch (Throwable t) {
                                        jobError.set(t);
                                }
                        }
                };
    
                Thread runnerThread = new Thread(jobRunner, "program runner 
thread");
                runnerThread.start();
    
                // wait a bit before canceling
                Thread.sleep(2000);
    
                Throwable failueCause = jobError.get();
                if (failueCause != null) {
                        failueCause.printStackTrace();
                        Assert.fail("Test failed prematurely with: " + 
failueCause.getMessage());
                }
    
                // cancel
                client.cancel(jobId);
    
                // wait for the program to be done and validate that we failed 
with the right exception
                runnerThread.join();
    
                assertEquals(JobStatus.CANCELED, 
client.getJobStatus(jobId).get());
    
                if (generator.isAlive()) {
                        generator.shutdown();
                        generator.join();
                }
                else if (!emptyInput) {
                        Throwable t = generator.getError();
                        if (t != null) {
                                t.printStackTrace();
                                fail("Generator failed: " + t.getMessage());
                        } else {
                                fail("Generator failed with no exception");
                        }
                }
    
                deleteTestTopic(topic);
        }
    ```
    



---

Reply via email to