[ 
https://issues.apache.org/jira/browse/KAFKA-15020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17730271#comment-17730271
 ] 

Josep Prat commented on KAFKA-15020:
------------------------------------

Seen this test failing with a different error: 
https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-13824/1/testReport/junit/integration.kafka.server/FetchFromFollowerIntegrationTest/Build___JDK_8_and_Scala_2_12___testRackAwareRangeAssignor__/

 
h3. Error Message
{code:java}
java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: 
Timed out while awaiting expected assignment 
Set(topicWithAllPartitionsOnAllRacks-1, topicWithSingleRackPartitions-1). The 
current assignment is []{code}
h3. Stacktrace
{code:java}
java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: 
Timed out while awaiting expected assignment 
Set(topicWithAllPartitionsOnAllRacks-1, topicWithSingleRackPartitions-1). The 
current assignment is [] at 
java.util.concurrent.FutureTask.report(FutureTask.java:122) at 
java.util.concurrent.FutureTask.get(FutureTask.java:206) at 
integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$9(FetchFromFollowerIntegrationTest.scala:211)
 at 
integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$9$adapted(FetchFromFollowerIntegrationTest.scala:211)
 at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at 
integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:211)
 at 
integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:251)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:727)
 at 
org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
 at 
org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
 at 
org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
 at 
org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$7(TestMethodTestDescriptor.java:217)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
 at 
org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:151)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
 at java.util.ArrayList.forEach(ArrayList.java:1259) at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
 at java.util.ArrayList.forEach(ArrayList.java:1259) at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
 at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137) 
at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
 at 
org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
 at 
org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
 at 
org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
 at 
org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
 at 
org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
 at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
 at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
 at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
 at 
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
 at 
org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:110)
 at 
org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:90)
 at 
org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:85)
 at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:62)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
 at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
 at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
 at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
 at com.sun.proxy.$Proxy2.stop(Unknown Source) at 
org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:193)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:129)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:100)
 at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:60)
 at 
org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
 at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:113)
 at 
org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:65)
 at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
 at 
worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
 Caused by: org.opentest4j.AssertionFailedError: Timed out while awaiting 
expected assignment Set(topicWithAllPartitionsOnAllRacks-1, 
topicWithSingleRackPartitions-1). The current assignment is [] at 
org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38) at 
org.junit.jupiter.api.Assertions.fail(Assertions.java:135) at 
integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$6(FetchFromFollowerIntegrationTest.scala:207)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at 
java.util.concurrent.FutureTask.run(FutureTask.java:266) at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
at java.lang.Thread.run(Thread.java:750) {code}
h3. Standard Output
{code:java}
[2023-06-07 14:38:03,696] WARN maxCnxns is not configured, using default value 
0. (org.apache.zookeeper.server.ServerCnxnFactory:309) [2023-06-07 
14:38:04,604] WARN No meta.properties file under dir 
/tmp/kafka-530797675346657980/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:38:05,839] WARN No 
meta.properties file under dir /tmp/kafka-1673553658854133121/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:38:10,275] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-2. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:38:10,279] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] 
Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-4. 
This error may be returned transiently when the partition is being created or 
deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:38:10,280] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-0. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:38:11,297] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] 
Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-0. 
This error may be returned transiently when the partition is being created or 
deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:38:11,298] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-4. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:38:11,298] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] 
Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-2. 
This error may be returned transiently when the partition is being created or 
deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:38:12,307] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-0. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:38:12,308] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] 
Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-4. 
This error may be returned transiently when the partition is being created or 
deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:38:12,309] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-2. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:38:13,520] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error 
in response for fetch request (type=FetchRequest, replicaId=0, maxWait=500, 
minBytes=1, maxBytes=10485760, 
fetchData={__consumer_offsets-0=PartitionData(topicId=6kR7xL_iRu2oKZ-Q6JDwOQ, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), 
__consumer_offsets-4=PartitionData(topicId=6kR7xL_iRu2oKZ-Q6JDwOQ, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), 
__consumer_offsets-2=PartitionData(topicId=6kR7xL_iRu2oKZ-Q6JDwOQ, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 1 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:14,527] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Connection to node 1 (localhost/127.0.0.1:35917) could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient:814) [2023-06-07 14:38:14,528] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for 
fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, 
maxBytes=10485760, 
fetchData={__consumer_offsets-0=PartitionData(topicId=6kR7xL_iRu2oKZ-Q6JDwOQ, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), 
__consumer_offsets-4=PartitionData(topicId=6kR7xL_iRu2oKZ-Q6JDwOQ, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), 
__consumer_offsets-2=PartitionData(topicId=6kR7xL_iRu2oKZ-Q6JDwOQ, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 
localhost:35917 (id: 1 rack: null) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:15,812] WARN [RequestSendThread controllerId=0] Controller 0 
epoch 1 fails to send request (type=LeaderAndIsRequest, controllerId=0, 
controllerEpoch=1, brokerEpoch=26, 
partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', 
partitionIndex=2, controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], 
partitionEpoch=1, replicas=[1, 0], addingReplicas=[], removingReplicas=[], 
isNew=false, leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='test-fetch-from-follower', 
partitionIndex=0, controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], 
partitionEpoch=1, replicas=[0, 1], addingReplicas=[], removingReplicas=[], 
isNew=false, leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=4, 
controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], partitionEpoch=1, 
replicas=[1, 0], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=1, 
controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], partitionEpoch=1, 
replicas=[0, 1], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=3, 
controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], partitionEpoch=1, 
replicas=[0, 1], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=0, 
controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], partitionEpoch=1, 
replicas=[1, 0], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0)], topicIds={__consumer_offsets=6kR7xL_iRu2oKZ-Q6JDwOQ, 
test-fetch-from-follower=BsmTr1n0TWSc8G-fH5EVQg}, liveLeaders=(localhost:39503 
(id: 0 rack: 0))) to broker localhost:39503 (id: 0 rack: 0). Reconnecting to 
broker. (kafka.controller.RequestSendThread:72) java.io.IOException: Connection 
to 0 was disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:256) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:15,915] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:15,918] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:16,020] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:16,021] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:16,123] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:16,123] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:16,226] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:16,226] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:16,330] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:16,331] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:16,433] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:16,434] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:16,536] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:16,537] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:16,639] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:16,639] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:16,741] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:16,742] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:16,844] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:16,845] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:16,947] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:16,948] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:17,050] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:17,050] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:17,153] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:17,153] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:17,255] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:17,256] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:17,358] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:17,358] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:17,461] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:17,462] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:17,567] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:39503) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:38:17,568] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:39503 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:39503 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:17,571] WARN [RequestSendThread controllerId=0] Controller 0 
epoch 1 fails to send request (type=LeaderAndIsRequest, controllerId=0, 
controllerEpoch=1, brokerEpoch=26, 
partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', 
partitionIndex=2, controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], 
partitionEpoch=1, replicas=[1, 0], addingReplicas=[], removingReplicas=[], 
isNew=false, leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='test-fetch-from-follower', 
partitionIndex=0, controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], 
partitionEpoch=1, replicas=[0, 1], addingReplicas=[], removingReplicas=[], 
isNew=false, leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=4, 
controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], partitionEpoch=1, 
replicas=[1, 0], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=1, 
controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], partitionEpoch=1, 
replicas=[0, 1], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=3, 
controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], partitionEpoch=1, 
replicas=[0, 1], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=0, 
controllerEpoch=1, leader=0, leaderEpoch=1, isr=[0], partitionEpoch=1, 
replicas=[1, 0], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0)], topicIds={__consumer_offsets=6kR7xL_iRu2oKZ-Q6JDwOQ, 
test-fetch-from-follower=BsmTr1n0TWSc8G-fH5EVQg}, liveLeaders=(localhost:39503 
(id: 0 rack: 0))) to broker localhost:39503 (id: 0 rack: 0). Reconnecting to 
broker. (kafka.controller.RequestSendThread:72) java.lang.InterruptedException 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277) at 
org.apache.kafka.server.util.ShutdownableThread.pause(ShutdownableThread.java:113)
 at 
kafka.controller.RequestSendThread.backoff$1(ControllerChannelManager.scala:237)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:251) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:17,898] WARN maxCnxns is not configured, using default value 
0. (org.apache.zookeeper.server.ServerCnxnFactory:309) [2023-06-07 
14:38:17,971] WARN No meta.properties file under dir 
/tmp/kafka-5157800393705216363/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:38:18,239] WARN No 
meta.properties file under dir /tmp/kafka-7997567040634983423/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:38:18,887] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-2. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:38:18,888] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] 
Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-4. 
This error may be returned transiently when the partition is being created or 
deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:38:18,889] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-0. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:38:20,320] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error 
in response for fetch request (type=FetchRequest, replicaId=1, maxWait=500, 
minBytes=1, maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, 
removed=, replaced=, metadata=(sessionId=855297361, epoch=3), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 0 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:20,320] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, 
fetchData={__consumer_offsets-0=PartitionData(topicId=OGtNO5GYTo6CrKC13lP2LA, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), 
__consumer_offsets-4=PartitionData(topicId=OGtNO5GYTo6CrKC13lP2LA, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), 
__consumer_offsets-2=PartitionData(topicId=OGtNO5GYTo6CrKC13lP2LA, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=INVALID, epoch=INITIAL), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 1 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:27,718] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, 
fetchData={__consumer_offsets-1=PartitionData(topicId=44Xg_n3QTjqwDHTaz5D6Lg, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=602568718, epoch=3), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 1 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:27,722] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=1982843598, epoch=3), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 0 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:29,995] ERROR [QuorumController id=1000] writeNoOpRecord: 
unable to start processing because of RejectedExecutionException. Reason: The 
event queue is shutting down (org.apache.kafka.controller.QuorumController:467) 
[2023-06-07 14:38:29,996] ERROR [QuorumController id=1000] maybeFenceReplicas: 
unable to start processing because of RejectedExecutionException. Reason: The 
event queue is shutting down (org.apache.kafka.controller.QuorumController:467) 
[2023-06-07 14:38:29,996] ERROR [QuorumController id=1000] Cancelling deferred 
write event maybeFenceReplicas because the event queue is now closed. 
(org.apache.kafka.controller.QuorumController:1352) [2023-06-07 14:38:30,024] 
WARN maxCnxns is not configured, using default value 0. 
(org.apache.zookeeper.server.ServerCnxnFactory:309) [2023-06-07 14:38:30,095] 
WARN No meta.properties file under dir 
/tmp/kafka-6398712900983851383/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:38:30,372] WARN No 
meta.properties file under dir /tmp/kafka-8068580669768440457/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:38:31,205] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-2. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:38:31,205] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] 
Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-4. 
This error may be returned transiently when the partition is being created or 
deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:38:31,205] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-0. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:38:50,907] WARN [Consumer instanceId=instance-0, 
clientId=consumer-group-instance-0, groupId=group] Close timed out with 1 
pending requests to coordinator, terminating client connections 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1110) 
[2023-06-07 14:38:50,916] WARN [Consumer instanceId=instance-1, 
clientId=consumer-group-instance-1, groupId=group] Close timed out with 1 
pending requests to coordinator, terminating client connections 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1110) 
[2023-06-07 14:38:50,926] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=1937028528, epoch=39), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 0 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:50,926] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, 
fetchData={__consumer_offsets-2=PartitionData(topicId=YPVBwUzvS3OokWAXxd9INg, 
fetchOffset=14, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=1358953892, epoch=45), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 1 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:54,192] WARN maxCnxns is not configured, using default value 
0. (org.apache.zookeeper.server.ServerCnxnFactory:309) [2023-06-07 
14:38:54,282] WARN No meta.properties file under dir 
/tmp/kafka-462760663668296480/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:38:54,582] WARN No 
meta.properties file under dir /tmp/kafka-1088631316499234254/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 14:38:55,369] WARN 
[ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-2. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:38:55,371] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] 
Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-4. 
This error may be returned transiently when the partition is being created or 
deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:38:55,371] WARN 
[ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-0. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
14:38:55,942] WARN [ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] 
Received UNKNOWN_TOPIC_ID from the leader for partition 
test-fetch-from-follower-0. This error may be returned transiently when the 
partition is being created or deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 14:38:57,544] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for 
fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, 
maxBytes=10485760, fetchData={}, isolationLevel=READ_UNCOMMITTED, removed=, 
replaced=, metadata=(sessionId=272377002, epoch=3), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 1 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:58,547] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Connection to node 1 (localhost/127.0.0.1:34497) could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient:814) [2023-06-07 14:38:58,547] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for 
fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, 
maxBytes=10485760, 
fetchData={__consumer_offsets-3=PartitionData(topicId=JIgFQQAOSN-2W9ELl65S8w, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), 
__consumer_offsets-1=PartitionData(topicId=JIgFQQAOSN-2W9ELl65S8w, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=272377002, epoch=INITIAL), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 
localhost:34497 (id: 1 rack: null) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:38:59,549] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Connection to node 1 (localhost/127.0.0.1:34497) could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient:814) [2023-06-07 14:38:59,550] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for 
fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, 
maxBytes=10485760, 
fetchData={__consumer_offsets-3=PartitionData(topicId=JIgFQQAOSN-2W9ELl65S8w, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), 
__consumer_offsets-1=PartitionData(topicId=JIgFQQAOSN-2W9ELl65S8w, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=272377002, epoch=INITIAL), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 
localhost:34497 (id: 1 rack: null) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:39:00,552] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Connection to node 1 (localhost/127.0.0.1:34497) could not be 
established. Broker may not be available. 
(org.apache.kafka.clients.NetworkClient:814) [2023-06-07 14:39:00,553] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for 
fetch request (type=FetchRequest, replicaId=0, maxWait=500, minBytes=1, 
maxBytes=10485760, 
fetchData={__consumer_offsets-3=PartitionData(topicId=JIgFQQAOSN-2W9ELl65S8w, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty), 
__consumer_offsets-1=PartitionData(topicId=JIgFQQAOSN-2W9ELl65S8w, 
fetchOffset=0, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional.empty)}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=272377002, epoch=INITIAL), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 
localhost:34497 (id: 1 rack: null) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:108) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:39:00,709] WARN [Producer clientId=producer-6] Connection to 
node -2 (localhost/127.0.0.1:34497) could not be established. Broker may not be 
available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:39:00,710] WARN [Producer clientId=producer-6] Bootstrap broker 
localhost:34497 (id: -2 rack: null) disconnected 
(org.apache.kafka.clients.NetworkClient:1105) [2023-06-07 14:39:04,068] WARN 
[ReplicaFetcher replicaId=1, leaderId=0, fetcherId=0] Error in response for 
fetch request (type=FetchRequest, replicaId=1, maxWait=500, minBytes=1, 
maxBytes=10485760, 
fetchData={__consumer_offsets-2=PartitionData(topicId=JIgFQQAOSN-2W9ELl65S8w, 
fetchOffset=7, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[1], lastFetchedEpoch=Optional[1])}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=1159526741, epoch=6), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 0 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:39:05,605] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:42967) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:39:05,605] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:42967 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:42967 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:39:05,708] WARN [Controller id=0, targetBrokerId=0] Connection 
to node 0 (localhost/127.0.0.1:42967) could not be established. Broker may not 
be available. (org.apache.kafka.clients.NetworkClient:814) [2023-06-07 
14:39:05,709] WARN [RequestSendThread controllerId=0] Controller 0's connection 
to broker localhost:42967 (id: 0 rack: 0) was unsuccessful 
(kafka.controller.RequestSendThread:72) java.io.IOException: Connection to 
localhost:42967 (id: 0 rack: 0) failed. at 
org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:70)
 at 
kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:296)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:249) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 14:39:05,781] WARN [RequestSendThread controllerId=0] Controller 0 
epoch 1 fails to send request (type=LeaderAndIsRequest, controllerId=0, 
controllerEpoch=1, brokerEpoch=26, 
partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', 
partitionIndex=2, controllerEpoch=1, leader=0, leaderEpoch=2, isr=[0], 
partitionEpoch=3, replicas=[0, 1], addingReplicas=[], removingReplicas=[], 
isNew=false, leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='test-fetch-from-follower', 
partitionIndex=0, controllerEpoch=1, leader=0, leaderEpoch=2, isr=[0], 
partitionEpoch=3, replicas=[0, 1], addingReplicas=[], removingReplicas=[], 
isNew=false, leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=4, 
controllerEpoch=1, leader=0, leaderEpoch=2, isr=[0], partitionEpoch=3, 
replicas=[0, 1], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=1, 
controllerEpoch=1, leader=0, leaderEpoch=2, isr=[0], partitionEpoch=3, 
replicas=[1, 0], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=3, 
controllerEpoch=1, leader=0, leaderEpoch=2, isr=[0], partitionEpoch=3, 
replicas=[1, 0], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0), 
LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=0, 
controllerEpoch=1, leader=0, leaderEpoch=2, isr=[0], partitionEpoch=3, 
replicas=[0, 1], addingReplicas=[], removingReplicas=[], isNew=false, 
leaderRecoveryState=0)], topicIds={__consumer_offsets=JIgFQQAOSN-2W9ELl65S8w, 
test-fetch-from-follower=5ejb9DVBTpelQ6JJ7tpWQA}, liveLeaders=(localhost:42967 
(id: 0 rack: 0))) to broker localhost:42967 (id: 0 rack: 0). Reconnecting to 
broker. (kafka.controller.RequestSendThread:72) java.lang.InterruptedException 
at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1039)
 at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1328)
 at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:277) at 
org.apache.kafka.server.util.ShutdownableThread.pause(ShutdownableThread.java:113)
 at 
kafka.controller.RequestSendThread.backoff$1(ControllerChannelManager.scala:237)
 at 
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:251) 
at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 16:34:43,707] WARN maxCnxns is not configured, using default value 
0. (org.apache.zookeeper.server.ServerCnxnFactory:309) [2023-06-07 
16:34:44,724] WARN No meta.properties file under dir 
/tmp/kafka-352405376944625096/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 16:34:46,171] WARN No 
meta.properties file under dir /tmp/kafka-5277750745167705872/meta.properties 
(kafka.server.BrokerMetadataCheckpoint:70) [2023-06-07 16:34:47,051] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-2. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
16:34:47,054] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] 
Received UNKNOWN_TOPIC_ID from the leader for partition __consumer_offsets-4. 
This error may be returned transiently when the partition is being created or 
deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 16:34:47,055] WARN 
[ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Received UNKNOWN_TOPIC_ID 
from the leader for partition __consumer_offsets-0. This error may be returned 
transiently when the partition is being created or deleted, but it is not 
expected to persist. (kafka.server.ReplicaFetcherThread:70) [2023-06-07 
16:34:47,351] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] 
Received UNKNOWN_TOPIC_ID from the leader for partition 
topicWithAllPartitionsOnAllRacks-1. This error may be returned transiently when 
the partition is being created or deleted, but it is not expected to persist. 
(kafka.server.ReplicaFetcherThread:70) [2023-06-07 16:34:52,679] WARN [Producer 
clientId=producer-1] Got error produce response with correlation id 15 on 
topic-partition topicWithSingleRackPartitions-1, retrying (2147483646 attempts 
left). Error: NOT_LEADER_OR_FOLLOWER 
(org.apache.kafka.clients.producer.internals.Sender:650) [2023-06-07 
16:34:52,680] WARN [Producer clientId=producer-1] Received invalid metadata 
error in produce request on partition topicWithSingleRackPartitions-1 due to 
org.apache.kafka.common.errors.NotLeaderOrFollowerException: For requests 
intended only for the leader, this error indicates that the broker is not the 
current leader. For requests intended for any replica, this error indicates 
that the broker is not a replica of the topic partition.. Going to request 
metadata update now (org.apache.kafka.clients.producer.internals.Sender:676) 
[2023-06-07 16:34:52,681] WARN [Producer clientId=producer-1] Got error produce 
response with correlation id 16 on topic-partition 
topicWithSingleRackPartitions-0, retrying (2147483646 attempts left). Error: 
NOT_LEADER_OR_FOLLOWER (org.apache.kafka.clients.producer.internals.Sender:650) 
[2023-06-07 16:34:52,681] WARN [Producer clientId=producer-1] Received invalid 
metadata error in produce request on partition topicWithSingleRackPartitions-0 
due to org.apache.kafka.common.errors.NotLeaderOrFollowerException: For 
requests intended only for the leader, this error indicates that the broker is 
not the current leader. For requests intended for any replica, this error 
indicates that the broker is not a replica of the topic partition.. Going to 
request metadata update now 
(org.apache.kafka.clients.producer.internals.Sender:676) [2023-06-07 
16:34:53,311] WARN [Consumer instanceId=instance-0, 
clientId=consumer-group-instance-0, groupId=group] Close timed out with 1 
pending requests to coordinator, terminating client connections 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1110) 
[2023-06-07 16:34:53,322] WARN [Consumer instanceId=instance-1, 
clientId=consumer-group-instance-1, groupId=group] Close timed out with 1 
pending requests to coordinator, terminating client connections 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1110) 
[2023-06-07 16:34:53,387] WARN [ReplicaFetcher replicaId=0, leaderId=1, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, 
fetchData={__consumer_offsets-2=PartitionData(topicId=qTggEuKKTyiBNhNNXKivNA, 
fetchOffset=16, logStartOffset=0, maxBytes=1048576, 
currentLeaderEpoch=Optional[0], lastFetchedEpoch=Optional[0])}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=355194747, epoch=20), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 1 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 [2023-06-07 16:34:53,387] WARN [ReplicaFetcher replicaId=1, leaderId=0, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=1, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={}, 
isolationLevel=READ_UNCOMMITTED, removed=, replaced=, 
metadata=(sessionId=396797168, epoch=15), rackId=) 
(kafka.server.ReplicaFetcherThread:72) java.io.IOException: Connection to 0 was 
disconnected before the response was read at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
 at 
kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113) 
at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79) at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
 at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
 at scala.Option.foreach(Option.scala:407) at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129) 
at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112) 
at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98) at 
org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)
 {code}

> integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor
>  test is flaky
> --------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15020
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15020
>             Project: Kafka
>          Issue Type: Test
>            Reporter: Atul Sharma
>            Priority: Major
>
> Sometimes the test fails with the following log:
> {code:java}
> Gradle Test Run :core:integrationTest > Gradle Test Executor 175 > 
> FetchFromFollowerIntegrationTest > testRackAwareRangeAssignor() FAILED
> org.opentest4j.AssertionFailedError: Consumed 0 records before timeout 
> instead of the expected 2 records
> at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:38)
> at org.junit.jupiter.api.Assertions.fail(Assertions.java:135)
> at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1087)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11(FetchFromFollowerIntegrationTest.scala:216)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.$anonfun$testRackAwareRangeAssignor$11$adapted(FetchFromFollowerIntegrationTest.scala:215)
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
> at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
> at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.verifyAssignments$1(FetchFromFollowerIntegrationTest.scala:215)
>  at 
> integration.kafka.server.FetchFromFollowerIntegrationTest.testRackAwareRangeAssignor(FetchFromFollowerIntegrationTest.scala:244)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to