I am running the 0.9.0.1 broker with the 0.9 consumer. I am using the
subscribe feature on the consumer to subscribe to a topic with 8 partitions.

consumer.subscribe(Arrays.asList(myTopic));

I have a single consumer group for said topic and a single process
subscribed with 8 partitions.

When I use jmx on the consumer I do see that it has 8 partitions assigned
to it according to the consumer-coordinator-metrics mbean. How can I tell
what topic it is listening to? I couldn't find this on jmx anywhere. I do
see that it is getting 5 responses per second according to the
consumer-metrics mbean but I don't see any in my actual application.

I consume my messages like this:

    public int poll(SubscriptionDataHandler handler, long timeout) {

        ConsumerRecords<Void, byte[]> records = null;

        try {

            records = consumer.poll(timeout);

        } catch (Exception e) {

            logger.error("Exception polling the Kafka , e);  // Don't see
any exceptions here

            return -1;

        }

        int numBuffers = 0;

        if (records != null) {

            for (ConsumerRecord<Void, byte[]> record : records) {

                byte[] payload = record.value();

                if (payload != null && payload.length > 0) {

                    ByteBuffer wrappedBuffer = ByteBuffer.wrap(payload);

                    try {

                        handler.handleData(wrappedBuffer); // This is never
called

                    } catch (Exception e) {

                        logger.error("Exception consuming buffer , e);

                    }

                    numBuffers += 1;  // This is never incremented.

                }

            }


            // Commit only after consuming.

            consumer.commitAsync(offsetCommitCallback);

        }

I also don't see any data in the consumers folder in ZK. In fact it is
completely empty.

When I use the console-consumer, I do see the console-consumer show up in
the consumers folder, but none of my actual consumers show up.

I tried looking for jmx data on the servers too and couldn't quite figure
out where I can get jmax.

I am trying to figure out why the Kafka consumer thinks it is getting
messages ( 5 responses/second according to jmx) but I don't get any in my
application.

Thanks,
Rajiv

Reply via email to