Hi Jason, I reverted to KAFKA-3149. The producer still had schema-related issues, but my consumer worked.
The consumer now works as expected. Although I did not encounter an error and the generation was not marked dead by the coordinator, successful heartbeat responses are still not logged as expected. My observations are the following:

1) A metadata refresh also triggers a heartbeat request. I say this because I sometimes see two heartbeat responses logged just a few milliseconds apart, where a metadata refresh and a proactive commit happened almost simultaneously.

2) Some commitSync requests still have no heartbeat logged before or after the commit. In those cases the next proactive commit happened just in time, its heartbeat request succeeded, and that saved the session. In the attached log you can see that a poll was done at 14:17:41, a commit happened at 14:17:56, and another commit happened at 14:18:14. The only heartbeat response logged during this time is at 14:18:14, which is 29 seconds after the poll, whereas a commit was performed 15 seconds after the poll. The heartbeat interval was 3000 ms.

3) There are long pauses between heartbeat responses in the logs which should cause the session to time out, but that is not happening. This implies that commits not only trigger a heartbeat; they also act as one.

Regards,
Vinay

On Thu, Apr 28, 2016 at 12:29 PM, Jason Gustafson <ja...@confluent.io> wrote:

> Ah, yeah. That's probably caused by the new topic metadata version, which isn't supported on 0.9 brokers. To test on trunk, you'd have to upgrade the brokers as well. Either that, or you can rewind to before KAFKA-3306 (which was just committed the day before yesterday)?
>
> -Jason
>
> On Thu, Apr 28, 2016 at 9:01 AM, vinay sharma <vinsharma.t...@gmail.com> wrote:
>
> > Hi Jason,
> >
> > I built the kafka-client and tried using it, but my producers and consumers started throwing the exception below. Is 0.10 not going to be compatible with brokers on version 0.9.0.1? Or do I need to make some config changes to producers/consumers to make them compatible with brokers on the old version? Or do I need to upgrade the brokers to the new version as well?
> >
> > org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'brokers': Error reading field 'host': Error reading string of length 17995, only 145 bytes available
> >   at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:75)
> >   at org.apache.kafka.clients.NetworkClient.parseResponse(NetworkClient.java:380)
> >   at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:449)
> >
> > Regards,
> > Vinay Sharma
> >
> > On Thu, Apr 28, 2016 at 12:32 AM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > > Hey Vinay,
> > >
> > > Any chance you can run the same test against trunk? I'm guessing this might be caused by a bug in the 0.9 consumer which basically causes some requests to fail when a bunch of them are sent to the broker at the same time.
> > >
> > > -Jason
> > >
> > > On Wed, Apr 27, 2016 at 1:02 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
> > >
> > > > Hi Jason,
> > > >
> > > > This makes sense. We use 0.9.0.1 and we do have the session timeout set a bit high, but nothing can guarantee that processing will never run longer than the session timeout. I am trying to test a proactive commit approach to handle cases where processing takes an unusually long time. To keep the consumer's session alive during long processing, I proactively commitSync the processed records every 15 seconds. The session timeout I kept is 30000 ms.
> > > > *Problem:*
> > > > With a heartbeat interval of 3000 ms, I expect a heartbeat request to be sent on each proactive commit, which happens every 15 seconds. In my tests I see that this does not always happen. I see a time window greater than 30 seconds in which no heartbeat is sent even though there were commits in this duration. After this window I see a couple of successful heartbeat responses until the end of the poll, but as soon as I poll again and call commitSync in the next poll, I get an "ILLEGAL_GENERATION" error. This error always happens just after a metadata refresh, or in the next poll's processing after a metadata refresh. I am attaching logs where I kept the metadata refresh interval at 40000, 90000, and 500000 ms.
> > > >
> > > > *Test results:*
> > > > The test with metadata refresh 40000 ms ran around 70 seconds from the first poll.
> > > > The test with metadata refresh 90000 ms ran around 120 seconds from the first poll.
> > > > The test with metadata refresh 500000 ms ran around 564 seconds from the first poll.
> > > >
> > > > Every test falls in line with the cases above, where the generation is marked dead some time after a metadata refresh. A metadata refresh before the first poll does not create any issue, but the ones after a poll and during long processing do.
> > > >
> > > > *Environment:*
> > > > My setup has 3 brokers and 1 ZooKeeper. The topic has 3 partitions and replication factor 3. Messages are already published to the topic.
> > > >
> > > > *Logic used in test cases:*
> > > > On each poll I initialize a map with the current committed offset position of the partitions being consumed. I update this map after each record is processed and use it to proactively commit every 15 seconds. The map is initialized again after each proactive commit.
> > > >
> > > > I am not sure what is wrong here, but I do not see any issue in the code or in the offset commits. Log files and a class with a main method are attached for your reference.
> > > >
> > > > Regards,
> > > > Vinay Sharma
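A minimal sketch of the proactive-commit loop described above (class and variable names are illustrative; the actual test class is in the attachment):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ProactiveCommitSketch {
        private static final long COMMIT_INTERVAL_MS = 15000L; // proactive commit every 15 seconds

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "kafkaPOCGroup1");
            props.put("enable.auto.commit", "false");
            props.put("session.timeout.ms", "30000");   // as in the tests
            props.put("heartbeat.interval.ms", "3000"); // as in the tests
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("kafkaPOCTopic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    // Re-initialized on each poll; tracks positions of records processed so far.
                    Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();
                    long lastCommit = System.currentTimeMillis();
                    for (ConsumerRecord<String, String> record : records) {
                        process(record); // stand-in for the real (slow) per-record work
                        processed.put(new TopicPartition(record.topic(), record.partition()),
                                      new OffsetAndMetadata(record.offset() + 1));
                        if (System.currentTimeMillis() - lastCommit >= COMMIT_INTERVAL_MS) {
                            consumer.commitSync(processed); // proactive commit keeps the session alive mid-poll
                            processed = new HashMap<>();    // re-initialized after each proactive commit
                            lastCommit = System.currentTimeMillis();
                        }
                    }
                }
            }
        }

        private static void process(ConsumerRecord<String, String> record) {
            // simulate long-running processing
        }
    }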
> > > > On Wed, Apr 27, 2016 at 2:46 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > >
> > > > > Hi Vinay,
> > > > >
> > > > > Answers below:
> > > > >
> > > > > > 1) Is it correct to say that each commitSync will trigger a HeartbeatTask? If no heartbeat has been sent within the specified heartbeat interval, should I see a successful heartbeat response or a failure message in the logs near the commitSync success log?
> > > > >
> > > > > Not quite. Heartbeats are sent periodically according to the heartbeat.interval.ms configuration. However, since the consumer has no background thread, they can only be sent in API calls such as poll() or commitSync(). So calling commitSync() may or may not result in a heartbeat depending only on whether one is "due."
> > > > >
> > > > > > 2) Is it correct to say that a metadata refresh will not act as a heartbeat, will not trigger the HeartbeatTask, and will not reset the HeartbeatTask?
> > > > >
> > > > > That is correct. Metadata refreshes are not related to heartbeats.
> > > > >
> > > > > > 3) Where is a consumer session maintained? Let's say my consumer is listening to 3 partitions on a 3-broker cluster where each broker is the leader of 1 partition. Will each of the brokers have a session for my consumer, or is just 1 session maintained somewhere in common, like ZooKeeper?
> > > > >
> > > > > One of the brokers serves as the "group coordinator." When the consumer starts up, it sends a GroupCoordinator request to one of the brokers to find out who the coordinator is. Currently, coordinators are chosen from among the leaders of the partitions of the __consumer_offsets topic. This lets us take advantage of the leader election process to also handle coordinator failures. The coordinator of each group maintains state for the group and keeps track of session timeouts.
> > > > >
> > > > > > 4) In the above setup, during long processing, if I commit a record through commitSync which triggers a heartbeat request, and a successful response is received for it, what does this response mean? Does it mean that my session with each broker is renewed? Or does it mean that just the leader for the committed record's partition knows that my consumer is alive, and the consumer's sessions on the other brokers will still time out?
> > > > >
> > > > > The coordinator is the only broker that is aware of a consumer's session, and all offset commits are sent to it. Successful heartbeats mean that the session is still active. Heartbeats are also used to let the consumer discover when a rebalance has begun. If a new member joins the group, the coordinator returns an error code in the heartbeat responses of the active members to let them know that they need to rejoin the group so that partitions can be rebalanced.
> > > > >
> > > > > I wouldn't get too hung up on commit/heartbeat behavior. The crux of the issue is that you need to call poll() often enough to avoid getting timed out by the coordinator. If you find this happening frequently, you probably need to increase session.timeout.ms. There's not really any downside to doing so, other than that hard failures (in which the consumer can't be shut down cleanly) will take a little longer to detect. Normal shutdown doesn't have this problem. It can be difficult in 0.9 to ensure that poll() is called often enough, since you don't have direct control over the amount of data returned in poll(), but we're adding an option (max.poll.records) in 0.10 which hopefully can be set conservatively enough to make this problem go away.
> > > > >
> > > > > -Jason
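Concretely, the two settings Jason mentions are ordinary consumer properties. Assuming a Properties object props as in the sketch above, the values here are illustrative only, not recommendations:

    // Give long record processing more headroom before the coordinator expires the session.
    props.put("session.timeout.ms", "60000");
    // Available from 0.10: bound how many records poll() returns so poll() can be called often enough.
    props.put("max.poll.records", "100");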
> > > > > On Wed, Apr 27, 2016 at 7:11 AM, vinay sharma <vinsharma.t...@gmail.com> wrote:
> > > > >
> > > > > > Hey,
> > > > > >
> > > > > > I am working on a simplified test case to check whether there is any issue in my code. Just to make sure that none of my assumptions are wrong, it would be great if you could help me find answers to the following queries:
> > > > > >
> > > > > > 1) Is it correct to say that each commitSync will trigger a HeartbeatTask? If no heartbeat has been sent within the specified heartbeat interval, should I see a successful heartbeat response or a failure message in the logs near the commitSync success log?
> > > > > > 2) Is it correct to say that a metadata refresh will not act as a heartbeat, will not trigger the HeartbeatTask, and will not reset the HeartbeatTask?
> > > > > > 3) Where is a consumer session maintained? Let's say my consumer is listening to 3 partitions on a 3-broker cluster where each broker is the leader of 1 partition. Will each of the brokers have a session for my consumer, or is just 1 session maintained somewhere in common, like ZooKeeper?
> > > > > > 4) In the above setup, during long processing, if I commit a record through commitSync which triggers a heartbeat request, and a successful response is received for it, what does this response mean? Does it mean that my session with each broker is renewed? Or does it mean that just the leader for the committed record's partition knows that my consumer is alive, and the consumer's sessions on the other brokers will still time out?
> > > > > >
> > > > > > Regards,
> > > > > > Vinay Sharma
> > > > > >
> > > > > > On Tue, Apr 26, 2016 at 2:38 PM, Jason Gustafson <ja...@confluent.io> wrote:
> > > > > >
> > > > > > > Hey Vinay,
> > > > > > >
> > > > > > > Are you saying that heartbeats are not sent while a metadata refresh is in progress? Do you have any logs which show us the apparent problem?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Jason
> > > > > > >
> > > > > > > On Tue, Apr 26, 2016 at 8:18 AM, vinay sharma <vinsharma.t...@gmail.com> wrote:
> > > > > > >
> > > > > > > > Hi Ismael,
> > > > > > > >
> > > > > > > > Treating commitSync as a heartbeat would definitely resolve the issue I am facing, but the reason behind my issue does not seem to be what is mentioned in the defect (i.e., frequent commitSync requests).
> > > > > > > >
> > > > > > > > I am sending commitSync periodically only to keep my session alive when my consumer is still processing records and is close to the session timeout (tried the 10th / 12th / 15th / 20th second after poll was called, where the session timeout is 30). I see a heartbeat response received in the logs along with each commitSync call, but this stops after a metadata refresh request is issued. I see in the logs that the commit succeeds, but there is no "heartbeat response received" message in the logs after the metadata refresh until the next poll.
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Vinay Sharma
> > > > > > > >
> > > > > > > > On Mon, Apr 25, 2016 at 5:06 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> > > > > > > >
> > > > > > > > > Hi Vinay,
> > > > > > > > >
> > > > > > > > > This was fixed via https://issues.apache.org/jira/browse/KAFKA-3470 (will be part of 0.10.0.0).
> > > > > > > > > Ismael
> > > > > > > > >
> > > > > > > > > On Mon, Apr 25, 2016 at 1:52 PM, vinay sharma <vinsharma.t...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hello,
> > > > > > > > > >
> > > > > > > > > > I am using client API 0.9.0.1 and facing an issue. As per my logs, it seems that on each commitSync(offsets) a heartbeat request is sent, but after a metadata refresh request and until the next poll(), commits do not send any heartbeat request.
> > > > > > > > > >
> > > > > > > > > > The KafkaConsumers I create sometimes get a session timeout due to no heartbeat, especially during longer processing times. I call commitSync(offsets) at regular intervals to keep the session alive when processing takes longer than usual. Everything works fine if the commit intervals are very small or if I commit after each record, but if I commit, say, every 12 seconds and the session timeout is 30 seconds, then I can see the consumer sometimes getting timed out.
> > > > > > > > > >
> > > > > > > > > > Any help or pointers will be much appreciated. Thanks in advance.
> > > > > > > > > >
> > > > > > > > > > Regards,
> > > > > > > > > > Vinay Sharma
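To connect Jason's explanation with the attached log that follows: in the 0.9 client a heartbeat can only go out from inside an API call such as poll() or commitSync(), and only if one is "due" per heartbeat.interval.ms. A rough model of that check (an illustrative simplification, not the actual client code):

    // Illustrative model only: the real scheduling lives in the 0.9 client's coordinator code.
    class HeartbeatModel {
        private final long intervalMs; // heartbeat.interval.ms, e.g. 3000
        private long lastSentMs;       // updated whenever a heartbeat actually goes out

        HeartbeatModel(long intervalMs) { this.intervalMs = intervalMs; }

        // Invoked from within poll()/commitSync(); sends a heartbeat only when one is due.
        boolean maybeSend(long nowMs) {
            if (nowMs - lastSentMs < intervalMs) {
                return false; // not due: the API call completes without a heartbeat
            }
            lastSentMs = nowMs; // due: a HeartbeatRequest would be sent here
            return true;
        }
    }

This is why some commitSync calls in the log below have a "Received successful heartbeat response" entry next to them and others do not.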
14:17:41.392 [main] INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 40000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1000
    bootstrap.servers = [localhost:9092, localhost:9093, localhost:9094]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id =
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 10000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    group.id = kafkaPOCGroup1
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest
14:17:41.409 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Starting the Kafka consumer
14:17:41.531 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [localhost:9092 (id: -1), localhost:9093 (id: -2), localhost:9094 (id: -3)], partitions = [])
14:17:41.614 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-closed:
14:17:41.617 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name connections-created:
14:17:41.618 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:
14:17:41.618 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:
14:17:41.623 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-received:
14:17:41.624 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name select-time:
14:17:41.624 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name io-time:
14:17:41.638 [main] INFO o.a.k.c.consumer.ConsumerConfig - ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 40000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1000
    bootstrap.servers = [localhost:9092, localhost:9093, localhost:9094]
    ssl.keystore.type = JKS
    enable.auto.commit = false
    sasl.mechanism = GSSAPI
    interceptor.classes = null
    exclude.internal.topics = true
    ssl.truststore.password = null
    client.id = consumer-1
    ssl.endpoint.identification.algorithm = null
    max.poll.records = 2147483647
    check.crcs = true
    request.timeout.ms = 40000
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 10000
    receive.buffer.bytes = 65536
    ssl.truststore.type = JKS
    ssl.truststore.location = null
    ssl.keystore.password = null
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    group.id = kafkaPOCGroup1
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.trustmanager.algorithm = PKIX
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    session.timeout.ms = 30000
    metrics.num.samples = 2
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    ssl.cipher.suites = null
    security.protocol = PLAINTEXT
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    auto.offset.reset = latest
14:17:41.665 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency
14:17:41.665 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name join-latency
14:17:41.666 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name sync-latency
14:17:41.672 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name commit-latency
14:17:41.687 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched
14:17:41.688 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-fetched
14:17:41.689 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-latency
14:17:41.690 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name records-lag
14:17:41.690 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time
14:17:41.696 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka version : 0.10.1.0-SNAPSHOT
14:17:41.697 [main] INFO o.a.kafka.common.utils.AppInfoParser - Kafka commitId : 5b375d7bf9b26aae
14:17:41.698 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Kafka consumer created
14:17:41.699 [main] DEBUG o.a.k.clients.consumer.KafkaConsumer - Subscribed to topic(s): kafkaPOCTopic
14:17:41.700 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Sending coordinator request for group kafkaPOCGroup1 to broker localhost:9094 (id: -3)
14:17:41.739 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -3 at localhost:9094.
14:17:41.747 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request
14:17:41.747 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9092.
14:17:41.749 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--3.bytes-sent
14:17:41.749 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--3.bytes-received
14:17:41.749 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--3.latency
14:17:41.750 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent
14:17:41.750 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received
14:17:41.750 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node--1.latency
14:17:41.751 [main] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -3
14:17:41.751 [main] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node -1
14:17:41.752 [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1,client_id=consumer-1}, body={topics=[kafkaPOCTopic]}), isInitiatedByNetworkClient, createdTimeMs=1461867461751, sendTimeMs=0) to node -1
14:17:41.782 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [FKMNV32.ceb.com:9094 (id: 2), FKMNV32.ceb.com:9092 (id: 0), FKMNV32.ceb.com:9093 (id: 1)], partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition = 2, leader = 2, replicas = [2,], isr = [2,], Partition(topic = kafkaPOCTopic, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
14:17:41.785 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received group coordinator response ClientResponse(receivedTimeMs=1461867461784, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@33f88ab, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=kafkaPOCGroup1}), createdTimeMs=1461867461732, sendTimeMs=1461867461754), responseBody={error_code=0,coordinator={node_id=0,host=FKMNV32.ceb.com,port=9092}})
14:17:41.785 [main] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator FKMNV32.ceb.com:9092 (id: 2147483647) for group kafkaPOCGroup1.
14:17:41.785 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node 2147483647 at FKMNV32.ceb.com:9092.
14:17:41.789 [main] INFO o.a.k.c.c.i.ConsumerCoordinator - Revoking previously assigned partitions [] for group kafkaPOCGroup1
14:17:41.789 [main] INFO o.a.k.c.c.i.AbstractCoordinator - (Re-)joining group kafkaPOCGroup1
14:17:41.795 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Sending JoinGroup ({group_id=kafkaPOCGroup1,session_timeout=30000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=25 cap=25]}]}) to coordinator FKMNV32.ceb.com:9092 (id: 2147483647)
14:17:41.798 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-sent
14:17:41.799 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.bytes-received
14:17:41.800 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-2147483647.latency
14:17:41.800 [main] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node 2147483647
14:17:41.805 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful join group response for group kafkaPOCGroup1: {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-5cfdab38-1eec-4c73-bfef-7013d3b620c5,member_id=consumer-1-5cfdab38-1eec-4c73-bfef-7013d3b620c5,members=[{member_id=consumer-1-5cfdab38-1eec-4c73-bfef-7013d3b620c5,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=25 cap=25]}]}
14:17:41.807 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Performing assignment for group kafkaPOCGroup1 using strategy range with subscriptions {consumer-1-5cfdab38-1eec-4c73-bfef-7013d3b620c5=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Subscription@1877ab81}
14:17:41.809 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Finished assignment for group kafkaPOCGroup1: {consumer-1-5cfdab38-1eec-4c73-bfef-7013d3b620c5=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Assignment@11438d26}
14:17:41.811 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Sending leader SyncGroup for group kafkaPOCGroup1 to coordinator FKMNV32.ceb.com:9092 (id: 2147483647): {group_id=kafkaPOCGroup1,generation_id=1,member_id=consumer-1-5cfdab38-1eec-4c73-bfef-7013d3b620c5,group_assignment=[{member_id=consumer-1-5cfdab38-1eec-4c73-bfef-7013d3b620c5,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=41 cap=41]}]}
14:17:41.825 [main] INFO o.a.k.c.c.i.AbstractCoordinator - Successfully joined group kafkaPOCGroup1 with generation 1
14:17:41.827 [main] INFO o.a.k.c.c.i.ConsumerCoordinator - Setting newly assigned partitions [kafkaPOCTopic-1, kafkaPOCTopic-2, kafkaPOCTopic-0] for group kafkaPOCGroup1
14:17:41.828 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 fetching committed offsets for partitions: [kafkaPOCTopic-1, kafkaPOCTopic-2, kafkaPOCTopic-0]
14:17:41.835 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition kafkaPOCTopic-1 to the committed offset 52631
14:17:41.835 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition kafkaPOCTopic-2 to the committed offset 52429
14:17:41.835 [main] DEBUG o.a.k.c.consumer.internals.Fetcher - Resetting offset for partition kafkaPOCTopic-0 to the committed offset 186043
14:17:41.839 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node 1 at FKMNV32.ceb.com:9093.
14:17:41.840 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node 2 at FKMNV32.ceb.com:9094.
14:17:41.841 [main] DEBUG o.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at FKMNV32.ceb.com:9092.
14:17:41.842 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent
14:17:41.843 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received
14:17:41.843 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-0.latency
14:17:41.843 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-1.bytes-sent
14:17:41.844 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-1.bytes-received
14:17:41.844 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-1.latency
14:17:41.844 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-2.bytes-sent
14:17:41.845 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-2.bytes-received
14:17:41.845 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name node-2.latency
14:17:41.845 [main] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node 0
14:17:41.845 [main] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node 1
14:17:41.845 [main] DEBUG o.apache.kafka.clients.NetworkClient - Completed connection to node 2
14:17:41.867 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name topic.kafkaPOCTopic.bytes-fetched
14:17:41.867 [main] DEBUG o.a.kafka.common.metrics.Metrics - Added sensor with name topic.kafkaPOCTopic.records-fetched
14:17:41.871 [main] INFO com.test.SimpleKafkaConsumer - Polling 1 #######################
14:17:41.871 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186043, 1=52631, 2=52429}}
14:17:41.871 [main] INFO com.test.SimpleKafkaConsumer - Total records returned in poll = 17
14:17:44.874 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186043, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:17:47.874 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186044, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:17:50.874 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186045, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:17:53.874 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186046, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:17:56.874 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186047, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:17:56.874 [main] INFO com.test.SimpleKafkaConsumer - Starting proactive commit
14:17:56.884 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52631 for partition kafkaPOCTopic-1
14:17:56.884 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52429 for partition kafkaPOCTopic-2
14:17:56.884 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186047 for partition kafkaPOCTopic-0
14:17:56.884 [main] INFO com.test.SimpleKafkaConsumer - Commited offsets {kafkaPOCTopic-1=OffsetAndMetadata{offset=52631, metadata=''}, kafkaPOCTopic-2=OffsetAndMetadata{offset=52429, metadata=''}, kafkaPOCTopic-0=OffsetAndMetadata{offset=186047, metadata=''}}
14:17:56.884 [main] INFO com.test.SimpleKafkaConsumer - Proactive commit done
14:17:56.884 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186047, 1=52631, 2=52429}}
14:17:59.885 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186048, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:18:02.885 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186049, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:18:05.885 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186050, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:18:08.885 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186051, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:18:11.885 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186052, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:18:14.885 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186053, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:18:14.885 [main] INFO com.test.SimpleKafkaConsumer - Starting proactive commit
14:18:14.887 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:18:14.892 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52631 for partition kafkaPOCTopic-1
14:18:14.893 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52429 for partition kafkaPOCTopic-2
14:18:14.893 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186053 for partition kafkaPOCTopic-0
14:18:14.893 [main] INFO com.test.SimpleKafkaConsumer - Commited offsets {kafkaPOCTopic-1=OffsetAndMetadata{offset=52631, metadata=''}, kafkaPOCTopic-2=OffsetAndMetadata{offset=52429, metadata=''}, kafkaPOCTopic-0=OffsetAndMetadata{offset=186053, metadata=''}}
14:18:14.893 [main] INFO com.test.SimpleKafkaConsumer - Proactive commit done
14:18:14.893 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186053, 1=52631, 2=52429}}
14:18:17.893 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186054, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:18:20.893 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186055, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:18:23.893 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186056, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:18:26.893 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186057, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:18:29.893 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186058, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:18:32.893 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186059, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:18:32.893 [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=14,client_id=consumer-1}, body={topics=[kafkaPOCTopic]}), isInitiatedByNetworkClient, createdTimeMs=1461867512893, sendTimeMs=0) to node 1
14:18:32.893 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:18:32.895 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 3 to Cluster(nodes = [FKMNV32.ceb.com:9094 (id: 2), FKMNV32.ceb.com:9092 (id: 0), FKMNV32.ceb.com:9093 (id: 1)], partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition = 2, leader = 2, replicas = [2,], isr = [2,], Partition(topic = kafkaPOCTopic, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
14:18:32.898 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52631 for partition kafkaPOCTopic-1
14:18:32.899 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52429 for partition kafkaPOCTopic-2
14:18:32.899 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186060 for partition kafkaPOCTopic-0
14:18:32.899 [main] INFO com.test.SimpleKafkaConsumer - consumer.commitSync() called
14:18:32.899 [main] INFO com.test.SimpleKafkaConsumer - Poll processing finished for poll 1 #######################
14:18:32.900 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:18:32.900 [main] INFO com.test.SimpleKafkaConsumer - Polling 2 #######################
14:18:32.901 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186060, 1=52631, 2=52429}}
14:18:32.901 [main] INFO com.test.SimpleKafkaConsumer - Total records returned in poll = 51
14:18:35.901 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52631, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:18:38.901 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52632, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:18:41.901 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52633, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:18:44.901 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52634, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:18:47.901 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52635, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:18:50.901 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52636, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:18:50.901 [main] INFO com.test.SimpleKafkaConsumer - Starting proactive commit
14:18:50.907 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52636 for partition kafkaPOCTopic-1
14:18:50.907 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52429 for partition kafkaPOCTopic-2
14:18:50.908 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186060 for partition kafkaPOCTopic-0
14:18:50.908 [main] INFO com.test.SimpleKafkaConsumer - Commited offsets {kafkaPOCTopic-1=OffsetAndMetadata{offset=52636, metadata=''}, kafkaPOCTopic-2=OffsetAndMetadata{offset=52429, metadata=''}, kafkaPOCTopic-0=OffsetAndMetadata{offset=186060, metadata=''}}
14:18:50.908 [main] INFO com.test.SimpleKafkaConsumer - Proactive commit done
14:18:50.908 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186060, 1=52636, 2=52429}}
14:18:53.908 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52637, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:18:56.908 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52638, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:18:59.908 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52639, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:19:02.908 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52640, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:19:05.908 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52641, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:19:08.908 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52642, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:19:08.908 [main] INFO com.test.SimpleKafkaConsumer - Starting proactive commit
14:19:08.911 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:19:08.913 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52642 for partition kafkaPOCTopic-1
14:19:08.913 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52429 for partition kafkaPOCTopic-2
14:19:08.913 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186060 for partition kafkaPOCTopic-0
14:19:08.913 [main] INFO com.test.SimpleKafkaConsumer - Commited offsets {kafkaPOCTopic-1=OffsetAndMetadata{offset=52642, metadata=''}, kafkaPOCTopic-2=OffsetAndMetadata{offset=52429, metadata=''}, kafkaPOCTopic-0=OffsetAndMetadata{offset=186060, metadata=''}}
14:19:08.913 [main] INFO com.test.SimpleKafkaConsumer - Proactive commit done
14:19:08.913 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186060, 1=52642, 2=52429}}
14:19:11.913 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52643, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:19:14.913 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52644, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:19:17.913 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52645, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:19:20.913 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52646, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:19:23.913 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52647, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:19:26.913 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52429, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:19:26.913 [main] INFO com.test.SimpleKafkaConsumer - Starting proactive commit
14:19:26.913 [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=24,client_id=consumer-1}, body={topics=[kafkaPOCTopic]}), isInitiatedByNetworkClient, createdTimeMs=1461867566913, sendTimeMs=0) to node 0
14:19:26.914 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:19:26.914 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 4 to Cluster(nodes = [FKMNV32.ceb.com:9093 (id: 1), FKMNV32.ceb.com:9094 (id: 2), FKMNV32.ceb.com:9092 (id: 0)], partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition = 2, leader = 2, replicas = [2,], isr = [2,], Partition(topic = kafkaPOCTopic, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
14:19:26.918 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52647 for partition kafkaPOCTopic-1
14:19:26.918 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52429 for partition kafkaPOCTopic-2
14:19:26.918 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186060 for partition kafkaPOCTopic-0
14:19:26.918 [main] INFO com.test.SimpleKafkaConsumer - Commited offsets {kafkaPOCTopic-1=OffsetAndMetadata{offset=52647, metadata=''}, kafkaPOCTopic-2=OffsetAndMetadata{offset=52429, metadata=''}, kafkaPOCTopic-0=OffsetAndMetadata{offset=186060, metadata=''}}
14:19:26.918 [main] INFO com.test.SimpleKafkaConsumer - Proactive commit done
14:19:26.919 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186060, 1=52647, 2=52429}}
14:19:29.919 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52430, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:19:32.919 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52431, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:19:35.919 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52432, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:19:38.919 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52433, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:19:41.919 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52434, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:19:44.919 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52435, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:19:44.919 [main] INFO com.test.SimpleKafkaConsumer - Starting proactive commit
14:19:44.919 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:19:44.923 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52647 for partition kafkaPOCTopic-1
14:19:44.923 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52435 for partition kafkaPOCTopic-2
14:19:44.923 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186060 for partition kafkaPOCTopic-0
14:19:44.923 [main] INFO com.test.SimpleKafkaConsumer - Commited offsets {kafkaPOCTopic-1=OffsetAndMetadata{offset=52647, metadata=''}, kafkaPOCTopic-2=OffsetAndMetadata{offset=52435, metadata=''}, kafkaPOCTopic-0=OffsetAndMetadata{offset=186060, metadata=''}}
14:19:44.923 [main] INFO com.test.SimpleKafkaConsumer - Proactive commit done
14:19:44.924 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186060, 1=52647, 2=52435}}
14:19:47.924 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52436, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:19:50.924 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52437, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:19:53.924 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52438, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:19:56.924 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52439, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:19:59.924 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52440, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:20:02.924 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52441, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:20:02.924 [main] INFO com.test.SimpleKafkaConsumer - Starting proactive commit
14:20:02.925 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:20:02.930 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52647 for partition kafkaPOCTopic-1
14:20:02.930 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52441 for partition kafkaPOCTopic-2
14:20:02.931 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186060 for partition kafkaPOCTopic-0
14:20:02.931 [main] INFO com.test.SimpleKafkaConsumer - Commited offsets {kafkaPOCTopic-1=OffsetAndMetadata{offset=52647, metadata=''}, kafkaPOCTopic-2=OffsetAndMetadata{offset=52441, metadata=''}, kafkaPOCTopic-0=OffsetAndMetadata{offset=186060, metadata=''}}
14:20:02.931 [main] INFO com.test.SimpleKafkaConsumer - Proactive commit done
14:20:02.931 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186060, 1=52647, 2=52441}}
14:20:05.931 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52442, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:20:08.931 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52443, NoTimestampType = -1, checksum = 1473396598, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:05)
14:20:11.931 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52444, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:20:14.931 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 2, offset = 52445, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:20:17.931 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186060, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:20:20.931 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186061, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:20:20.931 [main] INFO com.test.SimpleKafkaConsumer - Starting proactive commit
14:20:20.931 [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=31,client_id=consumer-1}, body={topics=[kafkaPOCTopic]}), isInitiatedByNetworkClient, createdTimeMs=1461867620931, sendTimeMs=0) to node 2
14:20:20.932 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:20:20.933 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 5 to Cluster(nodes = [FKMNV32.ceb.com:9094 (id: 2), FKMNV32.ceb.com:9093 (id: 1), FKMNV32.ceb.com:9092 (id: 0)], partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition = 2, leader = 2, replicas = [2,], isr = [2,], Partition(topic = kafkaPOCTopic, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
14:20:20.937 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52647 for partition kafkaPOCTopic-1
14:20:20.937 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52445 for partition kafkaPOCTopic-2
14:20:20.937 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186061 for partition kafkaPOCTopic-0
14:20:20.937 [main] INFO com.test.SimpleKafkaConsumer - Commited offsets {kafkaPOCTopic-1=OffsetAndMetadata{offset=52647, metadata=''}, kafkaPOCTopic-2=OffsetAndMetadata{offset=52445, metadata=''}, kafkaPOCTopic-0=OffsetAndMetadata{offset=186061, metadata=''}}
14:20:20.937 [main] INFO com.test.SimpleKafkaConsumer - Proactive commit done
14:20:20.938 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186061, 1=52647, 2=52445}}
14:20:23.938 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186062, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:20:26.938 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186063, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:20:29.938 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186064, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:20:32.938 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186065, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:20:35.938 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186066, NoTimestampType = -1, checksum = 1583641437, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:09)
14:20:38.938 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186067, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:20:38.938 [main] INFO com.test.SimpleKafkaConsumer - Starting proactive commit
14:20:38.939 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:20:38.942 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52647 for partition kafkaPOCTopic-1
14:20:38.942 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52445 for partition kafkaPOCTopic-2
14:20:38.943 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186067 for partition kafkaPOCTopic-0
14:20:38.943 [main] INFO com.test.SimpleKafkaConsumer - Commited offsets {kafkaPOCTopic-1=OffsetAndMetadata{offset=52647, metadata=''}, kafkaPOCTopic-2=OffsetAndMetadata{offset=52445, metadata=''}, kafkaPOCTopic-0=OffsetAndMetadata{offset=186067, metadata=''}}
14:20:38.943 [main] INFO com.test.SimpleKafkaConsumer - Proactive commit done
14:20:38.943 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186067, 1=52647, 2=52445}}
14:20:41.943 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186068, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:20:44.943 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186069, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:20:47.943 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186070, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:20:50.943 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186071, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:20:53.943 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186072, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:20:56.943 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186073, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:20:56.943 [main] INFO com.test.SimpleKafkaConsumer - Starting proactive commit
14:20:56.943 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:20:56.947 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52647 for partition kafkaPOCTopic-1
14:20:56.947 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52445 for partition kafkaPOCTopic-2
14:20:56.947 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186073 for partition kafkaPOCTopic-0
14:20:56.947 [main] INFO com.test.SimpleKafkaConsumer - Commited offsets {kafkaPOCTopic-1=OffsetAndMetadata{offset=52647, metadata=''}, kafkaPOCTopic-2=OffsetAndMetadata{offset=52445, metadata=''}, kafkaPOCTopic-0=OffsetAndMetadata{offset=186073, metadata=''}}
14:20:56.947 [main] INFO com.test.SimpleKafkaConsumer - Proactive commit done
14:20:56.947 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186073, 1=52647, 2=52445}}
14:20:59.947 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186074, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:21:02.947 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186075, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:21:05.947 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 0, offset = 186076, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:21:05.947 [main] DEBUG o.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=38,client_id=consumer-1}, body={topics=[kafkaPOCTopic]}), isInitiatedByNetworkClient, createdTimeMs=1461867665947, sendTimeMs=0) to node 0
14:21:05.947 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:21:05.948 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 6 to Cluster(nodes = [FKMNV32.ceb.com:9092 (id: 0), FKMNV32.ceb.com:9094 (id: 2), FKMNV32.ceb.com:9093 (id: 1)], partitions = [Partition(topic = kafkaPOCTopic, partition = 1, leader = 1, replicas = [1,], isr = [1,], Partition(topic = kafkaPOCTopic, partition = 2, leader = 2, replicas = [2,], isr = [2,], Partition(topic = kafkaPOCTopic, partition = 0, leader = 0, replicas = [0,], isr = [0,]])
14:21:05.952 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52648 for partition kafkaPOCTopic-1
14:21:05.952 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52446 for partition kafkaPOCTopic-2
14:21:05.952 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186077 for partition kafkaPOCTopic-0
14:21:05.952 [main] INFO com.test.SimpleKafkaConsumer - consumer.commitSync() called
14:21:05.952 [main] INFO com.test.SimpleKafkaConsumer - Poll processing finished for poll 2 #######################
14:21:05.953 [main] DEBUG o.a.k.c.c.i.AbstractCoordinator - Received successful heartbeat response for group kafkaPOCGroup1
14:21:05.955 [main] INFO com.test.SimpleKafkaConsumer - Polling 3 #######################
14:21:05.955 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186077, 1=52648, 2=52446}}
14:21:05.955 [main] INFO com.test.SimpleKafkaConsumer - Total records returned in poll = 51
14:21:08.955 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52648, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:21:11.955 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52649, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
14:21:14.955 [main] INFO com.test.SimpleKafkaConsumer -
processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52650, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10) 14:21:17.955 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52651, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10) 14:21:20.955 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52652, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10) 14:21:23.955 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52653, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10) 14:21:23.955 [main] INFO com.test.SimpleKafkaConsumer - Starting proactive commit 14:21:23.961 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52653 for partition kafkaPOCTopic-1 14:21:23.961 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 52446 for partition kafkaPOCTopic-2 14:21:23.961 [main] DEBUG o.a.k.c.c.i.ConsumerCoordinator - Group kafkaPOCGroup1 committed offset 186077 for partition kafkaPOCTopic-0 14:21:23.961 [main] INFO com.test.SimpleKafkaConsumer - Commited offsets {kafkaPOCTopic-1=OffsetAndMetadata{offset=52653, metadata=''}, kafkaPOCTopic-2=OffsetAndMetadata{offset=52446, metadata=''}, kafkaPOCTopic-0=OffsetAndMetadata{offset=186077, metadata=''}} 14:21:23.961 [main] INFO com.test.SimpleKafkaConsumer - Proactive commit done 14:21:23.962 [main] INFO com.test.SimpleKafkaConsumer - Map to be used to commit proactively is initialized to {kafkaPOCTopic={0=186077, 1=52653, 2=52446}} 14:21:26.962 [main] INFO com.test.SimpleKafkaConsumer - processed record ConsumerRecord(topic = kafkaPOCTopic, partition = 1, offset = 52654, NoTimestampType = -1, checksum = 1050933944, serialized key size = -1, serialized value size = 30, key = null, value = my message 2016 04 27 02:51:10)
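For reference, below is a minimal sketch of the proactive-commit loop that produces logs like the ones above. This is a reconstruction, not the actual SimpleKafkaConsumer source: the class name, broker address, session timeout, and the 3-second sleep per record are illustrative assumptions inferred from the log (records processed ~3 seconds apart, a commitSync of processed offsets roughly every 15 seconds, heartbeat.interval.ms=3000).

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SimpleKafkaConsumerSketch {

    private static final long COMMIT_INTERVAL_MS = 15000; // proactive commit period

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "FKMNV32.ceb.com:9092"); // broker taken from the log's node list
        props.put("group.id", "kafkaPOCGroup1");
        props.put("enable.auto.commit", "false");    // commits are issued manually below
        props.put("heartbeat.interval.ms", "3000");
        props.put("session.timeout.ms", "30000");    // assumption, not confirmed from the log
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("kafkaPOCTopic"));

        // offsets processed so far in the current poll, keyed by partition
        Map<TopicPartition, OffsetAndMetadata> processed = new HashMap<>();
        long lastCommitMs = System.currentTimeMillis();

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                Thread.sleep(3000); // simulate slow processing (~3s per record, as in the log)
                // remember offset + 1: the next offset to fetch from this partition on restart
                processed.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
                if (System.currentTimeMillis() - lastCommitMs >= COMMIT_INTERVAL_MS) {
                    // proactive commit mid-poll, intended to keep the coordinator session alive
                    consumer.commitSync(processed);
                    lastCommitMs = System.currentTimeMillis();
                }
            }
            consumer.commitSync(); // final commit once all records from this poll are processed
            lastCommitMs = System.currentTimeMillis();
        }
    }
}

Note the sketch commits offset + 1 (the next offset to consume), which is the convention the consumer expects on restart; the offsets in the log above appear to be the last processed offsets instead.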