Any other thoughts on this?

Thanks,
Matteo

2016-05-12 13:09 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
> I also found this suspicious log snippet that might be relevant. The
> task executed by thread 134 is the one that won't receive messages:
>
> INFO Attempt to heart beat failed since the group is rebalancing, try to
> re-join group.
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:633)
> [2016-05-12 10:27:09,623] INFO [kafka-connect-topic-0,
> kafka-connect-topic-1, kafka-connect-topic-2, kafka-connect-topic-3,
> kafka-connect-topic-4, kafka-connect-topic-5] topic-partitions are
> revoked from this thread task 134
> (com.connect.elasticsearch_kafka_connector.task.ElasticsearchTask:99)
> [2016-05-12 10:27:09,623] INFO
> org.apache.kafka.connect.runtime.WorkerSinkTask@5c29784c Committing
> offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
> [2016-05-12 10:27:09,634] INFO [kafka-connect-topic-0] topic-partitions
> are assigned from this thread task 134
> (com.connect.elasticsearch_kafka_connector.task.ElasticsearchTask:92)
>
> 2016-05-12 8:58 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
>
>> Hi Liquan,
>>
>> I run the two workers inside Docker containers with a connector that
>> has 6 tasks. They read from a topic with 6 partitions. Then I kill one
>> of the two containers using the docker kill or docker restart command.
>> When the container is up again a rebalance happens, and sometimes a few
>> tasks don't consume messages anymore even though the
>> onPartitionsAssigned function says that they are handling a partition
>> of the topic. Let me know if you need other information.
>> I use Kafka 0.9.0.
>>
>> Thanks for the help,
>> Matteo
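For context on the "assigned/revoked" lines above: a 0.9 sink task reports
its partition ownership by overriding the rebalance callbacks on SinkTask.
Below is a minimal sketch of a task instrumented that way, assuming the
0.9.0 SinkTask method signatures, where the hooks are named
onPartitionsAssigned/onPartitionsRevoked (renamed open()/close() in later
releases); the class name and log wording are illustrative, not the
actual ElasticsearchTask code.

    import java.util.Collection;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class ExampleSinkTask extends SinkTask {
        private static final Logger log =
            LoggerFactory.getLogger(ExampleSinkTask.class);

        @Override
        public String version() {
            return "0.1";
        }

        @Override
        public void start(Map<String, String> props) {
            // Open sink resources here (e.g. an Elasticsearch client).
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            // Logging here shows whether the task is still being fed
            // after a rebalance, not just whether it owns partitions.
            log.info("put() received {} records", records.size());
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // Called after a rebalance with the partitions this task now owns.
            log.info("{} topic-partitions are assigned to this task", partitions);
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Called before a rebalance with the partitions being taken away.
            log.info("{} topic-partitions are revoked from this task", partitions);
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // Flush buffered writes before Connect commits these offsets.
        }

        @Override
        public void stop() {
            // Release sink resources; the consumer itself is owned and
            // closed by the framework, not by the task.
        }
    }

Note that these callbacks firing only shows the rebalance completed; a
task can log an assignment and still receive nothing from the underlying
consumer, which is exactly the symptom described in this thread.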
>> 2016-05-11 22:57 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
>>
>>> Hi Matteo,
>>>
>>> I don't completely follow the steps. Can you share the exact commands
>>> to reproduce the issue? What kind of command did you use to restart
>>> the connector? Which version of Kafka are you using?
>>>
>>> Thanks,
>>> Liquan
>>>
>>> On Wed, May 11, 2016 at 4:40 AM, Matteo Luzzi <matteo.lu...@gmail.com>
>>> wrote:
>>>
>>> > Hi again, I was able to reproduce the bug in the same scenario (two
>>> > workers on separate machines) just by deleting the connector via the
>>> > REST API and then restarting it. I also got this error on one of the
>>> > workers:
>>> >
>>> > [2016-05-11 11:29:47,034] INFO 172.17.42.1 - - [11/May/2016:11:29:45
>>> > +0000] "DELETE /connectors/kafka-sink-connector HTTP/1.1" 204 - 1171
>>> > (org.apache.kafka.connect.runtime.rest.RestServer:60)
>>> > [2016-05-11 11:29:52,034] INFO Forcing shutdown of thread
>>> > WorkerSinkTask-kafka-sink-connector-1
>>> > (org.apache.kafka.connect.util.ShutdownableThread:141)
>>> > [2016-05-11 11:29:52,050] ERROR Graceful stop of task
>>> > org.apache.kafka.connect.runtime.WorkerSinkTask@7761a73c failed.
>>> > (org.apache.kafka.connect.runtime.Worker:312)
>>> > [2016-05-11 11:29:52,051] ERROR Uncaught exception in herder work
>>> > thread, exiting:
>>> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
>>> > java.util.ConcurrentModificationException: KafkaConsumer is not safe
>>> > for multi-threaded access
>>> >   at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
>>> >   at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
>>> >   at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
>>> >   at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
>>> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
>>> >   at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
>>> >   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
>>> >   at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.ensureActive(WorkerGroupMember.java:132)
>>> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:182)
>>> >   at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
>>> >   at java.lang.Thread.run(Thread.java:745)
>>> >
>>> > On the subsequent restart, 2 out of 6 tasks were not receiving
>>> > messages anymore.
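A note on the ConcurrentModificationException in the trace above:
KafkaConsumer is deliberately single-threaded, and wakeup() is its only
thread-safe method. The trace shows close() being invoked from the herder
thread while the task's own thread still owns the consumer. For reference,
here is a generic sketch of the safe cross-thread shutdown idiom for a
plain 0.9 consumer; this is the general pattern the error message points
at, not Connect's internal code, and the topic, group id, and server
address are placeholders.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class SafeShutdownExample {
        public static void main(String[] args) throws InterruptedException {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "demo-group");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

            final KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("kafka-connect-topic"));

            Thread poller = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            ConsumerRecords<String, String> records =
                                consumer.poll(1000);
                            for (ConsumerRecord<String, String> record : records)
                                System.out.println(record.offset() + ": "
                                    + record.value());
                        }
                    } catch (WakeupException e) {
                        // Expected: wakeup() was called from another thread.
                    } finally {
                        // close() must run on the polling thread itself.
                        consumer.close();
                    }
                }
            });
            poller.start();

            Thread.sleep(5000);
            // The only KafkaConsumer call that is safe from another thread.
            consumer.wakeup();
            poller.join();
        }
    }

Whatever the root cause inside Connect, the immediate effect visible in
the log above is that the herder thread died ("Uncaught exception in
herder work thread, exiting"), so that worker could not finish the
rebalance cleanly.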
>>> >
>>> > 2016-05-11 11:41 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
>>> >
>>> > > Hi Liquan,
>>> > > thanks for the fast response.
>>> > > I'm able to reproduce the error by having two workers running on
>>> > > two different machines. If I restart one of the two workers, the
>>> > > failover logic correctly detects the failure and shuts down the
>>> > > tasks on the healthy worker for rebalancing. When the failed worker
>>> > > is up again the tasks are distributed correctly among the two
>>> > > workers, but some tasks don't get new messages anymore. How can I
>>> > > check that all the input topic partitions are actually correctly
>>> > > reassigned?
>>> > >
>>> > > Matteo
>>> > >
>>> > > 2016-05-11 10:44 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
>>> > >
>>> > >> Hi Matteo,
>>> > >>
>>> > >> Glad to hear that you are building a connector. To better
>>> > >> understand the issue, can you provide the exact steps to reproduce
>>> > >> it? One thing I am confused about is that when one worker is shut
>>> > >> down, you don't need to restart the connector through the REST
>>> > >> API; the failover logic should handle the connector and task
>>> > >> shutdown and startup.
>>> > >>
>>> > >> The offset storage topic is used for storing offsets for source
>>> > >> connectors. For a sink connector, the offset is simply the Kafka
>>> > >> offset and will be stored in the __consumer_offsets topic.
>>> > >>
>>> > >> Thanks,
>>> > >> Liquan
>>> > >>
>>> > >> On Wed, May 11, 2016 at 1:31 AM, Matteo Luzzi
>>> > >> <matteo.lu...@gmail.com> wrote:
>>> > >>
>>> > >> > Hi,
>>> > >> > I'm working on a custom implementation of a sink connector for
>>> > >> > the Kafka Connect framework. I'm testing the connector for fault
>>> > >> > tolerance by killing the worker process and restarting the
>>> > >> > connector through the REST API, and occasionally I notice that
>>> > >> > some tasks no longer receive messages from the internal
>>> > >> > consumers. I don't get any errors from the log and the tasks
>>> > >> > seem to be initialised correctly, but some of them just don't
>>> > >> > process messages anymore. Normally when I restart the connector
>>> > >> > again, the tasks read all the messages skipped before. I'm
>>> > >> > executing Kafka Connect in distributed mode.
>>> > >> >
>>> > >> > Could it be a problem with the cleanup function invoked when
>>> > >> > closing the connector, causing a leak in consumer connections
>>> > >> > with the broker? Any ideas?
>>> > >> >
>>> > >> > Also, from the documentation I read that the connector saves the
>>> > >> > offsets of the tasks in a special topic in Kafka (the one
>>> > >> > specified via offset.storage.topic), but it is empty even though
>>> > >> > the connector processes messages. Is that normal?
>>> > >> >
>>> > >> > Thanks,
>>> > >> > Matteo
>>> > >>
>>> > >> --
>>> > >> Liquan Pei
>>> > >> Software Engineer, Confluent Inc
>>> > >
>>> > > --
>>> > > Matteo Remo Luzzi
>>> >
>>> > --
>>> > Matteo Remo Luzzi
>>>
>>> --
>>> Liquan Pei
>>> Software Engineer, Confluent Inc
>>
>> --
>> Matteo Remo Luzzi
>
> --
> Matteo Remo Luzzi

--
Matteo Remo Luzzi
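To close the loop on the two open questions in the thread: since a 0.9
sink connector is backed by an ordinary consumer group, the group tooling
shipped with Kafka can show the live partition assignment and committed
offsets. A sketch of the check, assuming the default sink group naming of
connect-<connector-name> (verify the exact group id in the worker logs;
the bootstrap server is a placeholder):

    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 \
      --describe --group connect-kafka-sink-connector

Each output row shows a topic-partition with its owner, current offset,
log-end offset, and lag; a partition whose lag keeps growing even though
a task logs it as assigned is assigned but not being consumed. This also
makes Liquan's point concrete: offset.storage.topic staying empty is
expected for sink connectors, whose progress lives in __consumer_offsets
like any other consumer group's.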