[ 
https://issues.apache.org/jira/browse/FLINK-10603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16666370#comment-16666370
 ] 

vinoyang commented on FLINK-10603:
----------------------------------

FlinkKafkaProducer011ITCase#testScaleUpAfterScalingDown, In my local test, this 
method always takes about 55s to 1min. I added some time to measure in the 
method (see comment): 
{code:java}
public void testScaleUpAfterScalingDown() throws Exception {
   long start = System.currentTimeMillis();
   String topic = "scale-down-before-first-checkpoint";

   final int parallelism1 = 4;
   final int parallelism2 = 2;
   final int parallelism3 = 3;
   final int maxParallelism = Math.max(parallelism1, Math.max(parallelism2, 
parallelism3));

   long step1 = System.currentTimeMillis();
   System.out.println("step1 : " + (step1 - start));    //0

   List<OperatorStateHandle> operatorSubtaskState = repartitionAndExecute(
      topic,
      Collections.emptyList(),
      parallelism1,
      maxParallelism,
      IntStream.range(0, parallelism1).boxed().iterator());

   long step2 = System.currentTimeMillis();
   System.out.println("step2 : " + (step2 - step1));    //32490

   operatorSubtaskState = repartitionAndExecute(
      topic,
      operatorSubtaskState,
      parallelism2,
      maxParallelism,
      IntStream.range(parallelism1,  parallelism1 + 
parallelism2).boxed().iterator());

   long step3 = System.currentTimeMillis();
   System.out.println("step3 : " + (step3 - step2));    //8939

   operatorSubtaskState = repartitionAndExecute(
      topic,
      operatorSubtaskState,
      parallelism3,
      maxParallelism,
      IntStream.range(parallelism1 + parallelism2,  parallelism1 + parallelism2 
+ parallelism3).boxed().iterator());

   long step4 = System.currentTimeMillis();
   System.out.println("step4 : " + (step4 - step3));    //6495

   // After each previous repartitionAndExecute call, we are left with some 
lingering transactions, that would
   // not allow us to read all committed messages from the topic. Thus we 
initialize operators from
   // OperatorSubtaskState once more, but without any new data. This should 
terminate all ongoing transactions.

   operatorSubtaskState = repartitionAndExecute(
      topic,
      operatorSubtaskState,
      1,
      maxParallelism,
      Collections.emptyIterator());

   long step5 = System.currentTimeMillis();
   System.out.println("step5 : " + (step5 - step4));    //5394

   assertExactlyOnceForTopic(
      createProperties(),
      topic,
      0,
      IntStream.range(0, parallelism1 + parallelism2 + 
parallelism3).boxed().collect(Collectors.toList()),
      10_000L);

   long step6 = System.currentTimeMillis();
   System.out.println("step6 : " + (step6 - step5));    //2039

   deleteTestTopic(topic);

   long end = System.currentTimeMillis();
   System.out.println("step7 : " + (end - step6));    //27
   System.out.println("total : " + (end - start));    //55384
}

FlinkKafkaProducer011ITCase's total test time is about 5 minutes.
{code}
 

cc [~pnowojski] [~Zentol] [~aljoscha]

> Reduce kafka test duration
> --------------------------
>
>                 Key: FLINK-10603
>                 URL: https://issues.apache.org/jira/browse/FLINK-10603
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kafka Connector, Tests
>    Affects Versions: 1.7.0
>            Reporter: Chesnay Schepler
>            Assignee: vinoyang
>            Priority: Major
>              Labels: pull-request-available
>
> The tests for the modern kafka connector take more than 10 minutes which is 
> simply unacceptable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to