owner info in zk is not correct

2014-05-12 Thread Yonghui Zhao
Hi,

We are using Kafka 0.7.

2 brokers, each with 10 partitions for one topic.
3 consumers in one consumer group, each consumer creating 10 streams.


Today we wanted to roll out a new service.
After we restarted one consumer, we saw exceptions and warnings.

kafka.common.ConsumerRebalanceFailedException:
RecommendEvent_sd-sns-relation01.bj-1399630465426-53d3aefc can't rebalance
after 4 retries


[INFO  2014-05-12 15:17:47.364]
kafka.utils.Logging$class.info(Logging.scala:61)
[conflict in /consumers/RecommendEvent/owners/sensei/1-2 data:
RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-2 stored data:
RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1]
[INFO  2014-05-12 15:17:47.366]
kafka.utils.Logging$class.info(Logging.scala:61)
[RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the
partition ownership to be deleted: 1-2]
[INFO  2014-05-12 15:17:47.375]
kafka.utils.Logging$class.info(Logging.scala:61)
[conflict in /consumers/RecommendEvent/owners/sensei/1-3 data:
RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-3 stored data:
RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1]
[INFO  2014-05-12 15:17:47.375]
kafka.utils.Logging$class.info(Logging.scala:61)
[RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the
partition ownership to be deleted: 1-3]
[INFO  2014-05-12 15:17:47.385]
kafka.utils.Logging$class.info(Logging.scala:61)
[conflict in /consumers/RecommendEvent/owners/sensei/1-5 data:
RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-5 stored data:
RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2]
[INFO  2014-05-12 15:17:47.386]
kafka.utils.Logging$class.info(Logging.scala:61)
[RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the
partition ownership to be deleted: 1-5]



Then I opened a ZK viewer.

In ZK, we found 2 consumers under ConsumerGroup/ids:

RecommendEvent_sd-sns-relation02.bj-1399635256619-5d8123c6
RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3


And in owners/topic/ we found all partitions are assigned to
sd-sns-relation03.bj:

Here is the owner info:
1:0  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0
1:1  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0
1:2  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1
1:3  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1
1:4  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2
1:5  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2
1:6  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3
1:7  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3
1:8  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4
1:9  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4

2:0  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0
2:1  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1
2:2  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2
2:3  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3
2:4  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4
2:5  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-5
2:6  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-6
2:7  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-7
2:8  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-8
2:9  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-9


So all partitions are assigned to sd-sns-relation03.bj, but from the logs and
counters we are sure sd-sns-relation02.bj is receiving input too.


My questions are:

1. Why did the rebalance fail?
2. Why is the owner info wrong?  (BTW: zkclient is 0.2.)
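
For reference, the same ids/owners view can be pulled programmatically with
zkclient (the library already in use here); a minimal sketch, assuming the
default /consumers layout and using the group/topic names from this thread
(the ZK host is a placeholder):

    import java.util.List;
    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.serialize.ZkSerializer;

    public class OwnerDump {
        // Kafka stores plain UTF-8 strings in ZK, so bypass zkclient's
        // default Java serialization with a pass-through serializer.
        static final ZkSerializer RAW = new ZkSerializer() {
            public byte[] serialize(Object data) { return data.toString().getBytes(); }
            public Object deserialize(byte[] bytes) { return bytes == null ? null : new String(bytes); }
        };

        public static void main(String[] args) {
            ZkClient zk = new ZkClient("zkhost:2181", 30000, 30000, RAW);  // placeholder ensemble
            String group = "RecommendEvent";
            String topic = "sensei";

            // Consumers currently registered in the group.
            List<String> ids = zk.getChildren("/consumers/" + group + "/ids");
            System.out.println("ids: " + ids);

            // Recorded owner of each "<broker>-<partition>" znode.
            String owners = "/consumers/" + group + "/owners/" + topic;
            for (String partition : zk.getChildren(owners)) {
                System.out.println(partition + " -> " + zk.readData(owners + "/" + partition, true));
            }
            zk.close();
        }
    }

Comparing the ids list against the owner znodes is essentially what the
"conflict in /consumers/.../owners/..." messages above are complaining about.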


Re: owner info in zk is not correct

2014-05-12 Thread Guozhang Wang
Hello Yonghui,

In 0.7 the consumer rebalance logic is distributed, and in some corner cases,
such as consecutive rebalances caused by soft failures, one consumer may
consider the rebalance complete while others are still going through the
rebalance process. You can check the GC logs on your consumers to verify
whether that is the case:

https://issues.apache.org/jira/browse/KAFKA-242

If you bounce the consumers to trigger another rebalance, this issue will
likely be resolved.
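
Until they can be bounced, giving the consumers more room to ride out
consecutive rebalances sometimes helps; a sketch of the relevant consumer
settings (the property names below are the 0.8 ones — the 0.7 names differ
slightly, so treat the exact keys as assumptions and check ConsumerConfig for
your build):

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    public class RebalanceTuning {
        public static ConsumerConfig config() {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zkhost:2181");   // placeholder ensemble
            props.put("group.id", "RecommendEvent");
            props.put("rebalance.max.retries", "10");        // the default of 4 matches "after 4 retries" above
            props.put("rebalance.backoff.ms", "10000");      // wait longer between attempts
            props.put("zookeeper.session.timeout.ms", "30000");
            return new ConsumerConfig(props);
        }
    }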

To solve this issue, in 0.9 we are moving group management functions such as
load rebalancing from the ZK-based distributed logic into a centralized
coordinator. Details can be found here:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Guozhang


On Mon, May 12, 2014 at 12:48 AM, Yonghui Zhao wrote:

> Hi,
>
> We are using kafka 0.7.
>
> 2 brokers, each broker has 10 partitions for one topic
> 3 consumers in one consumer group, each consumer create 10 streams.
>
>
> Today, when we want to rollout new service.
> After we restart one consumer we find exceptions and warning.
>
> kafka.common.ConsumerRebalanceFailedException:
> RecommendEvent_sd-sns-relation01.bj-1399630465426-53d3aefc can't rebalance
> after 4 retries
>
>
> [INFO  2014-05-12 15:17:47.364]
> kafka.utils.Logging$class.info(Logging.scala:61)
> [conflict in /consumers/RecommendEvent/owners/sensei/1-2 data:
> RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-2 stored data:
> RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1]
> [INFO  2014-05-12 15:17:47.366]
> kafka.utils.Logging$class.info(Logging.scala:61)
> [RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the
> partition ownership to be deleted: 1-2]
> [INFO  2014-05-12 15:17:47.375]
> kafka.utils.Logging$class.info(Logging.scala:61)
> [conflict in /consumers/RecommendEvent/owners/sensei/1-3 data:
> RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-3 stored data:
> RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1]
> [INFO  2014-05-12 15:17:47.375]
> kafka.utils.Logging$class.info(Logging.scala:61)
> [RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the
> partition ownership to be deleted: 1-3]
> [INFO  2014-05-12 15:17:47.385]
> kafka.utils.Logging$class.info(Logging.scala:61)
> [conflict in /consumers/RecommendEvent/owners/sensei/1-5 data:
> RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e-5 stored data:
> RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2]
> [INFO  2014-05-12 15:17:47.386]
> kafka.utils.Logging$class.info(Logging.scala:61)
> [RecommendEvent_sd-sns-relation01.bj-1399879066480-5426fb5e waiting for the
> partition ownership to be deleted: 1-5]
>
>
>
> And I opened zk viewer.
>
> In zk, we found 2 consumers in ConsumerGroup/ids:
>
> RecommendEvent_sd-sns-relation02.bj-1399635256619-5d8123c6
> RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3
>
>
> And in owners/topic/ we found all partitions are assigned to
> sd-sns-relation03.bj:
>
> Here is the owner info:
> 1:0  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0
> 1:1  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0
> 1:2  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1
> 1:3  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1
> 1:4  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2
> 1:5  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2
> 1:6  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3
> 1:7  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3
> 1:8  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4
> 1:9  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4
>
> 2:0  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-0
> 2:1  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-1
> 2:2  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-2
> 2:3  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-3
> 2:4  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-4
> 2:5  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-5
> 2:6  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-6
> 2:7  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-7
> 2:8  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-8
> 2:9  RecommendEvent_sd-sns-relation03.bj-1399635121250-487bdbb3-9
>
>
> So all partitions are assigned to sd-sns-relation03.bj,  but from logs and
> counter, we are sure sd-sns-relation02.bj has input too.
>
>
> My question is:
>
> 1. why rebalance failed?
> 2. why owner info is wrong?  btw: zkclient is 0.2
>



-- 
-- Guozhang


Kafka 0.8's ConsumerConnector.close() hangs if ZK is unavailable and autocommit is enabled

2014-05-12 Thread Yury Ruchin
Hi all,

I'm using Kafka 0.8 and I've run into an issue with ConsumerConnector.
Steps to reproduce:

1. Start a single-broker Kafka cluster with auto.create.topic.enable set to
"true".
2. Start a ConsumerConnector on a topic (which does not yet exist) with
auto.offset.reset set to "smallest".
3. Produce some data to the topic.
4. Bring Zookeeper down
5. Call ConsumerConnector.close()

Observation - the call blocks forever with the following stack trace:

   java.lang.Thread.State: TIMED_WAITING (parking)

at sun.misc.Unsafe.park(Native Method)

- parking to wait for  <0xc6bf0570> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)

at
java.util.concurrent.locks.LockSupport.parkUntil(LockSupport.java:237)

at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitUntil(AbstractQueuedSynchronizer.java:2072)

at
org.I0Itec.zkclient.ZkClient.waitForKeeperState(ZkClient.java:636)

at
org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:619)

at
org.I0Itec.zkclient.ZkClient.waitUntilConnected(ZkClient.java:615)

at
org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:679)

at
org.I0Itec.zkclient.ZkClient.writeDataReturnStat(ZkClient.java:813)

at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:808)

at org.I0Itec.zkclient.ZkClient.writeData(ZkClient.java:777)

at kafka.utils.ZkUtils$.updatePersistentPath(Unknown Source)

at
kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown
Source)

at
kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3$$anonfun$apply$5.apply(Unknown
Source)

at scala.collection.Iterator$class.foreach(Iterator.scala:727)

at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at scala.collection.AbstractIterable.foreach(Iterable.scala:54)

at
kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown
Source)

at
kafka.consumer.ZookeeperConsumerConnector$$anonfun$commitOffsets$3.apply(Unknown
Source)

at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)

at scala.collection.Iterator$class.foreach(Iterator.scala:727)

at kafka.utils.Pool$$anon$1.foreach(Unknown Source)

at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

at kafka.utils.Pool.foreach(Unknown Source)

at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)

at kafka.consumer.ZookeeperConsumerConnector.commitOffsets(Unknown
Source)

at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(Unknown
Source)

at kafka.consumer.ZookeeperConsumerConnector.shutdown(Unknown
Source)

- locked <0xc6be0c60> (a java.lang.Object)

at
kafka.javaapi.consumer.ZookeeperConsumerConnector.shutdown(Unknown Source)
   ...

Once I set "auto.commit.enable" to "false", the problem is gone. This is
identical to what's described in
https://issues.apache.org/jira/browse/KAFKA-601, with the only difference
being that it applies to Kafka 0.8 rather than Kafka 0.7.2. Is there any way
to solve this issue other than disabling auto-commit?

Thanks,
Yury
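
A minimal workaround sketch for the behavior described above, assuming the
0.8 high-level Java consumer (hosts, group id and the timeout are
placeholders): turn auto-commit off, commit explicitly while ZooKeeper is
reachable, and run shutdown() on a separate thread with a bounded wait so a
ZooKeeper outage cannot block the application forever.

    import java.util.Properties;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class SafeShutdown {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("zookeeper.connect", "zkhost:2181");   // placeholder
            props.put("group.id", "mygroup");                // placeholder
            props.put("auto.offset.reset", "smallest");
            props.put("auto.commit.enable", "false");        // avoid the auto-commit path that hangs

            final ConsumerConnector consumer =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // ... create streams and consume; call consumer.commitOffsets()
            // after processing batches, while ZooKeeper is still reachable ...

            // Bound the shutdown so a ZK outage cannot block the application forever.
            ExecutorService ex = Executors.newSingleThreadExecutor();
            ex.submit(new Runnable() {
                public void run() { consumer.shutdown(); }
            });
            ex.shutdown();
            if (!ex.awaitTermination(30, TimeUnit.SECONDS)) {
                // The hung shutdown thread is left behind; exit the JVM if needed.
                System.err.println("consumer.shutdown() did not finish in 30s, giving up");
            }
        }
    }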


Loss of Leader in Kafka

2014-05-12 Thread Kashyap Mhaisekar
Hi,
I am hitting a strange exception while creating a topic in Kafka.
Steps to reproduce:
1. Created a topic multipartition_test with 2 partitions and 2 replicas.
2. Added some data to this topic and verified data is coming up for both
partitions.
3. Deleted the topic, and checked ZooKeeper (only) to confirm that
/brokers/topics DOES NOT have the topic.
4. Recreated the topic in exactly the same way as in point 1.

After this, when I list topics using ./kafka-list-topic.sh, I see
*leader:* none and an empty *isr:* for this topic. The state change logs give
the following exception.

kafka.common.StateChangeFailedException: encountered error while electing
leader for partition [multipartition_test,1] due to: LeaderAndIsr
information doesn't exist for partition [multipartition_test,1] in
OnlinePartition state.
at
kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:327)
at
kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:154)
at
kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
at
kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:109)
at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
at
kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:109)
at
kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:325)
at
kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:312)
at
kafka.controller.PartitionStateMachine$TopicChangeListener.liftedTree1$1(PartitionStateMachine.scala:376)
at
kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:361)
at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Caused by: kafka.common.StateChangeFailedException: LeaderAndIsr
information doesn't exist for partition [multipartition_test,1] in
OnlinePartition state
at
kafka.controller.PartitionStateMachine.getLeaderIsrAndEpochOrThrowException(PartitionStateMachine.scala:347)
at
kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:291)
... 11 more

Can you please help me understand what I am doing wrong?

Regards,
kashyap
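
One way to see what leader/ISR state is (or is not) left behind for the topic
is to read the per-partition state znodes directly; a minimal zkclient
sketch, assuming the standard 0.8 ZooKeeper layout (the ZK host is a
placeholder, the topic name is from above):

    import org.I0Itec.zkclient.ZkClient;
    import org.I0Itec.zkclient.serialize.ZkSerializer;

    public class TopicStateDump {
        // Kafka writes plain JSON strings into ZK, so use a pass-through serializer.
        static final ZkSerializer RAW = new ZkSerializer() {
            public byte[] serialize(Object data) { return data.toString().getBytes(); }
            public Object deserialize(byte[] bytes) { return bytes == null ? null : new String(bytes); }
        };

        public static void main(String[] args) {
            ZkClient zk = new ZkClient("zkhost:2181", 30000, 30000, RAW);  // placeholder ensemble
            String topic = "multipartition_test";

            // Replica assignment written when the topic was (re)created.
            System.out.println("assignment: " + zk.readData("/brokers/topics/" + topic, true));

            // Per-partition leader/ISR state -- the znodes the controller says are missing.
            String partitions = "/brokers/topics/" + topic + "/partitions";
            for (String p : zk.getChildren(partitions)) {
                System.out.println("partition " + p + ": " + zk.readData(partitions + "/" + p + "/state", true));
            }
            zk.close();
        }
    }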


Re: question about mirror maker

2014-05-12 Thread Steven Wu
If the mirror maker is placed in the same datacenter as the target cluster,
its consumer will talk to ZooKeeper in the remote/source datacenter. Wouldn't
it be more susceptible to network problems?

As for the problem of committing offsets without actually producing/writing
messages to the target cluster, it can be solved by disabling auto-commit and
only committing messages that have actually been persisted in the target
cluster.

What do you think of this opposite approach?


On Sun, May 11, 2014 at 8:48 PM, Todd Palino  wrote:

> Yes, on both counts. Putting the mirror maker in the same datacenter in
> the target cluster is exactly what we do as well. We also monitor both the
> consumer lag (by comparing the offsets stored in Zookeeper and the tail
> offset on the brokers), and the number of dropped and failed messages on
> the mirror maker producer side. The other thing to do is to make sure to
> check very carefully when you are changing anything about the producer
> configuration, to assure that you have not made a mistake.
>
> -Todd
>
> On 5/11/14, 9:12 AM, "Weide Zhang"  wrote:
>
> >Hi Todd,
> >
> >Thanks for your answer. with regard to fail over for mirror maker, does
> >that mean if i have 4 mirror maker running in different machines with same
> >consumer group, it will auto load balance if one of the mirror maker fails
> >? Also, it looks to prevent mirror maker commit wrong (consumer work but
> >not producer) due to cross data center network issue, mirror maker need to
> >be placed along with the target cluster so that this scenario is minimized
> >?
> >
> >
> >On Sat, May 10, 2014 at 11:39 PM, Todd Palino 
> >wrote:
> >
> >> Well, if you have a cluster in each datacenter, all with the same
> >>topics,
> >> you can't just mirror the messages between them, as you will create a
> >> loop. The way we do it is to have a "local" cluster and an "aggregate"
> >> cluster. The local cluster has the data for only that datacenter. Then
> >>we
> >> run mirror makers that copy the messages from each of the local clusters
> >> into the aggregate cluster. Everything produces into the local clusters,
> >> and nothing produces into the aggregate clusters. In general, consumers
> >> consume from the aggregate cluster (unless they specifically want only
> >> local data).
> >>
> >> The mirror maker is as fault tolerant as any other consumer. That is,
> >>if a
> >> mirror maker goes down, the others configured with the same consumer
> >>group
> >> (we generally run at least 4 for any mirror maker, sometimes up to 10)
> >> will rebalance and start back up from the last committed offset. What
> >>you
> >> need to watch out for is if the mirror maker is unable to produce
> >> messages, for example, if the network goes down. If it can still consume
> >> messages, but cannot produce them, you will lose messages as the
> >>consumer
> >> will continue to commit offsets with no knowledge that the producer is
> >> failing.
> >>
> >> -Todd
> >>
> >> On 5/8/14, 11:20 AM, "Weide Zhang"  wrote:
> >>
> >> >Hi,
> >> >
> >> >I have a question about mirror maker. say I have 3 data centers each
> >> >producing topic 'A' with separate kafka cluster running. if 3 of the
> >>data
> >> >need to be kept in sync with each other, shall i create 3 mirror maker
> >>in
> >> >each data center to get the data from the other two ?
> >> >
> >> >also, it mentioned that mirror making is not fault tolerant ? so what
> >>will
> >> >be the behavior of mirror consumer if it went down due to network and
> >>back
> >> >up ? do they catch up with last offset from which they last mirror ? If
> >> >so,
> >> >is it enabled by default or I have to configure  ?
> >> >
> >> >Thanks a lot,
> >> >
> >> >Weide
> >>
> >>
>
>


IndexOutOfBoundsException error

2014-05-12 Thread Xuyen On
Hi all,

I am seeing some corrupt data on my 0.7.2 Kafka cluster. Every once in a while
I'll get the following message:

Exception in thread "kafka-consumer" java.lang.IndexOutOfBoundsException
at java.nio.Buffer.checkIndex(Unknown Source)
at java.nio.HeapByteBuffer.get(Unknown Source)
at kafka.message.Message.magic(Message.scala:133)
at kafka.message.Message.checksum(Message.scala:146)
at kafka.message.Message.isValid(Message.scala:158)
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:129)
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
at 
kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
at 
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.message.MessageSet.foreach(MessageSet.scala:87)
at 
kafka.tools.SimpleConsumerShell$$anon$1$$anonfun$run$1.apply(SimpleConsumerShell.scala:97)
at 
kafka.tools.SimpleConsumerShell$$anon$1$$anonfun$run$1.apply(SimpleConsumerShell.scala:94)
at scala.collection.Iterator$class.foreach(Iterator.scala:631)
at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
at kafka.api.MultiFetchResponse.foreach(MultiFetchResponse.scala:25)
at 
kafka.tools.SimpleConsumerShell$$anon$1.run(SimpleConsumerShell.scala:94)

The only way I can get past this error is to move the offset past the current
message to skip the bad data. I use the simpleConsumer.getOffsetsBefore()
method to get a valid list of offsets, and I skip to the next valid offset.

Has anyone encountered this issue before? I found a message where someone
experienced similar issues; they were running JDK 1.7 and were able to get
around it by running JDK 1.6. I am running OpenJDK 1.7, and I also tried
running Sun JDK 1.6, but I still have the same problem.

My guess is that the data is corrupt somehow, but I don't know how to look at
the raw data to confirm it. Can anyone suggest ways to debug this issue?

Thanks,

Xuyen




Re: question about isr

2014-05-12 Thread Ian Friedman
You're probably right that it has to be GC, given the CMS activity I saw in the
log, but I didn't see a single concurrent mode failure, which makes it all the
more bothersome that this happened anyway... also the ZK timeout is set to a
very large number... I dunno, seems weird.

I will see what I can do about getting 0.8.1.1 deployed... how do you expect it 
to address this problem?

--Ian

On May 12, 2014, at 10:49 AM, Jun Rao  wrote:

> Could you try 0.8.1.1, which fixed some bugs related to controller hanging 
> during shutdown.
> 
> For ZK session expiration, the #1 cause is GC. We have also seen that 
> transient issues with the underlying storage (e.g. raid controller reset) can 
> also cause ZK session expiration.
> 
> As for the data loss in the producer, you are probably using ack=1, which 
> could lead to data loss during leader failover. It just that in this case, 
> the failover window is forever due to some bugs.
> 
> Thanks,
> 
> Jun
> 
> 
> On Sun, May 11, 2014 at 10:14 PM, Ian Friedman  wrote:
> Jun - We're using 0.8.1 
> The timestamps in the last few entries in controller.log seem to correspond 
> to when the trouble started in server.log
> controller.log:
> 
> [2014-05-08 19:01:27,693] INFO [SessionExpirationListener on 1], ZK expired; 
> shut down all controller components and try to re-elect 
> (kafka.controller.KafkaController$SessionExpirationListener)
> 
> [2014-05-08 19:01:27,694] INFO [delete-topics-thread], Shutting down 
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> 
> server.log:
> 
> [2014-05-08 19:01:27,382] INFO Closing socket connection to /10.10.13.3. 
> (kafka.network.Processor)
> 
> [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. 
> (kafka.network.Processor)
> 
> [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. 
> (kafka.network.Processor)
> 
> [2014-05-08 19:01:29,886] INFO Partition [callbackServiceTopic-Medium,27] on 
> broker 1: Shrinking ISR for partition [callbackServiceTopic-Medium,27] from 
> 1,2,3 to 1 (kafka.cluster.Partition)
> 
> [2014-05-08 19:01:30,109] ERROR Conditional update of path 
> /brokers/topics/callbackServiceTopic-Medium/partitions/27/state with data 
> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} and 
> expected version 9 failed due to 
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
> BadVersion for 
> /brokers/topics/callbackServiceTopic-Medium/partitions/27/state 
> (kafka.utils.ZkUtils$)
> 
> [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-Medium,27] on 
> broker 1: Cached zkVersion [9] not equal to that in zookeeper, skip updating 
> ISR (kafka.cluster.Partition)
> 
> [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-High,3] on 
> broker 1: Shrinking ISR for partition [callbackServiceTopic-High,3] from 
> 1,2,3 to 1 (kafka.cluster.Partition)
> 
> [2014-05-08 19:01:30,111] ERROR Conditional update of path 
> /brokers/topics/callbackServiceTopic-High/partitions/3/state with data 
> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} and 
> expected version 9 failed due to 
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
> BadVersion for /brokers/topics/callbackServiceTopic-High/partitions/3/state 
> (kafka.utils.ZkUtils$)
> 
> I'm also concerned why this happened at all... I do see some slightly high 
> CMS activity in the GC log but I don't see any Full GCs and the whole thing 
> was over within a minute... I'm a bit concerned how something like this could 
> make a broker fail this way!
> 
> One of the after effects we found from leaving the broker running in this 
> state is that it was still allowing producers with cached metadata to produce 
> to it, so we lost a whole bunch of messages before I shut him down. What's up 
> with that??
> 
> 
> 
> --Ian
> 
> 
> 
> On Sun, May 11, 2014 at 11:39 PM, Jun Rao  wrote:
> It seems that broker 1 had a soft failure (potentially due to GC). However, 
> somehow it didn't receive the latest LeaderAndIsrRequest from the controller. 
> Which version of Kafka are you using? In the controller log, do you see 
> broker 1 being added back as a live broker?
> 
> Thanks,
> 
> Jun
> 
> 
> On Fri, May 9, 2014 at 10:21 AM, Ian Friedman  wrote:
> This seems similar to behavior we’re seeing. At some point one of our brokers 
> (id 1) just gives up and starts throwing those errors and kafka-topics no 
> longer lists it as a ISR. However the logs for that broker say something very 
> odd:
> 
> [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic-High,8] on 
> broker 1: Cached zkVersion [10] not equal to that in zookeeper, skip updating 
> ISR (kafka.cluster.Partition)
> [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic,3] on broker 
> 1: Shrinking ISR for partition [callbackServiceTopic,3] from 1,2,3 to 1 
> (kafka.cluster.Partition)
> [2014-05-09 10:16:00,251] ERROR Conditional update of path 
> /brokers/to

Re: question about isr

2014-05-12 Thread Jun Rao
Could you try 0.8.1.1, which fixed some bugs related to the controller hanging
during shutdown?

For ZK session expiration, the #1 cause is GC. We have also seen that
transient issues with the underlying storage (e.g. raid controller reset)
can also cause ZK session expiration.

As for the data loss in the producer, you are probably using ack=1, which
could lead to data loss during leader failover. It's just that in this case,
the failover window lasts forever due to some bugs.

Thanks,

Jun
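
For the ack=1 loss window specifically, requiring acknowledgement from the
full ISR narrows it considerably, at a latency cost; a sketch of the relevant
0.8 producer settings (broker addresses are placeholders):

    import java.util.Properties;
    import kafka.producer.ProducerConfig;

    public class SafeProducerConfig {
        public static ProducerConfig build() {
            Properties props = new Properties();
            props.put("metadata.broker.list", "broker1:9092,broker2:9092"); // placeholders
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync");          // don't buffer sends in the client
            props.put("request.required.acks", "-1");    // wait for the full ISR, not just the leader
            return new ProducerConfig(props);
        }
    }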


On Sun, May 11, 2014 at 10:14 PM, Ian Friedman  wrote:

> Jun - We're using 0.8.1
> The timestamps in the last few entries in controller.log seem to
> correspond to when the trouble started in server.log
>
> controller.log:
>
> [2014-05-08 19:01:27,693] INFO [SessionExpirationListener on 1], ZK
> expired; shut down all controller components and try to re-elect
> (kafka.controller.KafkaController$SessionExpirationListener)
>
> [2014-05-08 19:01:27,694] INFO [delete-topics-thread], Shutting down
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>
> server.log:
>
> [2014-05-08 19:01:27,382] INFO Closing socket connection to /10.10.13.3.
> (kafka.network.Processor)
>
> [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2.
> (kafka.network.Processor)
>
> [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2.
> (kafka.network.Processor)
>
> [2014-05-08 19:01:29,886] INFO Partition [callbackServiceTopic-Medium,27]
> on broker 1: Shrinking ISR for partition [callbackServiceTopic-Medium,27]
> from 1,2,3 to 1 (kafka.cluster.Partition)
>
> [2014-05-08 19:01:30,109] ERROR Conditional update of path
> /brokers/topics/callbackServiceTopic-Medium/partitions/27/state with data
> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]}
> and expected version 9 failed due to
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
> BadVersion for
> /brokers/topics/callbackServiceTopic-Medium/partitions/27/state
> (kafka.utils.ZkUtils$)
>
> [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-Medium,27]
> on broker 1: Cached zkVersion [9] not equal to that in zookeeper, skip
> updating ISR (kafka.cluster.Partition)
>
> [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-High,3] on
> broker 1: Shrinking ISR for partition [callbackServiceTopic-High,3] from
> 1,2,3 to 1 (kafka.cluster.Partition)
>
> [2014-05-08 19:01:30,111] ERROR Conditional update of path
> /brokers/topics/callbackServiceTopic-High/partitions/3/state with data
> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]}
> and expected version 9 failed due to
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
> BadVersion for /brokers/topics/callbackServiceTopic-High/partitions/3/state
> (kafka.utils.ZkUtils$)
>
> I'm also concerned why this happened at all... I do see some slightly high
> CMS activity in the GC log but I don't see any Full GCs and the whole thing
> was over within a minute... I'm a bit concerned how something like this
> could make a broker fail this way!
>
> One of the after effects we found from leaving the broker running in this
> state is that it was still allowing producers with cached metadata to
> produce to it, so we lost a whole bunch of messages before I shut him down.
> What's up with that??
>
>
> --Ian
>
>
> On Sun, May 11, 2014 at 11:39 PM, Jun Rao  wrote:
>
>> It seems that broker 1 had a soft failure (potentially due to GC).
>> However, somehow it didn't receive the latest LeaderAndIsrRequest from the
>> controller. Which version of Kafka are you using? In the controller log, do
>> you see broker 1 being added back as a live broker?
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Fri, May 9, 2014 at 10:21 AM, Ian Friedman  wrote:
>>
>>> This seems similar to behavior we’re seeing. At some point one of our
>>> brokers (id 1) just gives up and starts throwing those errors and
>>> kafka-topics no longer lists it as a ISR. However the logs for that broker
>>> say something very odd:
>>>
>>> [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic-High,8]
>>> on broker 1: Cached zkVersion [10] not equal to that in zookeeper, skip
>>> updating ISR (kafka.cluster.Partition)
>>> [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic,3] on
>>> broker 1: Shrinking ISR for partition [callbackServiceTopic,3] from 1,2,3
>>> to 1 (kafka.cluster.Partition)
>>> [2014-05-09 10:16:00,251] ERROR Conditional update of path
>>> /brokers/topics/callbackServiceTopic/partitions/3/state with data
>>> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]}
>>> and expected version 9 failed due to
>>> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
>>> BadVersion for /brokers/topics/callbackServiceTopic/partitions/3/state
>>> (kafka.utils.ZkUtils$)
>>> [2014-05-09 10:16:00,251] INFO Partition [callbackServiceTopic,3] on
>>> broker 1: Cached zkVersion [9] not equal to that in zookeep

Re: question about isr

2014-05-12 Thread Ian Friedman
Another question: if I start broker 1 back up now, with its replicas out of
sync, will it automatically catch up with the leaders, or do I have to do
something to catch it up?

--Ian 

On May 12, 2014, at 10:49 AM, Jun Rao  wrote:

> Could you try 0.8.1.1, which fixed some bugs related to controller hanging 
> during shutdown.
> 
> For ZK session expiration, the #1 cause is GC. We have also seen that 
> transient issues with the underlying storage (e.g. raid controller reset) can 
> also cause ZK session expiration.
> 
> As for the data loss in the producer, you are probably using ack=1, which 
> could lead to data loss during leader failover. It just that in this case, 
> the failover window is forever due to some bugs.
> 
> Thanks,
> 
> Jun
> 
> 
> On Sun, May 11, 2014 at 10:14 PM, Ian Friedman  wrote:
> Jun - We're using 0.8.1 
> The timestamps in the last few entries in controller.log seem to correspond 
> to when the trouble started in server.log
> controller.log:
> 
> [2014-05-08 19:01:27,693] INFO [SessionExpirationListener on 1], ZK expired; 
> shut down all controller components and try to re-elect 
> (kafka.controller.KafkaController$SessionExpirationListener)
> 
> [2014-05-08 19:01:27,694] INFO [delete-topics-thread], Shutting down 
> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
> 
> server.log:
> 
> [2014-05-08 19:01:27,382] INFO Closing socket connection to /10.10.13.3. 
> (kafka.network.Processor)
> 
> [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. 
> (kafka.network.Processor)
> 
> [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2. 
> (kafka.network.Processor)
> 
> [2014-05-08 19:01:29,886] INFO Partition [callbackServiceTopic-Medium,27] on 
> broker 1: Shrinking ISR for partition [callbackServiceTopic-Medium,27] from 
> 1,2,3 to 1 (kafka.cluster.Partition)
> 
> [2014-05-08 19:01:30,109] ERROR Conditional update of path 
> /brokers/topics/callbackServiceTopic-Medium/partitions/27/state with data 
> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} and 
> expected version 9 failed due to 
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
> BadVersion for 
> /brokers/topics/callbackServiceTopic-Medium/partitions/27/state 
> (kafka.utils.ZkUtils$)
> 
> [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-Medium,27] on 
> broker 1: Cached zkVersion [9] not equal to that in zookeeper, skip updating 
> ISR (kafka.cluster.Partition)
> 
> [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-High,3] on 
> broker 1: Shrinking ISR for partition [callbackServiceTopic-High,3] from 
> 1,2,3 to 1 (kafka.cluster.Partition)
> 
> [2014-05-08 19:01:30,111] ERROR Conditional update of path 
> /brokers/topics/callbackServiceTopic-High/partitions/3/state with data 
> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} and 
> expected version 9 failed due to 
> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = 
> BadVersion for /brokers/topics/callbackServiceTopic-High/partitions/3/state 
> (kafka.utils.ZkUtils$)
> 
> I'm also concerned why this happened at all... I do see some slightly high 
> CMS activity in the GC log but I don't see any Full GCs and the whole thing 
> was over within a minute... I'm a bit concerned how something like this could 
> make a broker fail this way!
> 
> One of the after effects we found from leaving the broker running in this 
> state is that it was still allowing producers with cached metadata to produce 
> to it, so we lost a whole bunch of messages before I shut him down. What's up 
> with that??
> 
> 
> 
> --Ian
> 
> 
> 
> On Sun, May 11, 2014 at 11:39 PM, Jun Rao  wrote:
> It seems that broker 1 had a soft failure (potentially due to GC). However, 
> somehow it didn't receive the latest LeaderAndIsrRequest from the controller. 
> Which version of Kafka are you using? In the controller log, do you see 
> broker 1 being added back as a live broker?
> 
> Thanks,
> 
> Jun
> 
> 
> On Fri, May 9, 2014 at 10:21 AM, Ian Friedman  wrote:
> This seems similar to behavior we’re seeing. At some point one of our brokers 
> (id 1) just gives up and starts throwing those errors and kafka-topics no 
> longer lists it as a ISR. However the logs for that broker say something very 
> odd:
> 
> [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic-High,8] on 
> broker 1: Cached zkVersion [10] not equal to that in zookeeper, skip updating 
> ISR (kafka.cluster.Partition)
> [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic,3] on broker 
> 1: Shrinking ISR for partition [callbackServiceTopic,3] from 1,2,3 to 1 
> (kafka.cluster.Partition)
> [2014-05-09 10:16:00,251] ERROR Conditional update of path 
> /brokers/topics/callbackServiceTopic/partitions/3/state with data 
> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]} and 
> expected version 9 failed due to 
> org.apache.z

Re: question about isr

2014-05-12 Thread Jun Rao
0.8.1.1 fixed an issue where the controller could hang during a soft failure.

Out-of-sync replicas will catch up from the leader automatically once
restarted.

Thanks,

Jun


On Mon, May 12, 2014 at 12:37 PM, Ian Friedman  wrote:

> You're probably right that it has to be GC given the CMS activity I saw in
> the log but I didn't see a single concurrent mode failure, which bothers me
> that we had this happen anyway... also the ZK timeout is set to a very
> large number... I dunno, seems weird.
>
> I will see what I can do about getting 0.8.1.1 deployed... how do you
> expect it to address this problem?
>
> --Ian
>
>
> On May 12, 2014, at 10:49 AM, Jun Rao  wrote:
>
> Could you try 0.8.1.1, which fixed some bugs related to controller hanging
> during shutdown.
>
> For ZK session expiration, the #1 cause is GC. We have also seen that
> transient issues with the underlying storage (e.g. raid controller reset)
> can also cause ZK session expiration.
>
> As for the data loss in the producer, you are probably using ack=1, which
> could lead to data loss during leader failover. It just that in this case,
> the failover window is forever due to some bugs.
>
> Thanks,
>
> Jun
>
>
> On Sun, May 11, 2014 at 10:14 PM, Ian Friedman  wrote:
>
>> Jun - We're using 0.8.1
>> The timestamps in the last few entries in controller.log seem to
>> correspond to when the trouble started in server.log
>>
>> controller.log:
>>
>> [2014-05-08 19:01:27,693] INFO [SessionExpirationListener on 1], ZK
>> expired; shut down all controller components and try to re-elect
>> (kafka.controller.KafkaController$SessionExpirationListener)
>>
>> [2014-05-08 19:01:27,694] INFO [delete-topics-thread], Shutting down
>> (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
>>
>> server.log:
>>
>> [2014-05-08 19:01:27,382] INFO Closing socket connection to /10.10.13.3.
>> (kafka.network.Processor)
>>
>> [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2.
>> (kafka.network.Processor)
>>
>> [2014-05-08 19:01:27,385] INFO Closing socket connection to /10.10.57.2.
>> (kafka.network.Processor)
>>
>> [2014-05-08 19:01:29,886] INFO Partition [callbackServiceTopic-Medium,27]
>> on broker 1: Shrinking ISR for partition [callbackServiceTopic-Medium,27]
>> from 1,2,3 to 1 (kafka.cluster.Partition)
>>
>> [2014-05-08 19:01:30,109] ERROR Conditional update of path
>> /brokers/topics/callbackServiceTopic-Medium/partitions/27/state with data
>> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]}
>> and expected version 9 failed due to
>> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
>> BadVersion for
>> /brokers/topics/callbackServiceTopic-Medium/partitions/27/state
>> (kafka.utils.ZkUtils$)
>>
>> [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-Medium,27]
>> on broker 1: Cached zkVersion [9] not equal to that in zookeeper, skip
>> updating ISR (kafka.cluster.Partition)
>>
>> [2014-05-08 19:01:30,109] INFO Partition [callbackServiceTopic-High,3] on
>> broker 1: Shrinking ISR for partition [callbackServiceTopic-High,3] from
>> 1,2,3 to 1 (kafka.cluster.Partition)
>>
>> [2014-05-08 19:01:30,111] ERROR Conditional update of path
>> /brokers/topics/callbackServiceTopic-High/partitions/3/state with data
>> {"controller_epoch":4,"leader":1,"version":1,"leader_epoch":4,"isr":[1]}
>> and expected version 9 failed due to
>> org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode =
>> BadVersion for /brokers/topics/callbackServiceTopic-High/partitions/3/state
>> (kafka.utils.ZkUtils$)
>>
>> I'm also concerned why this happened at all... I do see some slightly
>> high CMS activity in the GC log but I don't see any Full GCs and the whole
>> thing was over within a minute... I'm a bit concerned how something like
>> this could make a broker fail this way!
>>
>> One of the after effects we found from leaving the broker running in this
>> state is that it was still allowing producers with cached metadata to
>> produce to it, so we lost a whole bunch of messages before I shut him down.
>> What's up with that??
>>
>>
>> --Ian
>>
>>
>> On Sun, May 11, 2014 at 11:39 PM, Jun Rao  wrote:
>>
>>> It seems that broker 1 had a soft failure (potentially due to GC).
>>> However, somehow it didn't receive the latest LeaderAndIsrRequest from the
>>> controller. Which version of Kafka are you using? In the controller log, do
>>> you see broker 1 being added back as a live broker?
>>>
>>> Thanks,
>>>
>>> Jun
>>>
>>>
>>> On Fri, May 9, 2014 at 10:21 AM, Ian Friedman  wrote:
>>>
 This seems similar to behavior we’re seeing. At some point one of our
 brokers (id 1) just gives up and starts throwing those errors and
 kafka-topics no longer lists it as a ISR. However the logs for that broker
 say something very odd:

 [2014-05-09 10:16:00,248] INFO Partition [callbackServiceTopic-High,8]
 on broker 1: Cached zkVersion [10] not equal to that in zookeeper, skip
 upd

Kafka producer in CSharp

2014-05-12 Thread Margusja

Hi

I have a Kafka broker running (kafka_2.9.1-0.8.1.1).
All is working.

One project requires the producer to be written in C#.
I am not a .NET programmer, but I managed to write simple producer code
using
https://github.com/kafka-dev/kafka/blob/master/clients/csharp/README.md


the code
...
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Kafka.Client;

namespace DemoProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            Message msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            Message msg2 = new Message(payloadData2);

            Producer producer = new Producer("broker", 9092);
            producer.Send("kafkademo3", 0, msg1);
        }
    }
}
...

On the broker side I get the error below when I execute the code above:

[2014-05-12 19:15:58,984] ERROR Closing socket for /84.50.21.39 because 
of error (kafka.network.Processor)

java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
at java.nio.ByteBuffer.get(ByteBuffer.java:694)
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
at 
kafka.network.RequestChannel$Request.(RequestChannel.scala:53)

at kafka.network.Processor.read(SocketServer.scala:353)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:744)



[2014-05-12 19:16:11,836] ERROR Closing socket for /90.190.106.56 
because of error (kafka.network.Processor)

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:375)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)

at kafka.network.Processor.read(SocketServer.scala:347)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:744)

I suspected that the problem was the broker version
(kafka_2.9.1-0.8.1.1), so I downloaded kafka-0.7.1-incubating.

With that version I was able to send messages using the C# code.

So is there a workaround that lets me use the latest Kafka version with C#?
Or what is the latest Kafka version that supports a C# producer?


--
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"



Re: IndexOutOfBoundsException error

2014-05-12 Thread Jun Rao
Could you run the DumpLogSegments tool to see if there are indeed corrupted
messages in the broker log?

Thanks,

Jun
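
If you haven't used the tool before, it is run through kafka-run-class.sh
against a segment file under the broker's log directory, e.g.

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /path/to/kafka-logs/<topic>-<partition>/00000000000000000000.kafka

(The --files flag is the 0.8-era syntax; on 0.7.2 the options may differ, so
check the tool's usage output and treat the exact flags above as an
assumption.)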


On Mon, May 12, 2014 at 4:08 PM, Xuyen On  wrote:

> Hi all,
>
> I am seeing some corrupt data on my 0.7.2 Kafka cluster. Every once in
> awhile I'll get the following message:
>
> Exception in thread "kafka-consumer" java.lang.IndexOutOfBoundsException
> at java.nio.Buffer.checkIndex(Unknown Source)
> at java.nio.HeapByteBuffer.get(Unknown Source)
> at kafka.message.Message.magic(Message.scala:133)
> at kafka.message.Message.checksum(Message.scala:146)
> at kafka.message.Message.isValid(Message.scala:158)
> at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNextOuter(ByteBufferMessageSet.scala:129)
> at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:166)
> at
> kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:100)
> at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at kafka.message.MessageSet.foreach(MessageSet.scala:87)
> at
> kafka.tools.SimpleConsumerShell$$anon$1$$anonfun$run$1.apply(SimpleConsumerShell.scala:97)
> at
> kafka.tools.SimpleConsumerShell$$anon$1$$anonfun$run$1.apply(SimpleConsumerShell.scala:94)
> at scala.collection.Iterator$class.foreach(Iterator.scala:631)
> at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
> at
> scala.collection.IterableLike$class.foreach(IterableLike.scala:79)
> at
> kafka.api.MultiFetchResponse.foreach(MultiFetchResponse.scala:25)
> at
> kafka.tools.SimpleConsumerShell$$anon$1.run(SimpleConsumerShell.scala:94)
>
> The only way I can get past this error is to move the offset past the
> current message to skip the bad data. I use the
> simpleConsumer.getOffsetsBefore() method to get a valid lists of offsets
> and I skip to the next valid offset.
>
> Has anyone encountered this issue before? I found a message where someone
> experienced similar issues and they were running JDK 1.7 and was able to
> get around it by running JDK 1.6. I am running OpenJDK 1.7 but I tried
> running Sun JDK 1.6 but I still have the same problem.
>
> My guess is that the data is corrupt somehow but I don't know how to look
> at the raw data to confirm. Can anyone suggest ideas for me to debug this
> issue?
>
> Thanks,
>
> Xuyen
>
>
>


Re: kafka broker failed to recovery from ZK failure

2014-05-12 Thread Jun Rao
The controller log from broker 1 starts too late. Could you send its log from
around 2014-05-12 21:24:37?

Thanks,

Jun


On Mon, May 12, 2014 at 5:02 PM, Steven Wu  wrote:

> This is a three-node cluster. broker 0 lost connection to ZK. broker 1
> does seem to take the controller role. but broker 0 stuck in the bad state
> and wasn't able to recover.
>
> it seems to start with these error msgs. I have attached complete
> controller and server log for broker 0 and 1.
>
> I am using "kafka_2.9.2-0.8.1.1.jar".
>
> Thanks,
> Steven
>
> [2014-05-12 21:24:28,737] INFO Client session timed out, have not heard
> from server in 4000ms for sessionid 0xb145a9585a806013, closing socket
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:28,838] INFO zookeeper state changed (Disconnected)
> (org.I0Itec.zkclient.ZkClient)
> [2014-05-12 21:24:29,360] INFO Opening socket connection to server
> ip-10-84-58-49.ec2.internal/10.84.58.49:2181. Will not attempt to
> authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:30,562] INFO Client session timed out, have not heard
> from server in 1724ms for sessionid 0xb145a9585a806013, closing socket
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:30,706] INFO Opening socket connection to server
> ip-10-45-171-150.ec2.internal/10.45.171.150:2181. Will not attempt to
> authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:31,907] INFO Client session timed out, have not heard
> from server in 1245ms for sessionid 0xb145a9585a806013, closing socket
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:32,927] INFO Opening socket connection to server
> ip-10-45-172-197.ec2.internal/10.45.172.197:2181. Will not attempt to
> authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:34,128] INFO Client session timed out, have not heard
> from server in 2120ms for sessionid 0xb145a9585a806013, closing socket
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:34,380] INFO Opening socket connection to server
> ip-10-33-144-217.ec2.internal/10.33.144.217:2181. Will not attempt to
> authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:35,581] INFO Client session timed out, have not heard
> from server in 1353ms for sessionid 0xb145a9585a806013, closing socket
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:36,033] INFO Closing socket connection to /10.60.94.175.
> (kafka.network.Processor)
> [2014-05-12 21:24:37,282] INFO Opening socket connection to server
> ip-10-78-235-244.ec2.internal/10.78.235.244:2181. Will not attempt to
> authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:37,284] INFO Socket connection established to
> ip-10-78-235-244.ec2.internal/10.78.235.244:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:37,288] INFO zookeeper state changed (Expired)
> (org.I0Itec.zkclient.ZkClient)
> [2014-05-12 21:24:37,288] INFO Unable to reconnect to ZooKeeper service,
> session 0xb145a9585a806013 has expired, closing socket connection
> (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:37,288] INFO Initiating client connection, connectString=
> ec2-50-19-255-1.compute-1.amazonaws.com:2181,
> ec2-54-235-159-245.compute-1.amazonaws.com:2181,
> ec2-50-19-255-97.compute-1.amazonaws.com:2181,
> ec2-184-73-152-248.compute-1.amazonaws.com:2181,
> ec2-50-17-247-179.compute-1.amazonaws.com:2181/kafkabroker/default sessionTimeout=6000
>  watcher=org.I0Itec.zkclient.ZkClient@6f2a4d2f(org.apache.zookeeper.ZooKeeper)
> [2014-05-12 21:24:37,305] INFO EventThread shut down
> (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:37,311] INFO Opening socket connection to server
> ip-10-171-10-136.ec2.internal/10.171.10.136:2181. Will not attempt to
> authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:37,311] INFO Socket connection established to
> ip-10-171-10-136.ec2.internal/10.171.10.136:2181, initiating session
> (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:37,316] INFO Session establishment complete on server
> ip-10-171-10-136.ec2.internal/10.171.10.136:2181, sessionid =
> 0xb445f1c68e5c013d, negotiated timeout = 6000
> (org.apache.zookeeper.ClientCnxn)
> [2014-05-12 21:24:37,316] INFO zookeeper state changed (SyncConnected)
> (org.I0Itec.zkclient.ZkClient)
> [2014-05-12 21:24:37,335] INFO conflict in /controller data:
> {"version":1,"brokerid":0,"timestamp":"1399929877328"} stored data:
> {"version":1,"brokerid":1,"timestamp":"1399321265926"} (kafka.utils.:
> [2014-05-12 21:24:37,348] INFO re-registering broker info in ZK for broker
> 0 (kafka.server.KafkaHealthcheck)
> [2014-05-12 21:24:37,352] INFO Registered broker 0 at path /

Re: Loss of Leader in Kafka

2014-05-12 Thread Jun Rao
Topic deletion doesn't quite work in 0.8.1. We recently fixed it in trunk.
Could you give trunk a try and see if you still see the same issue?

Thanks,

Jun


On Mon, May 12, 2014 at 9:39 AM, Kashyap Mhaisekar wrote:

> Hi,
> I am hitting a strange exception while creating a topic in Kafka -
> Steps to generate this-
> 1. Created a topic multipartition_test with 2 partitions and 2 replicas
> 2. Added some data to this topics and verified data is coming up for both
> partitions
> 3. Deleted the topic. Checked only the zookeeper to see if the
> /brokers/topics DOES NOT have the topic
> 4. Recreated the topic in exactly the same way as in point 1.
>
> After this, when I list topics using ./kafka-list-topic.sh, i see that
> *leader:* none and *isr:* for this topic. State change logs give the
> following exception.
>
> kafka.common.StateChangeFailedException: encountered error while electing
> leader for partition [multipartition_test,1] due to: LeaderAndIsr
> information doesn't exist for partition [multipartition_test,1] in
> OnlinePartition state.
> at
>
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:327)
> at
>
> kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:154)
> at
>
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:110)
> at
>
> kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$2.apply(PartitionStateMachine.scala:109)
> at scala.collection.immutable.Set$Set2.foreach(Set.scala:101)
> at
>
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:109)
> at
>
> kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:325)
> at
>
> kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:312)
> at
>
> kafka.controller.PartitionStateMachine$TopicChangeListener.liftedTree1$1(PartitionStateMachine.scala:376)
> at
>
> kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:361)
> at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> Caused by: kafka.common.StateChangeFailedException: LeaderAndIsr
> information doesn't exist for partition [multipartition_test,1] in
> OnlinePartition state
> at
>
> kafka.controller.PartitionStateMachine.getLeaderIsrAndEpochOrThrowException(PartitionStateMachine.scala:347)
> at
>
> kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:291)
> ... 11 more
>
> Can you please help what am I doing wrong?
>
> Regards,
> kashyap
>


Re: question about mirror maker

2014-05-12 Thread Todd Palino
As far as ZooKeeper goes, any time you have network communication you have
the chance of a problem. I would rather have the network issue on the
consumer side than on the producer side.

I would certainly prefer to have the offsets committed only after the
message is produced (based on the acks setting for the producer). The
problem with this is that the consumer and the producer in the mirror maker
are separated by a queue, and a significant amount of communication would
need to be added between the two to make that work without a loss of
throughput.

-Todd
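
For reference, the "commit only what has actually been produced" approach
Steven describes can be sketched with the plain 0.8 Java clients; it bypasses
the mirror maker's internal queue entirely, which is exactly the throughput
trade-off mentioned above (hosts, group id and topic name are placeholders;
error handling is omitted):

    import java.util.Collections;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.javaapi.producer.Producer;
    import kafka.message.MessageAndMetadata;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class SimpleMirror {
        public static void main(String[] args) {
            Properties cprops = new Properties();
            cprops.put("zookeeper.connect", "source-zk:2181");   // source-side ZK
            cprops.put("group.id", "simple-mirror");
            cprops.put("auto.commit.enable", "false");           // commit only after a successful produce
            ConsumerConnector consumer =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(cprops));

            Properties pprops = new Properties();
            pprops.put("metadata.broker.list", "target-broker:9092");
            pprops.put("producer.type", "sync");                 // send synchronously so failures surface here
            pprops.put("request.required.acks", "1");
            Producer<byte[], byte[]> producer =
                    new Producer<byte[], byte[]>(new ProducerConfig(pprops));

            KafkaStream<byte[], byte[]> stream =
                    consumer.createMessageStreams(Collections.singletonMap("A", 1)).get("A").get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            int sinceCommit = 0;
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> m = it.next();
                producer.send(new KeyedMessage<byte[], byte[]>(m.topic(), m.key(), m.message()));
                if (++sinceCommit >= 1000) {                     // commit in batches, after the sends
                    consumer.commitOffsets();
                    sinceCommit = 0;
                }
            }
        }
    }

Batching the commits keeps the ZK write rate down; anything between the last
commit and a crash is re-mirrored, so the target may see duplicates but
should not lose data.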

On 5/12/14, 11:26 AM, "Steven Wu"  wrote:

>if placing mirror maker in the same datacenter as target cluster,
>it/consumer will talks to zookeeper in remote/source datacenter. would it
>more susceptible to network problems?
>
>As for the problem commit offset without actually producing/writing msgs
>to
>target cluster, it can be solved by disabling auto-commit. and only commit
>msgs that are actually persisted in target cluster.
>
>what do you think  of this opposite approach?
>
>
>On Sun, May 11, 2014 at 8:48 PM, Todd Palino  wrote:
>
>> Yes, on both counts. Putting the mirror maker in the same datacenter in
>> the target cluster is exactly what we do as well. We also monitor both
>>the
>> consumer lag (by comparing the offsets stored in Zookeeper and the tail
>> offset on the brokers), and the number of dropped and failed messages on
>> the mirror maker producer side. The other thing to do is to make sure to
>> check very carefully when you are changing anything about the producer
>> configuration, to assure that you have not made a mistake.
>>
>> -Todd
>>
>> On 5/11/14, 9:12 AM, "Weide Zhang"  wrote:
>>
>> >Hi Todd,
>> >
>> >Thanks for your answer. with regard to fail over for mirror maker, does
>> >that mean if i have 4 mirror maker running in different machines with
>>same
>> >consumer group, it will auto load balance if one of the mirror maker
>>fails
>> >? Also, it looks to prevent mirror maker commit wrong (consumer work
>>but
>> >not producer) due to cross data center network issue, mirror maker
>>need to
>> >be placed along with the target cluster so that this scenario is
>>minimized
>> >?
>> >
>> >
>> >On Sat, May 10, 2014 at 11:39 PM, Todd Palino 
>> >wrote:
>> >
>> >> Well, if you have a cluster in each datacenter, all with the same
>> >>topics,
> >> >> you can't just mirror the messages between them, as you will create a
> >> >> loop. The way we do it is to have a "local" cluster and an
>>"aggregate"
>> >> cluster. The local cluster has the data for only that datacenter.
>>Then
>> >>we
>> >> run mirror makers that copy the messages from each of the local
>>clusters
>> >> into the aggregate cluster. Everything produces into the local
>>clusters,
>> >> and nothing produces into the aggregate clusters. In general,
>>consumers
>> >> consume from the aggregate cluster (unless they specifically want
>>only
>> >> local data).
>> >>
>> >> The mirror maker is as fault tolerant as any other consumer. That is,
>> >>if a
>> >> mirror maker goes down, the others configured with the same consumer
>> >>group
>> >> (we generally run at least 4 for any mirror maker, sometimes up to
>>10)
>> >> will rebalance and start back up from the last committed offset. What
>> >>you
>> >> need to watch out for is if the mirror maker is unable to produce
>> >> messages, for example, if the network goes down. If it can still
>>consume
>> >> messages, but cannot produce them, you will lose messages as the
>> >>consumer
>> >> will continue to commit offsets with no knowledge that the producer
>>is
>> >> failing.
>> >>
>> >> -Todd
>> >>
>> >> On 5/8/14, 11:20 AM, "Weide Zhang"  wrote:
>> >>
>> >> >Hi,
>> >> >
>> >> >I have a question about mirror maker. say I have 3 data centers each
>> >> >producing topic 'A' with separate kafka cluster running. if 3 of the
>> >>data
>> >> >need to be kept in sync with each other, shall i create 3 mirror
>>maker
>> >>in
>> >> >each data center to get the data from the other two ?
>> >> >
>> >> >also, it mentioned that mirror making is not fault tolerant ? so
>>what
>> >>will
>> >> >be the behavior of mirror consumer if it went down due to network
>>and
>> >>back
>> >> >up ? do they catch up with last offset from which they last mirror
>>? If
>> >> >so,
>> >> >is it enabled by default or I have to configure  ?
>> >> >
>> >> >Thanks a lot,
>> >> >
>> >> >Weide
>> >>
>> >>
>>
>>



CSharp library and Producer Closing socket for because of error (kafka.network.Processor),java.nio.BufferUnderflowException

2014-05-12 Thread Margusja

Hi

I have a Kafka broker running (kafka_2.9.1-0.8.1.1).
All is working.

One project requires the producer to be written in C#.
I am not a .NET programmer, but I managed to write simple producer code
using
https://github.com/kafka-dev/kafka/blob/master/clients/csharp/README.md


the code
...
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Kafka.Client;

namespace DemoProducer
{
    class Program
    {
        static void Main(string[] args)
        {
            string payload1 = "kafka 1.";
            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
            Message msg1 = new Message(payloadData1);

            string payload2 = "kafka 2.";
            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
            Message msg2 = new Message(payloadData2);

            Producer producer = new Producer("broker", 9092);
            producer.Send("kafkademo3", 0, msg1);
        }
    }
}
...

On the broker side I get the error below when I execute the code above:

[2014-05-12 19:15:58,984] ERROR Closing socket for /84.50.21.39 because 
of error (kafka.network.Processor)

java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:145)
at java.nio.ByteBuffer.get(ByteBuffer.java:694)
at kafka.api.ApiUtils$.readShortString(ApiUtils.scala:38)
at kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:33)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
at kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36)
at 
kafka.network.RequestChannel$Request.(RequestChannel.scala:53)

at kafka.network.Processor.read(SocketServer.scala:353)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:744)



[2014-05-12 19:16:11,836] ERROR Closing socket for /90.190.106.56 
because of error (kafka.network.Processor)

java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at kafka.utils.Utils$.read(Utils.scala:375)
at 
kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)

at kafka.network.Processor.read(SocketServer.scala:347)
at kafka.network.Processor.run(SocketServer.scala:245)
at java.lang.Thread.run(Thread.java:744)

I suspected that the problem was the broker version
(kafka_2.9.1-0.8.1.1), so I downloaded kafka-0.7.1-incubating.

With that version I was able to send messages using the C# code.

So is there a workaround that lets me use the latest Kafka version with C#?
Or what is the latest Kafka version that supports a C# producer?


And one more question: in the C# library, how can I give the producer a list
of brokers, for fault tolerance in case one broker is down?
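
For comparison, the stock 0.8 Java producer takes a broker list via
metadata.broker.list; a minimal sketch (host names are placeholders, the
topic name is from the code above — this is the Java API, not a C#
equivalent):

    import java.util.Properties;
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class DemoProducerJava {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Several brokers can be listed; the producer falls back to the
            // others for metadata if one of them is down.
            props.put("metadata.broker.list", "broker1:9092,broker2:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");

            Producer<String, String> producer =
                    new Producer<String, String>(new ProducerConfig(props));
            producer.send(new KeyedMessage<String, String>("kafkademo3", "kafka 1."));
            producer.close();
        }
    }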


--
Best regards, Margus (Margusja) Roo
+372 51 48 780
http://margus.roo.ee
http://ee.linkedin.com/in/margusroo
skype: margusja
ldapsearch -x -h ldap.sk.ee -b c=EE "(serialNumber=37303140314)"