[ https://issues.apache.org/jira/browse/FLINK-17949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17126353#comment-17126353 ]
Yuan Mei commented on FLINK-17949:
----------------------------------

This comment is quoted from [~becket_qin]:

"Theoretically speaking, using KafkaAdminClient to create a topic does not 100% guarantee that a producer will not see the "does not host this topic-partition" error. This is because the AdminClient can only guarantee that the topic metadata exists on the broker to which it sent the CreateTopicRequest. When a producer comes along later, it might send a TopicMetadataRequest to a different broker, and that broker may not have received the updated topic metadata yet. But this is very unlikely to happen, given that the brokers usually receive the metadata update at the same time. Having retries configured on the producer side should be sufficient to handle such cases. We can also do that for the 0.10 and 0.11 producers. But given that we have the producer properties scattered all over the place (which is something we probably should avoid to begin with), it would be simpler to just make sure the topic has been created successfully before we start the tests."

"If we do not want this to happen, we should check the metadata cache of each broker to make sure it has the topic metadata."

I think this is one possible cause of the failure.
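The suggestion of checking every broker's metadata cache before the test starts producing can be sketched as a poll-with-backoff loop. This is a minimal, self-contained sketch under stated assumptions: `TopicReadiness`, `BrokerView`, and `hasTopicWithLeaders` are hypothetical names, not Kafka or Flink APIs, and how a real test would query one specific broker's metadata cache is deliberately left open.

```java
import java.time.Duration;
import java.util.List;

/**
 * Sketch only: wait until every broker reports metadata (with elected leaders)
 * for a topic before a test starts producing to it.
 * TopicReadiness and BrokerView are hypothetical names, not Kafka or Flink APIs.
 */
final class TopicReadiness {

    /** Hypothetical view of a single broker's metadata cache. */
    @FunctionalInterface
    interface BrokerView {
        boolean hasTopicWithLeaders(String topic);
    }

    private TopicReadiness() {}

    /**
     * Polls all brokers in rounds until every one of them knows the topic.
     * Returns true once all brokers are ready, false if the timeout expires
     * or the calling thread is interrupted.
     */
    static boolean awaitTopicOnAllBrokers(
            List<BrokerView> brokers, String topic, Duration timeout, Duration backoff) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            boolean allReady = true;
            for (BrokerView broker : brokers) {
                if (!broker.hasTopicWithLeaders(topic)) {
                    allReady = false; // this broker's cache is still stale; retry the round
                    break;
                }
            }
            if (allReady) {
                return true;
            }
            // Overflow-safe comparison of the current time against the deadline.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(backoff.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status for the caller
                return false;
            }
        }
    }
}
```

The quote's other option, relying on producer-side retries, would instead go through the standard producer properties `retries` and `retry.backoff.ms`; the readiness check above keeps that concern out of the scattered producer configurations by failing fast before the test body runs.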
But I am not sure whether it is the same case encountered here (LEADER_NOT_AVAILABLE):

13:34:40,854 [kafka-producer-network-thread | producer-7] WARN org.apache.kafka.clients.NetworkClient [] - [Producer clientId=producer-7] Error while fetching metadata with correlation id 12 : {test_serde_IngestionTime=LEADER_NOT_AVAILABLE}

> KafkaShuffleITCase.testSerDeIngestionTime:156->testRecordSerDe:388 expected:<310> but was:<0>
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-17949
> URL: https://issues.apache.org/jira/browse/FLINK-17949
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka, Tests
> Affects Versions: 1.11.0, 1.12.0
> Reporter: Robert Metzger
> Priority: Critical
> Labels: test-stability
> Attachments: logs-ci-kafkagelly-1590500380.zip, logs-ci-kafkagelly-1590524911.zip
>
>
> https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=2209&view=logs&j=c5f0071e-1851-543e-9a45-9ac140befc32&t=684b1416-4c17-504e-d5ab-97ee44e08a20
> {code}
> 2020-05-26T13:35:19.4022562Z [ERROR] testSerDeIngestionTime(org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase) Time elapsed: 5.786 s <<< FAILURE!
> 2020-05-26T13:35:19.4023185Z java.lang.AssertionError: expected:<310> but was:<0>
> 2020-05-26T13:35:19.4023498Z at org.junit.Assert.fail(Assert.java:88)
> 2020-05-26T13:35:19.4023825Z at org.junit.Assert.failNotEquals(Assert.java:834)
> 2020-05-26T13:35:19.4024461Z at org.junit.Assert.assertEquals(Assert.java:645)
> 2020-05-26T13:35:19.4024900Z at org.junit.Assert.assertEquals(Assert.java:631)
> 2020-05-26T13:35:19.4028546Z at org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase.testRecordSerDe(KafkaShuffleITCase.java:388)
> 2020-05-26T13:35:19.4029629Z at org.apache.flink.streaming.connectors.kafka.shuffle.KafkaShuffleITCase.testSerDeIngestionTime(KafkaShuffleITCase.java:156)
> 2020-05-26T13:35:19.4030253Z at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 2020-05-26T13:35:19.4030673Z at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 2020-05-26T13:35:19.4031332Z at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 2020-05-26T13:35:19.4031763Z at java.lang.reflect.Method.invoke(Method.java:498)
> 2020-05-26T13:35:19.4032155Z at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
> 2020-05-26T13:35:19.4032630Z at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 2020-05-26T13:35:19.4033188Z at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
> 2020-05-26T13:35:19.4033638Z at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 2020-05-26T13:35:19.4034103Z at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 2020-05-26T13:35:19.4034593Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
> 2020-05-26T13:35:19.4035118Z at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
> 2020-05-26T13:35:19.4035570Z at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 2020-05-26T13:35:19.4035888Z at java.lang.Thread.run(Thread.java:748)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)