Hi again,

I was able to reproduce the bug in the same scenario (two workers on
separate machines) just by deleting the connector via the REST API and
then restarting it. I also got this error on one of the workers:

[2016-05-11 11:29:47,034] INFO 172.17.42.1 - - [11/May/2016:11:29:45 +0000] "DELETE /connectors/kafka-sink-connector HTTP/1.1" 204 - 1171 (org.apache.kafka.connect.runtime.rest.RestServer:60)
[2016-05-11 11:29:52,034] INFO Forcing shutdown of thread WorkerSinkTask-kafka-sink-connector-1 (org.apache.kafka.connect.util.ShutdownableThread:141)
[2016-05-11 11:29:52,050] ERROR Graceful stop of task org.apache.kafka.connect.runtime.WorkerSinkTask@7761a73c failed. (org.apache.kafka.connect.runtime.Worker:312)
[2016-05-11 11:29:52,051] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
        at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
        at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
        at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
        at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.ensureActive(WorkerGroupMember.java:132)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:182)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
        at java.lang.Thread.run(Thread.java:745)
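For reference, the reproduction amounts to the two REST calls below (a
minimal sketch, assuming the default REST port 8083; connector-config.json
is a placeholder for the JSON originally used to create the connector):

  # Delete the running connector (this is the DELETE that shows up in
  # the log above; a 204 means the connector and its tasks were stopped):
  curl -X DELETE http://localhost:8083/connectors/kafka-sink-connector

  # Re-create it with the same configuration:
  curl -X POST -H "Content-Type: application/json" \
       --data @connector-config.json \
       http://localhost:8083/connectors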
On the successive restart, 2 out of 6 tasks were no longer receiving
messages.

2016-05-11 11:41 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:

> Hi Liquan,
> thanks for the fast response.
> I'm able to reproduce the error by having two workers running on two
> different machines. If I restart one of the two workers, the failover
> logic correctly detects the failure and shuts down the tasks on the
> healthy worker for rebalancing. When the failed worker is up again, the
> tasks are distributed correctly between the two workers, but some tasks
> don't get new messages anymore. How can I check that all the input topic
> partitions are actually reassigned correctly? (See the sketch after the
> thread.)
>
> Matteo
>
> 2016-05-11 10:44 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
>
>> Hi Matteo,
>>
>> Glad to hear that you are building a connector. To better understand
>> the issue, can you provide the exact steps to reproduce it? One thing
>> that confuses me is that when one worker is shut down, you don't need
>> to restart the connector through the REST API; the failover logic
>> should handle the connector and task shutdown and startup.
>>
>> The offset storage topic is used for storing offsets for source
>> connectors. For a sink connector, the offset is simply the Kafka offset
>> and will be stored in the __consumer_offsets topic.
>>
>> Thanks,
>> Liquan
>>
>> On Wed, May 11, 2016 at 1:31 AM, Matteo Luzzi <matteo.lu...@gmail.com>
>> wrote:
>>
>> > Hi,
>> > I'm working on a custom implementation of a sink connector for the
>> > Kafka Connect framework. I'm testing the connector for fault
>> > tolerance by killing the worker process and restarting the connector
>> > through the REST API, and occasionally I notice that some tasks no
>> > longer receive messages from the internal consumers. I don't get any
>> > errors in the log and the tasks seem to be initialised correctly, but
>> > some of them just don't process messages anymore. Normally, when I
>> > restart the connector again, the tasks read all the messages skipped
>> > before. I'm running Kafka Connect in distributed mode.
>> >
>> > Could it be a problem with the cleanup function invoked when closing
>> > the connector, causing a leak in consumer connections to the broker?
>> > Any ideas?
>> >
>> > Also, from the documentation I read that the connector saves the
>> > offsets of the tasks in a special topic in Kafka (the one specified
>> > via offset.storage.topic), but it is empty even though the connector
>> > processes messages. Is that normal?
>> >
>> > Thanks,
>> > Matteo
>> >
>>
>>
>> --
>> Liquan Pei
>> Software Engineer, Confluent Inc
>>
>
>
> --
> Matteo Remo Luzzi
>

--
Matteo Remo Luzzi
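A note on the reassignment question above: the tasks of a sink connector
form an ordinary consumer group, normally named connect-<connector-name>,
so the assignment can be checked with the consumer-groups tool. A minimal
sketch, assuming a broker on localhost:9092 and the connector name from
the log (on 0.9/0.10 the --new-consumer flag is still required):

  # Describe the sink connector's consumer group to see which consumer
  # owns each input partition and how far behind it is:
  bin/kafka-consumer-groups.sh --new-consumer \
      --bootstrap-server localhost:9092 \
      --describe --group connect-kafka-sink-connector

Every partition of the input topics should be listed with an owner, and
its lag should shrink while messages flow; a partition with no owner, or
with a lag that only grows, points at a task that stopped consuming after
the rebalance. The same output also shows the committed offsets, which is
where a sink connector's progress lives (the __consumer_offsets topic)
rather than in offset.storage.topic.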