Vijay created KAFKA-9250:
----------------------------
Summary: Kafka Streams stuck in rebalancing state after UnknownHostException
Key: KAFKA-9250
URL: https://issues.apache.org/jira/browse/KAFKA-9250
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.2.0
Reporter: Vijay
We are using a Kafka Streams (2.2.0) application to read messages from a source
topic, apply some transformations, and write the results to a destination topic.
The application started fine and processed messages until it encountered an
UnknownHostException, after which it has been stuck in the rebalancing state and
is no longer processing messages.
Below are the properties we have configured:
application.id = *****
bootstrap.servers = "hostname1:port1,hostname2:port2,hostname3:port3"
num.stream.threads=3
replication.factor=3
num.standby.replicas=1
max.block.ms=86400000
acks=all
auto.offset.reset=earliest
processing.guarantee=exactly_once
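For anyone trying to reproduce this, the properties above could be assembled roughly as in the sketch below. It is not the reporter's actual code. The class name StreamsProps and the method build are hypothetical, and the application id and host:port values passed in main are placeholders (the real application.id is masked above). Literal keys are used so the snippet compiles without the Kafka jars. Note that max.block.ms and acks are producer settings and auto.offset.reset is a consumer setting, which Kafka Streams passes on to its internal clients.

```java
import java.util.Properties;

public class StreamsProps {

    // Builds the property set listed in this report.
    // applicationId and bootstrapServers are supplied by the caller because
    // the report masks or anonymizes both values.
    public static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("num.stream.threads", "3");
        props.setProperty("replication.factor", "3");
        props.setProperty("num.standby.replicas", "1");
        // Producer setting: 86400000 ms is 24 hours.
        props.setProperty("max.block.ms", "86400000");
        // Producer setting.
        props.setProperty("acks", "all");
        // Consumer setting.
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("processing.guarantee", "exactly_once");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values, not taken from the real deployment.
        Properties props = build("streams-example", "hostname1:9092,hostname2:9092,hostname3:9092");
        props.stringPropertyNames().stream().sorted()
             .forEach(k -> System.out.println(k + " = " + props.getProperty(k)));
    }
}
```

In a real application the resulting Properties object would typically be passed to the KafkaStreams constructor together with the topology.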
Additional details:
Number of brokers - 3
Source topic partition count - 12 and replication factor of 3
Destination topic partition count - 12 and replication factor of 3
4 instances of the Streams application are deployed in Docker containers.
Below are some of the logs:
[WARN]
[streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1][WARN]
[streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1]
o.a.k.clients.NetworkClient - [Consumer
clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-1-consumer,
groupId=streams-example] Error connecting to node hostname1:port1 (id:
2147438464 rack: null)
java.net.UnknownHostException: hostname1
    at java.net.InetAddress.getAllByName0(InetAddress.java:1280)
    at java.net.InetAddress.getAllByName(InetAddress.java:1192)
    at java.net.InetAddress.getAllByName(InetAddress.java:1126)
    at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:117)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.moveToNextAddress(ClusterConnectionStates.java:387)
    at org.apache.kafka.clients.ClusterConnectionStates.connecting(ClusterConnectionStates.java:121)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:917)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.tryConnect(ConsumerNetworkClient.java:548)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:676)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:656)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:575)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:235)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:317)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:850)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
[ERROR] [kafka-producer-network-thread |
streams-example-e3628ed4-d2f5-44c2-a9cd-31de847589f6-StreamThread-1-0_8-producer][ERROR]
[kafka-producer-network-thread |
streams-example-e3628ed4-d2f5-44c2-a9cd-31de847589f6-StreamThread-1-0_8-producer]
o.a.k.c.p.internals.Sender - [Producer
clientId=streams-example-e3628ed4-d2f5-44c2-a9cd-31de847589f6-StreamThread-1-0_8-producer,
transactionalId=streams-example-0_8] Uncaught error in kafka producer I/O
thread:
java.lang.IllegalStateException: No entry found for connection 2
    at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:339)
    at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:143)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:926)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:287)
    at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:65)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendTransactionalRequest(Sender.java:421)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:286)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:235)
    at java.lang.Thread.run(Thread.java:748)
[WARN]
[streams-example-e3628ed4-d2f5-44c2-a9cd-31de847589f6-StreamThread-1][WARN]
[streams-example-e3628ed4-d2f5-44c2-a9cd-31de847589f6-StreamThread-1]
o.a.kafka.clients.ClientUtils - Couldn't resolve server hostname1:port1 from
bootstrap.servers as DNS resolution failed for hostname1
[INFO] [kafka-coordinator-heartbeat-thread | streams-example][INFO]
[kafka-coordinator-heartbeat-thread | streams-example]
o.a.k.c.c.i.AbstractCoordinator - [Consumer
clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer,
groupId=streams-example] Group coordinator hostname1:port1 (id: 2147438646
rack: null) is unavailable or invalid, will attempt rediscovery
[INFO] [kafka-coordinator-heartbeat-thread | streams-example]
o.a.k.c.c.i.AbstractCoordinator - [Consumer
clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer,
groupId=streams-example] Discovered group coordinator hostname1:port1 (id:
2147438646 rack: null)
[INFO] [kafka-coordinator-heartbeat-thread | streams-example]
o.a.k.c.c.i.AbstractCoordinator - [Consumer
clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer,
groupId=streams-example] Group coordinator hostname1:port1 (id: 2147438646
rack: null) is unavailable or invalid, will attempt rediscovery
[INFO] [kafka-coordinator-heartbeat-thread | streams-example]
o.a.k.c.c.i.AbstractCoordinator - [Consumer
clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-3-consumer,
groupId=streams-example] Discovered group coordinator hostname1:port1 (id:
2147438646 rack: null)
[INFO] [kafka-coordinator-heartbeat-thread | streams-example][INFO]
[kafka-coordinator-heartbeat-thread | streams-example]
o.a.k.c.c.i.AbstractCoordinator - [Consumer
clientId=streams-example-9213da8c-22ad-4116-82bd-47abf80bbf15-StreamThread-2-consumer,
groupId=streams-example] Attempt to heartbeat failed since group is rebalancing
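The UnknownHostException in the first trace is thrown from InetAddress.getAllByName, which ClientUtils.resolve calls. A standalone check along the following lines could help confirm whether the broker hostnames resolve from inside each container, independently of Kafka. This is only a diagnostic sketch. The class name BootstrapDnsCheck and its methods are hypothetical, the default argument is a placeholder, and the host:port parsing is deliberately simple.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

public class BootstrapDnsCheck {

    // Extracts the host part of each entry in a bootstrap.servers-style
    // "host:port,host:port" string. Entries without a port are kept as-is.
    public static List<String> hosts(String bootstrapServers) {
        List<String> result = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int colon = trimmed.lastIndexOf(':');
            result.add(colon > 0 ? trimmed.substring(0, colon) : trimmed);
        }
        return result;
    }

    // Returns true if the JVM can resolve the host to at least one address,
    // using the same InetAddress.getAllByName call seen in the stack trace.
    public static boolean resolves(String host) {
        try {
            return InetAddress.getAllByName(host).length > 0;
        } catch (UnknownHostException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the real bootstrap.servers value as the first argument;
        // the default below is only a placeholder.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        for (String host : hosts(servers)) {
            System.out.println(host + " -> " + (resolves(host) ? "resolved" : "UNKNOWN HOST"));
        }
    }
}
```

Running this periodically in the same container as a stuck instance would show whether the DNS failure is transient or still present while the application remains in the rebalancing state.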