Hi,
Our cluster setup is as follows, with version 0.9.0.1 (Confluent package):
we have 5 source clusters and 1 destination cluster. 5 instances of
MirrorMaker run on the destination cluster
(one instance per source cluster, and each MirrorMaker instance runs with
num.streams=8).
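
Roughly, each instance is launched along these lines (the config file names
and the whitelist are placeholders, not our exact setup):

bin/kafka-mirror-maker.sh \
  --consumer.config source-clusterN-consumer.properties \
  --producer.config destination-producer.properties \
  --num.streams 8 \
  --whitelist '.*'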

We have been seeing an issue for the last few days where the ISRs for a set
of partitions shrink and expand on one node,
which causes MirrorMaker to shut down (the known "Memory records is not
writable" issue or the NOT_ENOUGH_REPLICAS_AFTER_APPEND issue).
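
For what it's worth, NOT_ENOUGH_REPLICAS_AFTER_APPEND is only returned for
acks=all produce requests, and MirrorMaker aborts on send failures by
default, which is presumably why the whole instance exits. A rough sketch of
the relevant settings (illustrative, not a dump of our config):

# Assumption: the MirrorMaker producer config uses acks=all, which is what
# makes NOT_ENOUGH_REPLICAS_AFTER_APPEND possible when the ISR shrinks
# below min.insync.replicas.
grep '^acks' destination-producer.properties
# acks=all
# MirrorMaker's --abort.on.send.failure option defaults to true, so a send
# that ultimately fails shuts the instance down.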

Some of the relevant log excerpts are as follows:

*Node A is the controller*
*I. The controller log has:*
DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(2, 1, 0) for
TopicAndPartitions:Set([test,115], [test,16], [test,52])
(kafka.controller.IsrChangeNotificationListener)
WARN [Controller-0-to-broker-2-send-thread], Controller 0 epoch 84 fails to
send request {controller_id=0,controller_epoch=84,partition_states=[...]
java.io.IOException: Connection to 2 was disconnected before the response
was read
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
        at scala.Option.foreach(Option.scala:257)
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
        at
kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClientBlockingOps.scala:129)
        at
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.scala:139)
        at
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
        at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:180)
        at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
WARN [Controller-0-to-broker-1-send-thread], Controller 0 epoch 84 fails to
send request[...]
java.io.IOException: Connection to 1 was disconnected before the response
was read
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
        at scala.Option.foreach(Option.scala:257)
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
        at
kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClientBlockingOps.scala:129)
        at
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.scala:139)
        at
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
        at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:180)
        at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
WARN [Controller-0-to-broker-0-send-thread], Controller 0 epoch 84 fails to
send request[...]
java.io.IOException: Connection to 0 was disconnected before the response
was read
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
        at scala.Option.foreach(Option.scala:257)
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
        at
kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
        at
kafka.utils.NetworkClientBlockingOps$.recurse$1(NetworkClientBlockingOps.scala:129)
        at
kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntilFound$extension(NetworkClientBlockingOps.scala:139)
        at
kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
        at
kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:180)
        at
kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:171)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
INFO [Controller-0-to-broker-2-send-thread], Controller 0 connected to
Node(2, x.x.x.x, 9092) for sending state change requests
(kafka.controller.RequestSendThread)
INFO [Controller-0-to-broker-1-send-thread], Controller 0 connected to
Node(1, x.x.x.x, 9092) for sending state change requests
(kafka.controller.RequestSendThread)
INFO [Controller-0-to-broker-0-send-thread], Controller 0 connected to
Node(0, x.x.x.x, 9092) for sending state change requests
(kafka.controller.RequestSendThread)
DEBUG [IsrChangeNotificationListener] Fired!!!
(kafka.controller.IsrChangeNotificationListener)
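
(For context on the IsrChangeNotificationListener lines: the listener fires
on children of the /isr_change_notification znode, which brokers create
whenever they change an ISR. The backlog can be checked with something like
the following; the ZooKeeper address is a placeholder.)

# List pending ISR-change notification znodes.
bin/zookeeper-shell.sh zk-host:2181 ls /isr_change_notification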

*II. At the same time, the state-change log has the following (it's pretty
much always for the same set of topics and partitions):*
TRACE Controller 0 epoch 84 sending UpdateMetadata request
(Leader:-2,ISR:0,1,2,LeaderEpoch:0,ControllerEpoch:84) to broker 2 for
partition  ...
TRACE Controller 0 epoch 84 sending UpdateMetadata request
(Leader:-2,ISR:0,1,2,LeaderEpoch:0,ControllerEpoch:84) to broker 2 for
partition ...
TRACE Controller 0 epoch 84 sending UpdateMetadata request
(Leader:-2,ISR:2,0,1,LeaderEpoch:0,ControllerEpoch:84) to broker 2 for
partition ...
TRACE Controller 0 epoch 84 sending UpdateMetadata request
(Leader:1,ISR:1,LeaderEpoch:44,ControllerEpoch:84) to broker 2 for
partition ...
TRACE Controller 0 epoch 84 sending UpdateMetadata request
(Leader:1,ISR:1,LeaderEpoch:44,ControllerEpoch:84) to broker 2 for
partition ...

*III. On Node B, the server log contains a number of
NotEnoughReplicasException errors and ISR shrinks and expands for topic
partitions:*

INFO Partition [test,16] on broker 1: Shrinking ISR for partition [test,16]
from 1,2,0 to 1 (kafka.cluster.Partition)
INFO Partition [test,52] on broker 1: Shrinking ISR for partition [test,52]
from 1,2,0 to 1 (kafka.cluster.Partition)
....
...
...
ERROR [Replica Manager on Broker 1]: Error processing append operation on
partition [test,67] (kafka.server.ReplicaManager)
kafka.common.NotEnoughReplicasException: Number of insync replicas for
partition [test,67] is [1], below required minimum [2]

...
...
...
INFO Partition [test,37] on broker 1: Expanding ISR for partition [test,37]
from 1 to 1,2 (kafka.cluster.Partition)
INFO Partition [test,109] on broker 1: Expanding ISR for partition
[test,109] from 1 to 1,2 (kafka.cluster.Partition)
INFO Partition [test,37] on broker 1: Expanding ISR for partition [test,37]
from 1,2 to 1,2,0 (kafka.cluster.Partition)
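
The "below required minimum [2]" above should correspond to
min.insync.replicas=2 on the topic (or the broker default). A sketch of how
the settings and current ISR state can be double-checked (the ZooKeeper
address is a placeholder):

# Topic-level overrides, including min.insync.replicas if set per topic:
bin/kafka-topics.sh --zookeeper zk-host:2181 --describe --topic test
# Broker-level default, if there is no per-topic override:
grep min.insync.replicas config/server.properties
# Partitions whose ISR is currently smaller than the replica set:
bin/kafka-topics.sh --zookeeper zk-host:2181 --describe --under-replicated-partitions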

As far as we have checked, we don't see any network- or storage-related
issues.
Has anyone seen similar issues? Any input would be of great help.

Thanks,
Meghana
