[ 
https://issues.apache.org/jira/browse/FLINK-17949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17135738#comment-17135738
 ] 

Yuan Mei commented on FLINK-17949:
----------------------------------

I've tried to set the broker number to 1. This is what I've found:

 
 * If I'm not mistaken, the number of brokers is determined by 
_NUMBER_OF_KAFKA_SERVERS_, so I tried adjusting that constant.
 * If _NUMBER_OF_KAFKA_SERVERS >= 3_, the tests work as normal.
 * If _NUMBER_OF_KAFKA_SERVERS < 3_ (that is, 1 or 2), the tests time out. 
The test I used is KafkaShuffleITCase#testSimpleProcessingTime.
 * Running in debug mode shows the following errors repeated many times 
until the test times out.

 
{code:java}
68630 [Kafka Shuffle Fetcher for Source: Custom Source -> Map -> Map (1/1)] 
DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - 
[Consumer clientId=consumer-2, groupId=flink-tests] Group coordinator lookup 
failed: The coordinator is not available.
68630 [Kafka Shuffle Fetcher for Source: Custom Source -> Map -> Map (1/1)] 
DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - 
[Consumer clientId=consumer-2, groupId=flink-tests] Coordinator discovery 
failed, refreshing metadata
68733 [Kafka Shuffle Fetcher for Source: Custom Source -> Map -> Map (1/1)] 
DEBUG org.apache.kafka.clients.NetworkClient [] - [Consumer 
clientId=consumer-2, groupId=flink-tests] Sending metadata request 
(type=MetadataRequest, topics=test_simple_ProcessingTime, allowAutoCreate=true) 
to node localhost:58359 (id: 0 rack: null)
68734 [Kafka Shuffle Fetcher for Source: Custom Source -> Map -> Map (1/1)] 
DEBUG org.apache.kafka.clients.Metadata [] - Updating last seen epoch from 0 to 
0 for partition test_simple_ProcessingTime-0
68734 [Kafka Shuffle Fetcher for Source: Custom Source -> Map -> Map (1/1)] 
DEBUG org.apache.kafka.clients.Metadata [] - Updated cluster metadata version 
576 to MetadataCache{cluster=Cluster(id = D4iOgpT5RbeWldEduDKMUQ, nodes = 
[localhost:58359 (id: 0 rack: null)], partitions = [Partition(topic = 
test_simple_ProcessingTime, partition = 0, leader = 0, replicas = [0], isr = 
[0], offlineReplicas = [])], controller = localhost:58359 (id: 0 rack: null))}
68734 [Kafka Shuffle Fetcher for Source: Custom Source -> Map -> Map (1/1)] 
DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - 
[Consumer clientId=consumer-2, groupId=flink-tests] Sending FindCoordinator 
request to broker localhost:58359 (id: 0 rack: null)
68735 [Kafka Shuffle Fetcher for Source: Custom Source -> Map -> Map (1/1)] 
DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - 
[Consumer clientId=consumer-2, groupId=flink-tests] Received FindCoordinator 
response ClientResponse(receivedTimeMs=1592216879465, latencyMs=1, 
disconnected=false, requestHeader=RequestHeader(apiKey=FIND_COORDINATOR, 
apiVersion=2, clientId=consumer-2, correlationId=1151), 
responseBody=FindCoordinatorResponse(throttleTimeMs=0, errorMessage='null', 
error=COORDINATOR_NOT_AVAILABLE, node=:-1 (id: -1 rack: null))){code}
 

 

If anyone knows how the group coordinator relates to the number of brokers, 
I would really appreciate a pointer; it would save me a lot of time.
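
One possible explanation, not verified against this test setup: the group coordinator depends on the internal __consumer_offsets topic, which the broker creates with offsets.topic.replication.factor replicas. The Kafka default for that setting is 3, and since Kafka 0.11 the broker refuses to create the topic until that many brokers are alive, so FindCoordinator keeps returning COORDINATOR_NOT_AVAILABLE. That would match the ">= 3 works, < 3 times out" behaviour above. The sketch below shows broker overrides that might make a smaller cluster usable. The class and method names are hypothetical; only the three property keys are real Kafka broker settings, and whether the Flink test environment already overrides them is an assumption that still needs checking.

```java
import java.util.Properties;

/** Hypothetical helper: broker overrides for a test cluster with fewer than 3 brokers. */
public class SmallClusterBrokerConfig {

    /** Kafka's documented default for offsets.topic.replication.factor. */
    static final int DEFAULT_INTERNAL_TOPIC_REPLICATION = 3;

    static Properties forBrokerCount(int numBrokers) {
        if (numBrokers < 1) {
            throw new IllegalArgumentException("need at least one broker");
        }
        // Internal topics cannot have more replicas than there are brokers.
        int replication = Math.min(numBrokers, DEFAULT_INTERNAL_TOPIC_REPLICATION);

        Properties props = new Properties();
        // Replication factor of __consumer_offsets (used by the group coordinator).
        props.setProperty("offsets.topic.replication.factor", String.valueOf(replication));
        // Same idea for the transaction state log (used by transactional producers).
        props.setProperty("transaction.state.log.replication.factor", String.valueOf(replication));
        // min.isr defaults to 2 and must not exceed the replication factor.
        props.setProperty("transaction.state.log.min.isr", String.valueOf(Math.min(replication, 2)));
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides for a single-broker cluster.
        forBrokerCount(1).forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would have to be merged into the broker configuration before the test brokers start; changing them afterwards does not affect an internal topic that already exists.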

 

Also, would setting the number of brokers to 1 cause other problems (such as 
losing the high-availability guarantee in this case)?

> KafkaShuffleITCase.testSerDeIngestionTime:156->testRecordSerDe:388 
> expected:<310> but was:<0>
> ---------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17949
>                 URL: https://issues.apache.org/jira/browse/FLINK-17949
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / Kafka, Tests
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: Robert Metzger
>            Priority: Critical
>              Labels: test-stability
>         Attachments: logs-ci-kafkagelly-1590500380.zip, 
> logs-ci-kafkagelly-1590524911.zip
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2209&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=684b1416-4c17-504e-d5ab-97ee44e08a20
> {code}
> 2020-05-26T13:35:19.4022562Z [ERROR] 
> testSerDeIngestionTime(org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase)
>   Time elapsed: 5.786 s  <<< FAILURE!
> 2020-05-26T13:35:19.4023185Z java.lang.AssertionError: expected:<310> but 
> was:<0>
> 2020-05-26T13:35:19.4023498Z  at org.junit.Assert.fail(Assert.java:88)
> 2020-05-26T13:35:19.4023825Z  at 
> org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-05-26T13:35:19.4024461Z  at 
> org.junit.Assert.assertEquals(Assert.java:645)
> 2020-05-26T13:35:19.4024900Z  at 
> org.junit.Assert.assertEquals(Assert.java:631)
> 2020-05-26T13:35:19.4028546Z  at 
> org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase.testRecordSerDe(KafkaShuffleITCase.java:388)
> 2020-05-26T13:35:19.4029629Z  at 
> org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase.testSerDeIngestionTime(KafkaShuffleITCase.java:156)
> 2020-05-26T13:35:19.4030253Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-05-26T13:35:19.4030673Z  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-05-26T13:35:19.4031332Z  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-05-26T13:35:19.4031763Z  at 
> java.lang.reflect.Method.invoke(Method.java:498)
> 2020-05-26T13:35:19.4032155Z  at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-05-26T13:35:19.4032630Z  at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-05-26T13:35:19.4033188Z  at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-05-26T13:35:19.4033638Z  at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-05-26T13:35:19.4034103Z  at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-05-26T13:35:19.4034593Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2020-05-26T13:35:19.4035118Z  at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2020-05-26T13:35:19.4035570Z  at 
> java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-05-26T13:35:19.4035888Z  at java.lang.Thread.run(Thread.java:748)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
