Thanks Jason. I'll try to upgrade and see if it helps.

On Mon, Mar 14, 2016 at 12:04 PM, Jason Gustafson <ja...@confluent.io> wrote:
I think this is the one: https://issues.apache.org/jira/browse/KAFKA-2978.

-Jason

On Mon, Mar 14, 2016 at 11:54 AM, Rajiv Kurian <ra...@signalfx.com> wrote:

@Jason, can you please point me to the bug that you were talking about in 0.9.0.0?

On Mon, Mar 14, 2016 at 11:36 AM, Rajiv Kurian <ra...@signalfx.com> wrote:

No I haven't. It's still running the 0.9.0 client. I'll try upgrading if it sounds like an old bug.

On Mon, Mar 14, 2016 at 11:24 AM, Jason Gustafson <ja...@confluent.io> wrote:

Hey Rajiv,

That sounds suspiciously like one of the bugs from 0.9.0.0. Have you updated kafka-clients to 0.9.0.1?

-Jason

On Mon, Mar 14, 2016 at 11:18 AM, Rajiv Kurian <ra...@signalfx.com> wrote:

Has anyone run into similar problems? I have experienced the same problem again. This time the kafka-consumer-groups.sh tool says that my consumer group is either missing or rebalancing, but when I use the --list option the group shows up on the list, so my guess is that it is somehow rebalancing. Again, I have a single consumer group per topic with a single consumer in that group. I wonder if this causes some edge case. The consumer is up as of now, so I don't know why it would say it is rebalancing.

On Wed, Mar 9, 2016 at 11:05 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

Thanks! That worked. I see that the host I am not getting messages for has a massive lag on the 0th partition (the only one I send messages on). The other 19 groups are all caught up, which explains why they have no issues. The lag just keeps increasing with time, which confirms my suspicion that no messages are being delivered. However, the owner of the consumer is correctly shown to be the right process (consumer-1_/ip_of_my_consumer).

I know that I am calling poll regularly since I record metrics when I make the poll call. The fact that the Kafka consumer's JMX metrics also show 5 responses a second probably proves that the poll call is being made. I am guessing these poll calls yield empty responses and hence my application sees no messages.

Is there some known bug where a single consumer group with a single consumer can run into such a problem? What happens if a consumer group has a single consumer and it misses its heartbeat? Can it get stuck in limbo in such a condition? I am guessing the problem will go away if I restart my consumer, but I want to figure out why it happened and how I can prevent it.

I did another experiment: I started sending data on all 8 partitions instead of the 0th partition only. Now I see that the lag for those other 7 partitions is 0, i.e. they are all caught up. However, the 0th partition, which is still getting some traffic, has its consumer offset stuck at the same number, so its lag keeps increasing. It must be in some kind of limbo that the other partitions are not affected by.

Thanks,
Rajiv

On Wed, Mar 9, 2016 at 10:46 PM, Manikumar Reddy <manikumar.re...@gmail.com> wrote:

We need to pass the "--new-consumer" option to the kafka-consumer-groups.sh command to use the new consumer.
sh kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list --new-consumer

On Thu, Mar 10, 2016 at 12:02 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

Hi Guozhang,

I tried using the kafka-consumer-groups.sh --list command and it says I have no consumer groups set up at all. Yet I am receiving data on 19 out of 20 consumer processes (each with its own topic and consumer group).

Here is my full Kafka config as printed when my process started up:

    metric.reporters = []
    metadata.max.age.ms = 300000
    value.deserializer = class sf.org.apache.kafka9.common.serialization.ByteArrayDeserializer
    group.id = myTopic_consumer
    partition.assignment.strategy = [sf.org.apache.kafka9.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [myBroker1:9092, myBroker2:9092, myBroker3:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    enable.auto.commit = false
    ssl.key.password = null
    fetch.max.wait.ms = 1000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    session.timeout.ms = 30000
    metrics.num.samples = 2
    client.id =
    ssl.endpoint.identification.algorithm = null
    key.deserializer = class sf.disco.kafka.VoidDeserializer
    ssl.protocol = TLS
    check.crcs = true
    request.timeout.ms = 40000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    fetch.min.bytes = 256
    send.buffer.bytes = 131072
    auto.offset.reset = earliest

It prints out the group.id field as myTopic_consumer. I was expecting to get this from the --list command and yet I am not getting it. Is this the name of the consumer group or am I missing something?
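[Editor's note: the group.id in the dump above comes straight from the client configuration. A minimal, dependency-free sketch of how such a config might be assembled, using plain string keys instead of the ConsumerConfig constants; the broker names and "myTopic" are the placeholders from the dump, not real hosts:]

```java
import java.util.Properties;

public class ConsumerProps {
    // Builds the non-default settings seen in the config dump above.
    // The group name follows the thread's convention: <topic>_consumer.
    static Properties build(String topic) {
        Properties props = new Properties();
        props.setProperty("group.id", topic + "_consumer");
        props.setProperty("bootstrap.servers",
                "myBroker1:9092,myBroker2:9092,myBroker3:9092"); // placeholder hosts
        props.setProperty("enable.auto.commit", "false"); // commits issued manually after processing
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("fetch.min.bytes", "256");
        props.setProperty("fetch.max.wait.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties p = build("myTopic");
        System.out.println(p.getProperty("group.id")); // prints myTopic_consumer
    }
}
```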
I use the subscribe call on the consumer and my understanding was that the subscribe call would do all the work needed to create/join a group. Given I have a single consumer per group and a single group per topic, I'd expect to see 20 groups (one for each of my topics). However, --list returns no groups at all!

Thanks,
Rajiv

On Wed, Mar 9, 2016 at 8:22 PM, Guozhang Wang <wangg...@gmail.com> wrote:

Rajiv,

In the new Java consumer you are using, the ZK dependency has been removed and hence you won't see any data in the ZK path.

To check the group metadata you can use the ConsumerGroupCommand, wrapped in bin/kafka-consumer-groups.sh.

Guozhang

On Wed, Mar 9, 2016 at 5:48 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

Don't think I made my questions clear:

With a 0.9.0.1 broker and the 0.9 consumer, how do I tell what my consumer groups are? Can I still get this information in ZK? I don't see anything in the consumers folder, which is alarming, especially because I do see that 8 partitions are assigned on the consumer (via JMX). I specify the consumer group using:

    String myConsumerGroupId = myTopic + "_consumer";
    props.put(org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG, myConsumerGroupId);

I am running with this setup on about 20 consumers (each consuming a unique topic) and only one of them is not passing any messages to my application, even though the JMX console says it is receiving 5 requests per second. The other 19 seem to be working fine.

Each of these 20 topics was created when a message was first sent to it, i.e. it was not provisioned beforehand. Messages are currently only being sent to partition 0 even though there are 8 partitions per topic.

Thanks,
Rajiv

On Wed, Mar 9, 2016 at 4:30 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

Also forgot to mention that when I consume with the console consumer I do see data coming through.

On Wed, Mar 9, 2016 at 3:44 PM, Rajiv Kurian <ra...@signalfx.com> wrote:

I am running the 0.9.0.1 broker with the 0.9 consumer. I am using the subscribe feature on the consumer to subscribe to a topic with 8 partitions.
    consumer.subscribe(Arrays.asList(myTopic));

I have a single consumer group for said topic and a single process subscribed to all 8 partitions.

When I use JMX on the consumer I do see that it has 8 partitions assigned to it according to the consumer-coordinator-metrics mbean. How can I tell what topic it is listening to? I couldn't find this in JMX anywhere. I do see that it is getting 5 responses per second according to the consumer-metrics mbean, but I don't see any messages in my actual application.

I consume my messages like this:

    public int poll(SubscriptionDataHandler handler, long timeout) {
        ConsumerRecords<Void, byte[]> records = null;
        try {
            records = consumer.poll(timeout);
        } catch (Exception e) {
            logger.error("Exception polling Kafka", e); // Don't see any exceptions here
            return -1;
        }
        int numBuffers = 0;
        if (records != null) {
            for (ConsumerRecord<Void, byte[]> record : records) {
                byte[] payload = record.value();
                if (payload != null && payload.length > 0) {
                    ByteBuffer wrappedBuffer = ByteBuffer.wrap(payload);
                    try {
                        handler.handleData(wrappedBuffer); // This is never called
                    } catch (Exception e) {
                        logger.error("Exception consuming buffer", e);
                    }
                    numBuffers += 1; // This is never incremented.
                }
            }
            // Commit only after consuming.
            consumer.commitAsync(offsetCommitCallback);
        }
        return numBuffers;
    }

I also don't see any data in the consumers folder in ZK. In fact it is completely empty.

When I use the console consumer, I do see the console consumer show up in the consumers folder, but none of my actual consumers show up.

I tried looking for JMX data on the brokers too and couldn't quite figure out where to get it.

I am trying to figure out why the Kafka consumer thinks it is getting messages (5 responses/second according to JMX) but I don't get any in my application.
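[Editor's note: the stuck-partition diagnosis discussed above in this thread boils down to comparing committed offsets against log-end offsets per partition. A minimal sketch of that arithmetic; the offset values are made up for illustration, not from a real broker:]

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LagCheck {
    // lag = logEndOffset - committedOffset, per partition.
    // A partition whose committed offset stays fixed while its log-end
    // offset grows (partition 0 in this thread) shows ever-increasing lag.
    static Map<Integer, Long> lag(Map<Integer, Long> committed, Map<Integer, Long> logEnd) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> e : logEnd.entrySet()) {
            long c = committed.getOrDefault(e.getKey(), 0L);
            result.put(e.getKey(), e.getValue() - c);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Long> committed = new LinkedHashMap<>();
        Map<Integer, Long> logEnd = new LinkedHashMap<>();
        committed.put(0, 1000L); logEnd.put(0, 5000L); // stuck consumer: lag 4000 and growing
        committed.put(1, 200L);  logEnd.put(1, 200L);  // caught up: lag 0
        System.out.println(lag(committed, logEnd)); // prints {0=4000, 1=0}
    }
}
```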
Thanks,
Rajiv
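[Editor's note: for readers hitting the same symptoms, the 0.9-era tooling discussed in this thread can also describe a group's per-partition state. A sketch that just prints the two commands rather than running them, since they need a live broker; the group name and broker address are the placeholders used above:]

```shell
# Assumes a reachable 0.9.x broker and the placeholder names from this thread.
# --new-consumer is required because groups created by the Java 0.9 consumer
# live on the brokers, not in ZooKeeper.
GROUP="myTopic_consumer"
BROKERS="localhost:9092"

# --list shows the group; --describe shows per-partition committed offset,
# log-end offset, lag, and owner (how the stuck partition 0 was identified).
echo "kafka-consumer-groups.sh --bootstrap-server $BROKERS --new-consumer --list"
echo "kafka-consumer-groups.sh --bootstrap-server $BROKERS --new-consumer --describe --group $GROUP"
```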