Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5870#discussion_r182668925
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java ---
    @@ -234,43 +232,51 @@ public void testThroughput() throws Exception {
     
                        final int numTaskManagers = parallelism / 
numSlotsPerTaskManager;
     
    -                   final LocalFlinkMiniCluster localFlinkMiniCluster = 
TestBaseUtils.startCluster(
    -                           numTaskManagers,
    -                           numSlotsPerTaskManager,
    -                           false,
    -                           false,
    -                           true);
    +                   final MiniClusterResource cluster = new 
MiniClusterResource(
    +                           new 
MiniClusterResource.MiniClusterResourceConfiguration(
    +                                   new Configuration(),
    +                                   numTaskManagers,
    +                                   numSlotsPerTaskManager
    +                           ),
    +                           true
    +                   );
    +                   cluster.before();
     
                        try {
    -                           System.out.println(Arrays.toString(p));
    +                           System.out.println(String.format("Running test 
with parameters: dataVolumeGB=%s, useForwarder=%s, isSlowSender=%s, 
isSlowReceiver=%s, parallelism=%s, numSlotsPerTM=%s",
    +                                   dataVolumeGb, useForwarder, 
isSlowSender, isSlowReceiver, parallelism, numSlotsPerTaskManager));
                                testProgram(
    -                                   localFlinkMiniCluster,
    +                                   cluster,
                                        dataVolumeGb,
                                        useForwarder,
                                        isSlowSender,
                                        isSlowReceiver,
                                        parallelism);
                        } finally {
    -                           
TestBaseUtils.stopCluster(localFlinkMiniCluster, 
FutureUtils.toFiniteDuration(TestingUtils.TIMEOUT()));
    +                           cluster.after();
                        }
                }
        }
     
        private void testProgram(
    -                   LocalFlinkMiniCluster localFlinkMiniCluster,
    +                   final MiniClusterResource cluster,
                        final int dataVolumeGb,
                        final boolean useForwarder,
                        final boolean isSlowSender,
                        final boolean isSlowReceiver,
                        final int parallelism) throws Exception {
    -           JobExecutionResult jer = localFlinkMiniCluster.submitJobAndWait(
    +           ClusterClient<?> client = cluster.getClusterClient();
    +           client.setDetached(false);
    +           client.setPrintStatusDuringExecution(false);
    +
    +           JobExecutionResult jer = (JobExecutionResult) client.submitJob(
                        createJobGraph(
                                dataVolumeGb,
                                useForwarder,
                                isSlowSender,
                                isSlowReceiver,
                                parallelism),
    -                   false);
    +                   NetworkStackThroughputITCase.class.getClassLoader());
    --- End diff --
    
    Since this method is non-static, why not use `this.getClass().getClassLoader()` instead of `NetworkStackThroughputITCase.class.getClassLoader()`?


---

Reply via email to