owner info in zk is not correct
Hi, we are using Kafka 0.7, with 2 brokers (each broker has 10 partitions for the topic) and 3 consumers in one consumer group, each consumer creating 10 streams.

Today we wanted to roll out a new service. After we restarted one consumer we found exceptions and warnings:

kafka.common.ConsumerRebalanceFailedException: RecommendEvent_sd-sns-relation01.bj-1399630465426-53d3aefc can't rebalance after 4 retries

[INFO 2014-05-12 15:17:47.364] kafka.utils.Logging$class.info(Logging.scala:61) [conflict in /consumers/RecommendEvent/owners/sensei/1-2 data: RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-2 stored data: RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1]
[INFO 2014-05-12 15:17:47.366] kafka.utils.Logging$class.info(Logging.scala:61) [RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the partition ownership to be deleted: 1-2]
[INFO 2014-05-12 15:17:47.375] kafka.utils.Logging$class.info(Logging.scala:61) [conflict in /consumers/RecommendEvent/owners/sensei/1-3 data: RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-3 stored data: RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1]
[INFO 2014-05-12 15:17:47.375] kafka.utils.Logging$class.info(Logging.scala:61) [RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the partition ownership to be deleted: 1-3]
[INFO 2014-05-12 15:17:47.385] kafka.utils.Logging$class.info(Logging.scala:61) [conflict in /consumers/RecommendEvent/owners/sensei/1-5 data: RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-5 stored data: RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2]
[INFO 2014-05-12 15:17:47.386] kafka.utils.Logging$class.info(Logging.scala:61) [RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the partition ownership to be deleted: 1-5]

I then opened a ZK viewer. In ZK we found 2 consumers under ConsumerGroup/ids:

RecommendEvent_sd-sns-relation02.bj-1399635256619-5d8123c6
RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3

And under owners/topic/ we found all partitions assigned to sd-sns-relation03.bj. Here is the owner info:

1:0 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0
1:1 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0
1:2 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1
1:3 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1
1:4 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2
1:5 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2
1:6 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3
1:7 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3
1:8 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4
1:9 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4
2:0 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0
2:1 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1
2:2 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2
2:3 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3
2:4 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4
2:5 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-5
2:6 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-6
2:7 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-7
2:8 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-8
2:9 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-9

So all partitions are assigned to sd-sns-relation03.bj, but from the logs and our counters we are sure that sd-sns-relation02.bj is receiving input too.
My questions are: 1. Why did the rebalance fail? 2. Why is the owner info wrong? BTW: zkclient is 0.2.
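A minimal sketch of checking the same ownership data programmatically with zkclient (the library mentioned above): the ZooKeeper connect string is a placeholder, the group/topic path is taken from the logs above, and the inline string serializer is only an illustrative assumption.

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkMarshallingError;
import org.I0Itec.zkclient.serialize.ZkSerializer;

// Sketch: dump the partition-ownership znodes for a consumer group, i.e. the
// same data inspected above with a ZK viewer. "zkhost:2181" is a placeholder.
public class DumpOwners {
    public static void main(String[] args) {
        // Kafka stores plain strings in these znodes, so use a simple string serializer.
        ZkSerializer strings = new ZkSerializer() {
            public byte[] serialize(Object data) throws ZkMarshallingError {
                return data.toString().getBytes();
            }
            public Object deserialize(byte[] bytes) throws ZkMarshallingError {
                return bytes == null ? null : new String(bytes);
            }
        };
        ZkClient zk = new ZkClient("zkhost:2181", 30000, 30000, strings);
        String ownersPath = "/consumers/RecommendEvent/owners/sensei";
        for (String partition : zk.getChildren(ownersPath)) {
            Object owner = zk.readData(ownersPath + "/" + partition);
            System.out.println(partition + " -> " + owner);  // current owner id, if any
        }
        zk.close();
    }
}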
Re: owner info in zk is not correct
Hello Yonghui, In 0.7 the consumer rebalance logic is distributed, and in some corner cases, such as consecutive rebalances caused by soft failures, some consumers may consider the rebalance complete while others are still retrying the rebalance process. You can check the GC logs on your consumers to verify whether that is the case: https://issues.apache.org/jira/browse/KAFKA-242 If you bounce the consumers to trigger another rebalance, this issue would likely be resolved. To address this in 0.9 we are moving group management, such as load rebalancing, from the ZK-based distributed logic into a centralized coordinator. Details can be found here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design Guozhang On Mon, May 12, 2014 at 12:48 AM, Yonghui Zhao wrote: > Hi, > > We are using kafka 0.7. > > 2 brokers, each broker has 10 partitions for one topic > 3 consumers in one consumer group, each consumer create 10 streams. > > > Today, when we want to rollout new service. > After we restart one consumer we find exceptions and warning. > > kafka.common.ConsumerRebalanceFailedException: > RecommendEvent_sd-sns-relation01.bj-1399630465426-53d3aefc can't rebalance > after 4 retries > > > [INFO 2014-05-12 15:17:47.364] > kafka.utils.Logging$class.info(Logging.scala:61) > [conflict in /consumers/RecommendEvent/owners/sensei/1-2 data: > RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-2 stored data: > RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1] > [INFO 2014-05-12 15:17:47.366] > kafka.utils.Logging$class.info(Logging.scala:61) > [RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the > partition ownership to be deleted: 1-2] > [INFO 2014-05-12 15:17:47.375] > kafka.utils.Logging$class.info(Logging.scala:61) > [conflict in /consumers/RecommendEvent/owners/sensei/1-3 data: > RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-3 stored data: > RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1] > [INFO 2014-05-12 15:17:47.375] > kafka.utils.Logging$class.info(Logging.scala:61) > [RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the > partition ownership to be deleted: 1-3] > [INFO 2014-05-12 15:17:47.385] > kafka.utils.Logging$class.info(Logging.scala:61) > [conflict in /consumers/RecommendEvent/owners/sensei/1-5 data: > RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-5 stored data: > RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2] > [INFO 2014-05-12 15:17:47.386] > kafka.utils.Logging$class.info(Logging.scala:61) > [RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the > partition ownership to be deleted: 1-5] > > > > And I opened zk viewer.
> > In zk, we found 2 consumers in ConsumerGroup/ids: > > RecommendEvent_sd-sns-relation02.bj-1399635256619-5d8123c6 > RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3 > > > And in owners/topic/ we found all partitions are assigned to > sd-sns-relation03.bj: > > Here is the owner info: > 1:0 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0 > 1:1 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0 > 1:2 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1 > 1:3 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1 > 1:4 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2 > 1:5 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2 > 1:6 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3 > 1:7 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3 > 1:8 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4 > 1:9 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4 > > 2:0 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0 > 2:1 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1 > 2:2 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2 > 2:3 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3 > 2:4 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4 > 2:5 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-5 > 2:6 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-6 > 2:7 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-7 > 2:8 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-8 > 2:9 RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-9 > > > So all partitions are assigned to sd-sns-relation03.bj, but from logs and > counter, we are sure sd-sns-relation02.bj has input too. > > > My question is: > > 1. why rebalance failed? > 2. why owner info is wrong? btw: zkclient is 0.2 > -- -- Guozhang
Kafka 0.8's ConsumerConnector.close() hangs if ZK is unavailable and autocommit is enabled
Hi all, I'm using Kafka 0.8 and I've run into an issue with ConsumerConnector. Steps to reproduce:

1. Start a single-broker Kafka cluster with auto.create.topic.enable set to "true".
2. Start a ConsumerConnector on a topic (which does not yet exist) with auto.offset.reset set to "smallest".
3. Produce some data to the topic.
4. Bring Zookeeper down.
5. Call ConsumerConnector.close()

Observation: the call blocks forever with the following stack trace:

java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0xc6bf0570> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:237)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2072)
at org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)
at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)
at org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)
at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)
at org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)
at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)
at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)
at kafka.utils.ZkUtils$.updatePersistentPath(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown Source)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown Source)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at kafka.utils.Pool$$anon$1.foreach(Unknown Source)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at kafka.utils.Pool.foreach(Unknown Source)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(Unknown Source)
at kafka.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
- locked <0xc6be0c60> (a java.lang.Object)
at kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
...

Once I set "auto.commit.enable" to "false", the problem is gone. This is identical to what's described in https://issues.apache.org/jira/browse/KAFKA-601, with the only difference that it applies to Kafka 0.8 rather than Kafka 0.7.2. Is there any way to solve this issue other than disabling auto-commit?

Thanks, Yury
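A minimal sketch of the workaround described above (auto-commit disabled, with an explicit commit before shutdown), using the 0.8 high-level consumer API. The ZooKeeper address and group id are placeholders, and this only helps if the final commit runs while ZooKeeper is still reachable.

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

// Sketch: turn off auto-commit so shutdown() does not try to write offsets to
// an unreachable ZooKeeper, and commit manually instead.
public class ManualCommitConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zkhost:2181");   // placeholder
        props.put("group.id", "my-group");               // placeholder
        props.put("auto.offset.reset", "smallest");
        props.put("auto.commit.enable", "false");        // avoid the commit inside close()
        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // ... create streams and consume as usual ...

        consumer.commitOffsets();  // explicit commit while ZK is still up
        consumer.shutdown();       // no auto-commit attempt during shutdown
    }
}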
Loss of Leader in Kafka
Hi, I am hitting a strange exception while creating a topic in Kafka. Steps to generate this:

1. Created a topic multipartition_test with 2 partitions and 2 replicas.
2. Added some data to this topic and verified data is coming up for both partitions.
3. Deleted the topic. Checked only ZooKeeper, to confirm that /brokers/topics DOES NOT have the topic.
4. Recreated the topic in exactly the same way as in point 1.

After this, when I list topics using ./kafka-list-topic.sh, I see that *leader:* is none and *isr:* is empty for this topic. The state change logs give the following exception:

kafka.common.StateChangeFailedException: encountered error while electing leader for partition [multipartition_test,1] due to: LeaderAndIsr information doesn't exist for partition [multipartition_test,1] in OnlinePartition state.
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:327)
at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:154)
at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:109)
at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:109)
at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:325)
at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:312)
at kafka.controller.PartitionStateMachine$TopicChangeListener.liftedTree1$1(PartitionStateMachine.scala:376)
at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:361)
at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Caused by: kafka.common.StateChangeFailedException: LeaderAndIsr information doesn't exist for partition [multipartition_test,1] in OnlinePartition state
at kafka.controller.PartitionStateMachine.getLeaderIsrAndEpochOrThrowException(PartitionStateMachine.scala:347)
at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:291)
... 11 more

Can you please help me understand what I am doing wrong?

Regards, kashyap
Re: question about mirror maker
If we place the mirror maker in the same datacenter as the target cluster, its consumer will talk to ZooKeeper in the remote/source datacenter. Would it be more susceptible to network problems?

As for the problem of committing offsets without actually producing/writing the messages to the target cluster, it could be solved by disabling auto-commit and only committing messages that are actually persisted in the target cluster.

What do you think of this opposite approach?

On Sun, May 11, 2014 at 8:48 PM, Todd Palino wrote: > Yes, on both counts. Putting the mirror maker in the same datacenter in > the target cluster is exactly what we do as well. We also monitor both the > consumer lag (by comparing the offsets stored in Zookeeper and the tail > offset on the brokers), and the number of dropped and failed messages on > the mirror maker producer side. The other thing to do is to make sure to > check very carefully when you are changing anything about the producer > configuration, to assure that you have not made a mistake. > > -Todd > > On 5/11/14, 9:12 AM, "Weide Zhang" wrote: > > >Hi Todd, > > > >Thanks for your answer. with regard to fail over for mirror maker, does > >that mean if i have 4 mirror maker running in different machines with same > >consumer group, it will auto load balance if one of the mirror maker fails > >? Also, it looks to prevent mirror maker commit wrong (consumer work but > >not producer) due to cross data center network issue, mirror maker need to > >be placed along with the target cluster so that this scenario is minimized > >? > > > > > >On Sat, May 10, 2014 at 11:39 PM, Todd Palino > >wrote: > > > >> Well, if you have a cluster in each datacenter, all with the same > >>topics, > >> you can't just mirror the messages between them, as you will create a > >> loop. The way we do it is to have a "local" cluster and an "aggregate" > >> cluster. The local cluster has the data for only that datacenter. Then > >>we > >> run mirror makers that copy the messages from each of the local clusters > >> into the aggregate cluster. Everything produces into the local clusters, > >> and nothing produces into the aggregate clusters. In general, consumers > >> consume from the aggregate cluster (unless they specifically want only > >> local data). > >> > >> The mirror maker is as fault tolerant as any other consumer. That is, > >>if a > >> mirror maker goes down, the others configured with the same consumer > >>group > >> (we generally run at least 4 for any mirror maker, sometimes up to 10) > >> will rebalance and start back up from the last committed offset. What > >>you > >> need to watch out for is if the mirror maker is unable to produce > >> messages, for example, if the network goes down. If it can still consume > >> messages, but cannot produce them, you will lose messages as the > >>consumer > >> will continue to commit offsets with no knowledge that the producer is > >> failing. > >> > >> -Todd > >> > >> On 5/8/14, 11:20 AM, "Weide Zhang" wrote: > >> > >> >Hi, > >> > > >> >I have a question about mirror maker. say I have 3 data centers each > >> >producing topic 'A' with separate kafka cluster running. if 3 of the > >>data > >> >need to be kept in sync with each other, shall i create 3 mirror maker > >>in > >> >each data center to get the data from the other two ? > >> > > >> >also, it mentioned that mirror making is not fault tolerant ? so what > >>will > >> >be the behavior of mirror consumer if it went down due to network and > >>back > >> >up ? do they catch up with last offset from which they last mirror ?
If > >> >so, > >> >is it enabled by default or I have to configure ? > >> > > >> >Thanks a lot, > >> > > >> >Weide > >> > >> > >
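A conceptual sketch of the commit-after-persist idea raised above, written against the 0.8 Java consumer and producer APIs. This is not how the bundled mirror maker works (its consumer and producer are decoupled by a queue, as discussed later in this thread); the connection strings, topic, group id, and per-message commit are illustrative assumptions only, with batching omitted for clarity.

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.message.MessageAndMetadata;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Sketch: commit consumer offsets only after the message has been accepted by
// the target cluster, so a producer failure cannot silently advance offsets.
public class CommitAfterProduce {
    public static void main(String[] args) {
        // Consumer reads from the source datacenter's ZooKeeper/cluster.
        Properties cprops = new Properties();
        cprops.put("zookeeper.connect", "source-zk:2181");  // placeholder
        cprops.put("group.id", "mirror-sketch");            // placeholder
        cprops.put("auto.commit.enable", "false");          // commit manually below
        ConsumerConnector consumer =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(cprops));

        // Producer writes to the target cluster and waits for acknowledgement.
        Properties pprops = new Properties();
        pprops.put("metadata.broker.list", "target-broker:9092"); // placeholder
        pprops.put("producer.type", "sync");                // surface send failures
        pprops.put("request.required.acks", "-1");          // wait for the ISR
        pprops.put("serializer.class", "kafka.serializer.DefaultEncoder");
        Producer<byte[], byte[]> producer =
                new Producer<byte[], byte[]>(new ProducerConfig(pprops));

        Map<String, Integer> topicMap = Collections.singletonMap("A", 1);
        KafkaStream<byte[], byte[]> stream =
                consumer.createMessageStreams(topicMap).get("A").get(0);

        for (MessageAndMetadata<byte[], byte[]> record : stream) {
            // Synchronous send: an exception here means the message was not persisted.
            producer.send(new KeyedMessage<byte[], byte[]>("A", record.message()));
            consumer.commitOffsets(); // only after the target cluster accepted it
        }
    }
}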
IndexOutOfBoundsException error
Hi all, I am seeing some corrupt data on my 0.7.2 Kafka cluster. Every once in a while I'll get the following message:

Exception in thread "kafka-consumer" java.lang.IndexOutOfBoundsException
at java.nio.Buffer.checkIndex(Unknown Source)
at java.nio.HeapByteBuffer.get(Unknown Source)
at kafka.message.Message.magic(Message.scala:133)
at kafka.message.Message.checksum(Message.scala:146)
at kafka.message.Message.isValid(Message.scala:158)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:129)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.message.MessageSet.foreach(MessageSet.scala:87)
at kafka.tools.SimpleConsumerShell$$anon$1$$anonfun$run$1.apply(SimpleConsumerShell.scala:97)
at kafka.tools.SimpleConsumerShell$$anon$1$$anonfun$run$1.apply(SimpleConsumerShell.scala:94)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.api.MultiFetchResponse.foreach(MultiFetchResponse.scala:25)
at kafka.tools.SimpleConsumerShell$$anon$1.run(SimpleConsumerShell.scala:94)

The only way I can get past this error is to move the offset past the current message to skip the bad data. I use the simpleConsumer.getOffsetsBefore() method to get a valid list of offsets, and I skip to the next valid offset.

Has anyone encountered this issue before? I found a message where someone experienced similar issues while running JDK 1.7 and was able to get around it by running JDK 1.6. I am running OpenJDK 1.7, and I also tried Sun JDK 1.6, but I still have the same problem.

My guess is that the data is corrupt somehow, but I don't know how to look at the raw data to confirm. Can anyone suggest ideas for me to debug this issue?

Thanks, Xuyen
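A minimal sketch of that skip-ahead approach with the 0.7 javaapi SimpleConsumer; host, port, timeouts, and the offset count are placeholders, and -1L is used directly as the "latest" time constant rather than referencing the OffsetRequest object.

import kafka.javaapi.consumer.SimpleConsumer;

// Sketch: after hitting a corrupt message at badOffset, ask the broker for its
// valid segment offsets and resume from the first boundary past the bad one.
public class SkipBadOffset {
    public static long nextValidOffset(String host, int port,
                                       String topic, int partition,
                                       long badOffset) {
        SimpleConsumer consumer = new SimpleConsumer(host, port, 10000, 1024 * 1024);
        try {
            long latestTime = -1L; // same value as kafka.api.OffsetRequest.LatestTime
            long[] offsets = consumer.getOffsetsBefore(topic, partition, latestTime, 100);
            long next = -1L;       // -1 => no segment boundary past the bad offset
            for (long o : offsets) {
                if (o > badOffset && (next == -1L || o < next)) {
                    next = o;      // smallest valid offset beyond the bad message
                }
            }
            return next;
        } finally {
            consumer.close();
        }
    }
}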
Re: question about isr
You're probably right that it has to be GC given the CMS activity I saw in the log but I didn't see a single concurrent mode failure, which bothers me that we had this happen anyway... also the ZK timeout is set to a very large number... I dunno, seems weird. I will see what I can do about getting 0.8.1.1 deployed... how do you expect it to address this problem? --Ian On May 12, 2014, at 10:49 AM, Jun Rao wrote: > Could you try 0.8.1.1, which fixed some bugs related to controller hanging > during shutdown. > > For ZK session expiration, the #1 cause is GC. We have also seen that > transient issues with the underlying storage (e.g. raid controller reset) can > also cause ZK session expiration. > > As for the data loss in the producer, you are probably using ack=1, which > could lead to data loss during leader failover. It just that in this case, > the failover window is forever due to some bugs. > > Thanks, > > Jun > > > On Sun, May 11, 2014 at 10:14 PM, Ian Friedman wrote: > Jun - We're using 0.8.1 > The timestamps in the last few entries in controller.log seem to correspond > to when the trouble started in server.log > controller.log: > > [2014-05-08 19:01:27,693] INFO [SessionExpirationListener on 1], ZK expired; > shut down all controller components and try to re-elect > (kafka.controller.KafkaController$SessionExpirationListener) > > [2014-05-08 19:01:27,694] INFO [delete-topics-thread], Shutting down > (kafka.controller.TopicDeletionManager$DeleteTopicsThread) > > server.log: > > [2014-05-08 19:01:27,382] INFO Closing socket connection to /10.10.13.3. > (kafka.network.Processor) > > [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. > (kafka.network.Processor) > > [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. > (kafka.network.Processor) > > [2014-05-08 19:01:29,886] INFO Partition [callbackServiceTopic-Medium,27] on > broker 1: Shrinking ISR for partition [callbackServiceTopic-Medium,27] from > 1,2,3 to 1 (kafka.cluster.Partition) > > [2014-05-08 19:01:30,109] ERROR Conditional update of path > /brokers/topics/callbackServiceTopic-Medium/partitions/27/state with data > {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} and > expected version 9 failed due to > org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = > BadVersion for > /brokers/topics/callbackServiceTopic-Medium/partitions/27/state > (kafka.utils.ZkUtils$) > > [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-Medium,27] on > broker 1: Cached zkVersion [9] not equal to that in zookeeper, skip updating > ISR (kafka.cluster.Partition) > > [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-High,3] on > broker 1: Shrinking ISR for partition [callbackServiceTopic-High,3] from > 1,2,3 to 1 (kafka.cluster.Partition) > > [2014-05-08 19:01:30,111] ERROR Conditional update of path > /brokers/topics/callbackServiceTopic-High/partitions/3/state with data > {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} and > expected version 9 failed due to > org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = > BadVersion for /brokers/topics/callbackServiceTopic-High/partitions/3/state > (kafka.utils.ZkUtils$) > > I'm also concerned why this happened at all... I do see some slightly high > CMS activity in the GC log but I don't see any Full GCs and the whole thing > was over within a minute... I'm a bit concerned how something like this could > make a broker fail this way! 
> > One of the after effects we found from leaving the broker running in this > state is that it was still allowing producers with cached metadata to produce > to it, so we lost a whole bunch of messages before I shut him down. What's up > with that?? > > > > --Ian > > > > On Sun, May 11, 2014 at 11:39 PM, Jun Rao wrote: > It seems that broker 1 had a soft failure (potentially due to GC). However, > somehow it didn't receive the latest LeaderAndIsrRequest from the controller. > Which version of Kafka are you using? In the controller log, do you see > broker 1 being added back as a live broker? > > Thanks, > > Jun > > > On Fri, May 9, 2014 at 10:21 AM, Ian Friedman wrote: > This seems similar to behavior we’re seeing. At some point one of our brokers > (id 1) just gives up and starts throwing those errors and kafka-topics no > longer lists it as a ISR. However the logs for that broker say something very > odd: > > [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic-High,8] on > broker 1: Cached zkVersion [10] not equal to that in zookeeper, skip updating > ISR (kafka.cluster.Partition) > [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic,3] on broker > 1: Shrinking ISR for partition [callbackServiceTopic,3] from 1,2,3 to 1 > (kafka.cluster.Partition) > [2014-05-09 10:16:00,251] ERROR Conditional update of path > /brokers/to
Re: question about isr
Could you try 0.8.1.1, which fixed some bugs related to controller hanging during shutdown. For ZK session expiration, the #1 cause is GC. We have also seen that transient issues with the underlying storage (e.g. raid controller reset) can also cause ZK session expiration. As for the data loss in the producer, you are probably using ack=1, which could lead to data loss during leader failover. It just that in this case, the failover window is forever due to some bugs. Thanks, Jun On Sun, May 11, 2014 at 10:14 PM, Ian Friedman wrote: > Jun - We're using 0.8.1 > The timestamps in the last few entries in controller.log seem to > correspond to when the trouble started in server.log > > controller.log: > > [2014-05-08 19:01:27,693] INFO [SessionExpirationListener on 1], ZK > expired; shut down all controller components and try to re-elect > (kafka.controller.KafkaController$SessionExpirationListener) > > [2014-05-08 19:01:27,694] INFO [delete-topics-thread], Shutting down > (kafka.controller.TopicDeletionManager$DeleteTopicsThread) > > server.log: > > [2014-05-08 19:01:27,382] INFO Closing socket connection to /10.10.13.3. > (kafka.network.Processor) > > [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. > (kafka.network.Processor) > > [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. > (kafka.network.Processor) > > [2014-05-08 19:01:29,886] INFO Partition [callbackServiceTopic-Medium,27] > on broker 1: Shrinking ISR for partition [callbackServiceTopic-Medium,27] > from 1,2,3 to 1 (kafka.cluster.Partition) > > [2014-05-08 19:01:30,109] ERROR Conditional update of path > /brokers/topics/callbackServiceTopic-Medium/partitions/27/state with data > {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} > and expected version 9 failed due to > org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = > BadVersion for > /brokers/topics/callbackServiceTopic-Medium/partitions/27/state > (kafka.utils.ZkUtils$) > > [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-Medium,27] > on broker 1: Cached zkVersion [9] not equal to that in zookeeper, skip > updating ISR (kafka.cluster.Partition) > > [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-High,3] on > broker 1: Shrinking ISR for partition [callbackServiceTopic-High,3] from > 1,2,3 to 1 (kafka.cluster.Partition) > > [2014-05-08 19:01:30,111] ERROR Conditional update of path > /brokers/topics/callbackServiceTopic-High/partitions/3/state with data > {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} > and expected version 9 failed due to > org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = > BadVersion for /brokers/topics/callbackServiceTopic-High/partitions/3/state > (kafka.utils.ZkUtils$) > > I'm also concerned why this happened at all... I do see some slightly high > CMS activity in the GC log but I don't see any Full GCs and the whole thing > was over within a minute... I'm a bit concerned how something like this > could make a broker fail this way! > > One of the after effects we found from leaving the broker running in this > state is that it was still allowing producers with cached metadata to > produce to it, so we lost a whole bunch of messages before I shut him down. > What's up with that?? > > > --Ian > > > On Sun, May 11, 2014 at 11:39 PM, Jun Rao wrote: > >> It seems that broker 1 had a soft failure (potentially due to GC). 
>> However, somehow it didn't receive the latest LeaderAndIsrRequest from the >> controller. Which version of Kafka are you using? In the controller log, do >> you see broker 1 being added back as a live broker? >> >> Thanks, >> >> Jun >> >> >> On Fri, May 9, 2014 at 10:21 AM, Ian Friedman wrote: >> >>> This seems similar to behavior we’re seeing. At some point one of our >>> brokers (id 1) just gives up and starts throwing those errors and >>> kafka-topics no longer lists it as a ISR. However the logs for that broker >>> say something very odd: >>> >>> [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic-High,8] >>> on broker 1: Cached zkVersion [10] not equal to that in zookeeper, skip >>> updating ISR (kafka.cluster.Partition) >>> [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic,3] on >>> broker 1: Shrinking ISR for partition [callbackServiceTopic,3] from 1,2,3 >>> to 1 (kafka.cluster.Partition) >>> [2014-05-09 10:16:00,251] ERROR Conditional update of path >>> /brokers/topics/callbackServiceTopic/partitions/3/state with data >>> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} >>> and expected version 9 failed due to >>> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = >>> BadVersion for /brokers/topics/callbackServiceTopic/partitions/3/state >>> (kafka.utils.ZkUtils$) >>> [2014-05-09 10:16:00,251] INFO Partition [callbackServiceTopic,3] on >>> broker 1: Cached zkVersion [9] not equal to that in zookeep
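On the ack=1 point above, one option is to have the producer wait for the full ISR before considering a send successful. A minimal sketch with the 0.8 producer API; the broker list is a placeholder and the topic name is just taken from this thread.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Sketch: a producer that waits for acknowledgement from all in-sync replicas
// instead of only the leader (acks=1), narrowing the loss window on failover.
public class SafeProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholders
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "-1");   // wait for the full ISR
        props.put("producer.type", "sync");         // surface send failures to the caller

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new KeyedMessage<String, String>("callbackServiceTopic", "hello"));
        producer.close();
    }
}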
Re: question about isr
Another question - If I start broker 1 back up now, with his replicas now out of sync, will he automatically catch up with the leaders or do I have to do something to catch him up? --Ian On May 12, 2014, at 10:49 AM, Jun Rao wrote: > Could you try 0.8.1.1, which fixed some bugs related to controller hanging > during shutdown. > > For ZK session expiration, the #1 cause is GC. We have also seen that > transient issues with the underlying storage (e.g. raid controller reset) can > also cause ZK session expiration. > > As for the data loss in the producer, you are probably using ack=1, which > could lead to data loss during leader failover. It just that in this case, > the failover window is forever due to some bugs. > > Thanks, > > Jun > > > On Sun, May 11, 2014 at 10:14 PM, Ian Friedman wrote: > Jun - We're using 0.8.1 > The timestamps in the last few entries in controller.log seem to correspond > to when the trouble started in server.log > controller.log: > > [2014-05-08 19:01:27,693] INFO [SessionExpirationListener on 1], ZK expired; > shut down all controller components and try to re-elect > (kafka.controller.KafkaController$SessionExpirationListener) > > [2014-05-08 19:01:27,694] INFO [delete-topics-thread], Shutting down > (kafka.controller.TopicDeletionManager$DeleteTopicsThread) > > server.log: > > [2014-05-08 19:01:27,382] INFO Closing socket connection to /10.10.13.3. > (kafka.network.Processor) > > [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. > (kafka.network.Processor) > > [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. > (kafka.network.Processor) > > [2014-05-08 19:01:29,886] INFO Partition [callbackServiceTopic-Medium,27] on > broker 1: Shrinking ISR for partition [callbackServiceTopic-Medium,27] from > 1,2,3 to 1 (kafka.cluster.Partition) > > [2014-05-08 19:01:30,109] ERROR Conditional update of path > /brokers/topics/callbackServiceTopic-Medium/partitions/27/state with data > {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} and > expected version 9 failed due to > org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = > BadVersion for > /brokers/topics/callbackServiceTopic-Medium/partitions/27/state > (kafka.utils.ZkUtils$) > > [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-Medium,27] on > broker 1: Cached zkVersion [9] not equal to that in zookeeper, skip updating > ISR (kafka.cluster.Partition) > > [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-High,3] on > broker 1: Shrinking ISR for partition [callbackServiceTopic-High,3] from > 1,2,3 to 1 (kafka.cluster.Partition) > > [2014-05-08 19:01:30,111] ERROR Conditional update of path > /brokers/topics/callbackServiceTopic-High/partitions/3/state with data > {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} and > expected version 9 failed due to > org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = > BadVersion for /brokers/topics/callbackServiceTopic-High/partitions/3/state > (kafka.utils.ZkUtils$) > > I'm also concerned why this happened at all... I do see some slightly high > CMS activity in the GC log but I don't see any Full GCs and the whole thing > was over within a minute... I'm a bit concerned how something like this could > make a broker fail this way! 
> > One of the after effects we found from leaving the broker running in this > state is that it was still allowing producers with cached metadata to produce > to it, so we lost a whole bunch of messages before I shut him down. What's up > with that?? > > > > --Ian > > > > On Sun, May 11, 2014 at 11:39 PM, Jun Rao wrote: > It seems that broker 1 had a soft failure (potentially due to GC). However, > somehow it didn't receive the latest LeaderAndIsrRequest from the controller. > Which version of Kafka are you using? In the controller log, do you see > broker 1 being added back as a live broker? > > Thanks, > > Jun > > > On Fri, May 9, 2014 at 10:21 AM, Ian Friedman wrote: > This seems similar to behavior we’re seeing. At some point one of our brokers > (id 1) just gives up and starts throwing those errors and kafka-topics no > longer lists it as a ISR. However the logs for that broker say something very > odd: > > [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic-High,8] on > broker 1: Cached zkVersion [10] not equal to that in zookeeper, skip updating > ISR (kafka.cluster.Partition) > [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic,3] on broker > 1: Shrinking ISR for partition [callbackServiceTopic,3] from 1,2,3 to 1 > (kafka.cluster.Partition) > [2014-05-09 10:16:00,251] ERROR Conditional update of path > /brokers/topics/callbackServiceTopic/partitions/3/state with data > {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} and > expected version 9 failed due to > org.apache.z
Re: question about isr
0.8.1.1 fixed an issue when the controller could hang during soft failure. Out-of sync replicas will catch up from the leader automatically, once restarted. Thanks, Jun On Mon, May 12, 2014 at 12:37 PM, Ian Friedman wrote: > You're probably right that it has to be GC given the CMS activity I saw in > the log but I didn't see a single concurrent mode failure, which bothers me > that we had this happen anyway... also the ZK timeout is set to a very > large number... I dunno, seems weird. > > I will see what I can do about getting 0.8.1.1 deployed... how do you > expect it to address this problem? > > --Ian > > > On May 12, 2014, at 10:49 AM, Jun Rao wrote: > > Could you try 0.8.1.1, which fixed some bugs related to controller hanging > during shutdown. > > For ZK session expiration, the #1 cause is GC. We have also seen that > transient issues with the underlying storage (e.g. raid controller reset) > can also cause ZK session expiration. > > As for the data loss in the producer, you are probably using ack=1, which > could lead to data loss during leader failover. It just that in this case, > the failover window is forever due to some bugs. > > Thanks, > > Jun > > > On Sun, May 11, 2014 at 10:14 PM, Ian Friedman wrote: > >> Jun - We're using 0.8.1 >> The timestamps in the last few entries in controller.log seem to >> correspond to when the trouble started in server.log >> >> controller.log: >> >> [2014-05-08 19:01:27,693] INFO [SessionExpirationListener on 1], ZK >> expired; shut down all controller components and try to re-elect >> (kafka.controller.KafkaController$SessionExpirationListener) >> >> [2014-05-08 19:01:27,694] INFO [delete-topics-thread], Shutting down >> (kafka.controller.TopicDeletionManager$DeleteTopicsThread) >> >> server.log: >> >> [2014-05-08 19:01:27,382] INFO Closing socket connection to /10.10.13.3. >> (kafka.network.Processor) >> >> [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. >> (kafka.network.Processor) >> >> [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. 
>> (kafka.network.Processor) >> >> [2014-05-08 19:01:29,886] INFO Partition [callbackServiceTopic-Medium,27] >> on broker 1: Shrinking ISR for partition [callbackServiceTopic-Medium,27] >> from 1,2,3 to 1 (kafka.cluster.Partition) >> >> [2014-05-08 19:01:30,109] ERROR Conditional update of path >> /brokers/topics/callbackServiceTopic-Medium/partitions/27/state with data >> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} >> and expected version 9 failed due to >> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = >> BadVersion for >> /brokers/topics/callbackServiceTopic-Medium/partitions/27/state >> (kafka.utils.ZkUtils$) >> >> [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-Medium,27] >> on broker 1: Cached zkVersion [9] not equal to that in zookeeper, skip >> updating ISR (kafka.cluster.Partition) >> >> [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-High,3] on >> broker 1: Shrinking ISR for partition [callbackServiceTopic-High,3] from >> 1,2,3 to 1 (kafka.cluster.Partition) >> >> [2014-05-08 19:01:30,111] ERROR Conditional update of path >> /brokers/topics/callbackServiceTopic-High/partitions/3/state with data >> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} >> and expected version 9 failed due to >> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = >> BadVersion for /brokers/topics/callbackServiceTopic-High/partitions/3/state >> (kafka.utils.ZkUtils$) >> >> I'm also concerned why this happened at all... I do see some slightly >> high CMS activity in the GC log but I don't see any Full GCs and the whole >> thing was over within a minute... I'm a bit concerned how something like >> this could make a broker fail this way! >> >> One of the after effects we found from leaving the broker running in this >> state is that it was still allowing producers with cached metadata to >> produce to it, so we lost a whole bunch of messages before I shut him down. >> What's up with that?? >> >> >> --Ian >> >> >> On Sun, May 11, 2014 at 11:39 PM, Jun Rao wrote: >> >>> It seems that broker 1 had a soft failure (potentially due to GC). >>> However, somehow it didn't receive the latest LeaderAndIsrRequest from the >>> controller. Which version of Kafka are you using? In the controller log, do >>> you see broker 1 being added back as a live broker? >>> >>> Thanks, >>> >>> Jun >>> >>> >>> On Fri, May 9, 2014 at 10:21 AM, Ian Friedman wrote: >>> This seems similar to behavior we’re seeing. At some point one of our brokers (id 1) just gives up and starts throwing those errors and kafka-topics no longer lists it as a ISR. However the logs for that broker say something very odd: [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic-High,8] on broker 1: Cached zkVersion [10] not equal to that in zookeeper, skip upd
Kafka producer in CSharp
Hi, I have a Kafka broker running (kafka_2.9.1-0.8.1.1) and all is working. One project requires a producer written in C#. I am not a .NET programmer, but I managed to write simple producer code using https://github.com/kafka-dev/kafka/blob/master/clients/csharp/README.md

The code:
...
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Kafka.Client;

namespace DemoProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            Message msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            Message msg2 = new Message(payloadData2);

            Producer producer = new Producer("broker", 9092);
            producer.Send("kafkademo3", 0, msg1);
        }
    }
}
...

On the broker side I get the following error when I execute the code above:

[2014-05-12 19:15:58,984] ERROR Closing socket for /84.50.21.39 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
at java.nio.ByteBuffer.get(ByteBuffer.java:694)
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
at kafka.network.RequestChannel$Request.(RequestChannel.scala:53)
at kafka.network.Processor.read(SocketServer.scala:353)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:744)

[2014-05-12 19:16:11,836] ERROR Closing socket for /90.190.106.56 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:375)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:347)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:744)

I suspected that the problem is in the broker version (kafka_2.9.1-0.8.1.1), so I downloaded kafka-0.7.1-incubating. Now I was able to send messages using the C# code. So is there a workaround that lets me use the latest Kafka version with C#? Or what is the latest Kafka version supporting this C# producer?

-- Best regards, Margus (Margusja) Roo +372 51 48 780 http://margus.roo.ee http://ee.linkedin.com/in/margusroo skype: margusja ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"
Re: IndexOutOfBoundsException error
Could you run the DumpLogSegments tool to see if there are indeed corrupted messages in the broker log? Thanks, Jun On Mon, May 12, 2014 at 4:08 PM, Xuyen On wrote: > Hi all, > > I am seeing some corrupt data on my 0.7.2 Kafka cluster. Every once in > awhile I'll get the following message: > > Exception in thread "kafka-consumer" java.lang.IndexOutOfBoundsException > at java.nio.Buffer.checkIndex(Unknown Source) > at java.nio.HeapByteBuffer.get(Unknown Source) > at kafka.message.Message.magic(Message.scala:133) > at kafka.message.Message.checksum(Message.scala:146) > at kafka.message.Message.isValid(Message.scala:158) > at > kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:129) > at > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166) > at > kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100) > at > kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59) > at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:79) > at kafka.message.MessageSet.foreach(MessageSet.scala:87) > at > kafka.tools.SimpleConsumerShell$$anon$1$$anonfun$run$1.apply(SimpleConsumerShell.scala:97) > at > kafka.tools.SimpleConsumerShell$$anon$1$$anonfun$run$1.apply(SimpleConsumerShell.scala:94) > at scala.collection.Iterator$class.foreach(Iterator.scala:631) > at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30) > at > scala.collection.IterableLike$class.foreach(IterableLike.scala:79) > at > kafka.api.MultiFetchResponse.foreach(MultiFetchResponse.scala:25) > at > kafka.tools.SimpleConsumerShell$$anon$1.run(SimpleConsumerShell.scala:94) > > The only way I can get past this error is to move the offset past the > current message to skip the bad data. I use the > simpleConsumer.getOffsetsBefore() method to get a valid lists of offsets > and I skip to the next valid offset. > > Has anyone encountered this issue before? I found a message where someone > experienced similar issues and they were running JDK 1.7 and was able to > get around it by running JDK 1.6. I am running OpenJDK 1.7 but I tried > running Sun JDK 1.6 but I still have the same problem. > > My guess is that the data is corrupt somehow but I don't know how to look > at the raw data to confirm. Can anyone suggest ideas for me to debug this > issue? > > Thanks, > > Xuyen > > >
Re: kafka broker failed to recovery from ZK failure
The controller log in broker 1 is too late. Could you send its log around 2014-05-12 21:24:37? Thanks, Jun On Mon, May 12, 2014 at 5:02 PM, Steven Wu wrote: > This is a three-node cluster. broker 0 lost connection to ZK. broker 1 > does seem to take the controller role. but broker 0 stuck in the bad state > and wasn't able to recover. > > it seems to start with these error msgs. I have attached complete > controller and server log for broker 0 and 1. > > I am using "kafka_2.9.2-0.8.1.1.jar". > > Thanks, > Steven > > [2014-05-12 21:24:28,737] INFO Client session timed out, have not heard > from server in 4000ms for sessionid 0xb145a9585a806013, closing socket > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:28,838] INFO zookeeper state changed (Disconnected) > (org.I0Itec.zkclient.ZkClient) > [2014-05-12 21:24:29,360] INFO Opening socket connection to server > ip-10-84-58-49.ec2.internal/10.84.58.49:2181. Will not attempt to > authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:30,562] INFO Client session timed out, have not heard > from server in 1724ms for sessionid 0xb145a9585a806013, closing socket > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:30,706] INFO Opening socket connection to server > ip-10-45-171-150.ec2.internal/10.45.171.150:2181. Will not attempt to > authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:31,907] INFO Client session timed out, have not heard > from server in 1245ms for sessionid 0xb145a9585a806013, closing socket > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:32,927] INFO Opening socket connection to server > ip-10-45-172-197.ec2.internal/10.45.172.197:2181. Will not attempt to > authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:34,128] INFO Client session timed out, have not heard > from server in 2120ms for sessionid 0xb145a9585a806013, closing socket > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:34,380] INFO Opening socket connection to server > ip-10-33-144-217.ec2.internal/10.33.144.217:2181. Will not attempt to > authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:35,581] INFO Client session timed out, have not heard > from server in 1353ms for sessionid 0xb145a9585a806013, closing socket > connection and attempting reconnect (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:36,033] INFO Closing socket connection to /10.60.94.175. > (kafka.network.Processor) > [2014-05-12 21:24:37,282] INFO Opening socket connection to server > ip-10-78-235-244.ec2.internal/10.78.235.244:2181. 
Will not attempt to > authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:37,284] INFO Socket connection established to > ip-10-78-235-244.ec2.internal/10.78.235.244:2181, initiating session > (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:37,288] INFO zookeeper state changed (Expired) > (org.I0Itec.zkclient.ZkClient) > [2014-05-12 21:24:37,288] INFO Unable to reconnect to ZooKeeper service, > session 0xb145a9585a806013 has expired, closing socket connection > (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:37,288] INFO Initiating client connection, connectString= > ec2-50-19-255-1.compute-1.amazonaws.com:2181, > ec2-54-235-159-245.compute-1.amazonaws.com:2181, > ec2-50-19-255-97.compute-1.amazonaws.com:2181, > ec2-184-73-152-248.compute-1.amazonaws.com:2181, > ec2-50-17-247-179.compute-1.amazonaws.com:2181/kafkabroker/defaultsessionTimeout=6000 > watcher=org.I0Itec.zkclient.ZkClient@6f2a4d2f(org.apache.zookeeper.ZooKeeper) > [2014-05-12 21:24:37,305] INFO EventThread shut down > (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:37,311] INFO Opening socket connection to server > ip-10-171-10-136.ec2.internal/10.171.10.136:2181. Will not attempt to > authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:37,311] INFO Socket connection established to > ip-10-171-10-136.ec2.internal/10.171.10.136:2181, initiating session > (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:37,316] INFO Session establishment complete on server > ip-10-171-10-136.ec2.internal/10.171.10.136:2181, sessionid = > 0xb445f1c68e5c013d, negotiated timeout = 6000 > (org.apache.zookeeper.ClientCnxn) > [2014-05-12 21:24:37,316] INFO zookeeper state changed (SyncConnected) > (org.I0Itec.zkclient.ZkClient) > [2014-05-12 21:24:37,335] INFO conflict in /controller data: > {"version":1,"brokerid":0,"timestamp":"1399929877328"} stored data: > {"version":1,"brokerid":1,"timestamp":"1399321265926"} (kafka.utils.: > [2014-05-12 21:24:37,348] INFO re-registering broker info in ZK for broker > 0 (kafka.server.KafkaHealthcheck) > [2014-05-12 21:24:37,352] INFO Registered broker 0 at path /
Re: Loss of Leader in Kafka
Delete topic doesn't quite work in 0.8.1. We recently fixed it in trunk. Could you give it a try and see if you see the same issue? Thanks, Jun On Mon, May 12, 2014 at 9:39 AM, Kashyap Mhaisekar wrote: > Hi, > I am hitting a strange exception while creating a topic in Kafka - > Steps to generate this- > 1. Created a topic multipartition_test with 2 partitions and 2 replicas > 2. Added some data to this topics and verified data is coming up for both > partitions > 3. Deleted the topic. Checked only the zookeeper to see if the > /brokers/topics DOES NOT have the topic > 4. Recreated the topic in exactly the same way as in point 1. > > After this, when I list topics using ./kafka-list-topic.sh, i see that > *leader:* none and *isr:* for this topic. State change logs give the > following exception. > > kafka.common.StateChangeFailedException: encountered error while electing > leader for partition [multipartition_test,1] due to: LeaderAndIsr > information doesn't exist for partition [multipartition_test,1] in > OnlinePartition state. > at > > kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:327) > at > > kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:154) > at > > kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110) > at > > kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:109) > at scala.collection.immutable.Set$Set2.foreach(Set.scala:101) > at > > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:109) > at > > kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:325) > at > > kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:312) > at > > kafka.controller.PartitionStateMachine$TopicChangeListener.liftedTree1$1(PartitionStateMachine.scala:376) > at > > kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:361) > at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > Caused by: kafka.common.StateChangeFailedException: LeaderAndIsr > information doesn't exist for partition [multipartition_test,1] in > OnlinePartition state > at > > kafka.controller.PartitionStateMachine.getLeaderIsrAndEpochOrThrowException(PartitionStateMachine.scala:347) > at > > kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:291) > ... 11 more > > Can you please help what am I doing wrong? > > Regards, > kashyap >
Re: question about mirror maker
As far as Zookeeper goes, any time you have network communication you have the chance of a problem. I would rather have the network issue on the consumer side than on the producer side. I would certainly prefer to have the offsets committed only after the message is produced (based on the acks setting for the producer). The problem with this is that the consumer and the producer in mirror maker are separated by a queue, and there would need to be a significant amount of communication added between the two for that to work without a loss of throughput. -Todd On 5/12/14, 11:26 AM, "Steven Wu" wrote: >if placing mirror maker in the same datacenter as target cluster, >it/consumer will talks to zookeeper in remote/source datacenter. would it >more susceptible to network problems? > >As for the problem commit offset without actually producing/writing msgs >to >target cluster, it can be solved by disabling auto-commit. and only commit >msgs that are actually persisted in target cluster. > >what do you think of this opposite approach? > > >On Sun, May 11, 2014 at 8:48 PM, Todd Palino wrote: > >> Yes, on both counts. Putting the mirror maker in the same datacenter in >> the target cluster is exactly what we do as well. We also monitor both >>the >> consumer lag (by comparing the offsets stored in Zookeeper and the tail >> offset on the brokers), and the number of dropped and failed messages on >> the mirror maker producer side. The other thing to do is to make sure to >> check very carefully when you are changing anything about the producer >> configuration, to assure that you have not made a mistake. >> >> -Todd >> >> On 5/11/14, 9:12 AM, "Weide Zhang" wrote: >> >> >Hi Todd, >> > >> >Thanks for your answer. with regard to fail over for mirror maker, does >> >that mean if i have 4 mirror maker running in different machines with >>same >> >consumer group, it will auto load balance if one of the mirror maker >>fails >> >? Also, it looks to prevent mirror maker commit wrong (consumer work >>but >> >not producer) due to cross data center network issue, mirror maker >>need to >> >be placed along with the target cluster so that this scenario is >>minimized >> >? >> > >> > >> >On Sat, May 10, 2014 at 11:39 PM, Todd Palino >> >wrote: >> > >> >> Well, if you have a cluster in each datacenter, all with the same >> >>topics, >> >> you can't just mirror the messages between them, as you will create a >> >> loop. The way we do it is to have a "local" cluster and an >>"aggregate" >> >> cluster. The local cluster has the data for only that datacenter. >>Then >> >>we >> >> run mirror makers that copy the messages from each of the local >>clusters >> >> into the aggregate cluster. Everything produces into the local >>clusters, >> >> and nothing produces into the aggregate clusters. In general, >>consumers >> >> consume from the aggregate cluster (unless they specifically want >>only >> >> local data). >> >> >> >> The mirror maker is as fault tolerant as any other consumer. That is, >> >>if a >> >> mirror maker goes down, the others configured with the same consumer >> >>group >> >> (we generally run at least 4 for any mirror maker, sometimes up to >>10) >> >> will rebalance and start back up from the last committed offset. What >> >>you >> >> need to watch out for is if the mirror maker is unable to produce >> >> messages, for example, if the network goes down.
If it can still >>consume >> >> messages, but cannot produce them, you will lose messages as the >> >>consumer >> >> will continue to commit offsets with no knowledge that the producer >>is >> >> failing. >> >> >> >> -Todd >> >> >> >> On 5/8/14, 11:20 AM, "Weide Zhang" wrote: >> >> >> >> >Hi, >> >> > >> >> >I have a question about mirror maker. say I have 3 data centers each >> >> >producing topic 'A' with separate kafka cluster running. if 3 of the >> >>data >> >> >need to be kept in sync with each other, shall i create 3 mirror >>maker >> >>in >> >> >each data center to get the data from the other two ? >> >> > >> >> >also, it mentioned that mirror making is not fault tolerant ? so >>what >> >>will >> >> >be the behavior of mirror consumer if it went down due to network >>and >> >>back >> >> >up ? do they catch up with last offset from which they last mirror >>? If >> >> >so, >> >> >is it enabled by default or I have to configure ? >> >> > >> >> >Thanks a lot, >> >> > >> >> >Weide >> >> >> >> >> >>
CSharp librari and Producer Closing socket for because of error (kafka.network.Processor),java.nio.BufferUnderflowException
Hi, I have a Kafka broker running (kafka_2.9.1-0.8.1.1) and all is working. One project requires a producer written in C#. I am not a .NET programmer, but I managed to write simple producer code using https://github.com/kafka-dev/kafka/blob/master/clients/csharp/README.md

The code:
...
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Kafka.Client;

namespace DemoProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            Message msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            Message msg2 = new Message(payloadData2);

            Producer producer = new Producer("broker", 9092);
            producer.Send("kafkademo3", 0, msg1);
        }
    }
}
...

On the broker side I get the following error when I execute the code above:

[2014-05-12 19:15:58,984] ERROR Closing socket for /84.50.21.39 because of error (kafka.network.Processor)
java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
at java.nio.ByteBuffer.get(ByteBuffer.java:694)
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
at kafka.network.RequestChannel$Request.(RequestChannel.scala:53)
at kafka.network.Processor.read(SocketServer.scala:353)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:744)

[2014-05-12 19:16:11,836] ERROR Closing socket for /90.190.106.56 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:375)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:347)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:744)

I suspected that the problem is in the broker version (kafka_2.9.1-0.8.1.1), so I downloaded kafka-0.7.1-incubating. Now I was able to send messages using the C# code. So is there a workaround that lets me use the latest Kafka version with C#? Or what is the latest Kafka version supporting this C# producer?

And one more question: in the C# lib, how can I give the producer a list of brokers, to get fault tolerance in case one broker is down?

-- Best regards, Margus (Margusja) Roo +372 51 48 780 http://margus.roo.ee http://ee.linkedin.com/in/margusroo skype: margusja ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"