Hi again, I was able to reproduce the bug in the same scenario (two workers
on separate machines) just by deleting the connector via the REST API and
then re-creating it.
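
For reference, this is essentially what I do against the REST API (a
sketch: localhost:8083 and config.json stand in for my actual worker
host and connector config):

    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    public class RecreateConnector {
        public static void main(String[] args) throws Exception {
            String base = "http://localhost:8083/connectors";

            // Delete the running connector (the 204 in the log below is
            // this call)
            HttpURLConnection del = (HttpURLConnection)
                    new URL(base + "/kafka-sink-connector").openConnection();
            del.setRequestMethod("DELETE");
            System.out.println("DELETE -> " + del.getResponseCode());

            // Re-create it by POSTing the same JSON config
            HttpURLConnection post =
                    (HttpURLConnection) new URL(base).openConnection();
            post.setRequestMethod("POST");
            post.setRequestProperty("Content-Type", "application/json");
            post.setDoOutput(true);
            try (OutputStream os = post.getOutputStream()) {
                os.write(Files.readAllBytes(Paths.get("config.json")));
            }
            System.out.println("POST -> " + post.getResponseCode());
        }
    }
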
I also got this error on one of the workers:
[2016-05-11 11:29:47,034] INFO 172.17.42.1 - - [11/May/2016:11:29:45 +0000] "DELETE /connectors/kafka-sink-connector HTTP/1.1" 204 -  1171 (org.apache.kafka.connect.runtime.rest.RestServer:60)
[2016-05-11 11:29:52,034] INFO Forcing shutdown of thread WorkerSinkTask-kafka-sink-connector-1 (org.apache.kafka.connect.util.ShutdownableThread:141)
[2016-05-11 11:29:52,050] ERROR Graceful stop of task org.apache.kafka.connect.runtime.WorkerSinkTask@7761a73c failed. (org.apache.kafka.connect.runtime.Worker:312)
[2016-05-11 11:29:52,051] ERROR Uncaught exception in herder work thread, exiting: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
    at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
    at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
    at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
    at org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
    at org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.ensureActive(WorkerGroupMember.java:132)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:182)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
    at java.lang.Thread.run(Thread.java:745)
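
If I read the KafkaConsumer javadoc right, the consumer is deliberately
single-threaded and the only method that may be called from another
thread is wakeup(), so it looks like the herder thread is closing a
consumer that the sink task thread is still using. The shutdown pattern
the javadoc recommends looks roughly like this (my sketch, not the
Connect runtime's actual code):

    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class ConsumerLoop implements Runnable {
        private final KafkaConsumer<byte[], byte[]> consumer;
        private final AtomicBoolean closed = new AtomicBoolean(false);

        public ConsumerLoop(KafkaConsumer<byte[], byte[]> consumer) {
            this.consumer = consumer;
        }

        public void run() {
            try {
                consumer.subscribe(Arrays.asList("my-input-topic"));
                while (!closed.get()) {
                    ConsumerRecords<byte[], byte[]> records =
                            consumer.poll(1000);
                    // hand the records to the task here...
                }
            } catch (WakeupException e) {
                // only swallow the exception if it is our own shutdown
                if (!closed.get()) throw e;
            } finally {
                // close on the polling thread, never from outside
                consumer.close();
            }
        }

        // The one call that is safe from another thread.
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }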

After the subsequent restart, 2 out of 6 tasks were no longer receiving
messages.
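
Also, to answer my own question below about checking the reassignment:
since sink tasks commit plain Kafka offsets, the committed offset of
every input partition can be watched with a throwaway consumer that
uses the sink's group id (connect-<connector name>, if I understand the
naming convention correctly). Something like this, where the topic and
broker are placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class CheckCommittedOffsets {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            // sink tasks commit under the group "connect-<connector name>"
            props.put("group.id", "connect-kafka-sink-connector");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            try (KafkaConsumer<byte[], byte[]> c = new KafkaConsumer<>(props)) {
                for (PartitionInfo p : c.partitionsFor("my-input-topic")) {
                    TopicPartition tp =
                            new TopicPartition(p.topic(), p.partition());
                    // committed() returns null if nothing was committed yet
                    OffsetAndMetadata committed = c.committed(tp);
                    System.out.println(tp + " -> " + committed);
                }
            }
        }
    }

A partition whose committed offset stops advancing while the others
keep moving should point at one of the stuck tasks.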

2016-05-11 11:41 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:

> Hi Liquan,
> thanks for the fast response.
> I'm able to reproduce the error by having two workers running on two
> different machines. If I restart one of the two workers, the failover
> logic correctly detects the failure and shuts down the tasks on the
> healthy worker for rebalancing. When the failed worker is up again, the
> tasks are distributed correctly between the two workers, but some tasks
> don't get new messages anymore. How can I check that all the input
> topic partitions are actually reassigned correctly?
>
> Matteo
>
> 2016-05-11 10:44 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
>
>> Hi Matteo,
>>
>> Glad to hear that you are building a connector. To better understand
>> the issue, can you provide the exact steps to reproduce it? One thing
>> I am confused about is that when one worker is shut down, you don't
>> need to restart the connector through the REST API; the failover logic
>> should handle the connector and task shutdown and startup.
>>
>> The offset storage topic is used for storing offsets for source
>> connectors. For sink connectors, the offsets are simply Kafka offsets
>> and are stored in the __consumer_offsets topic.
>>
>> Thanks,
>> Liquan
>>
>> On Wed, May 11, 2016 at 1:31 AM, Matteo Luzzi <matteo.lu...@gmail.com>
>> wrote:
>>
>> > Hi,
>> > I'm working on a custom implementation of a sink connector for the
>> > Kafka Connect framework. I'm testing the connector for fault
>> > tolerance by killing the worker process and restarting the connector
>> > through the REST API, and occasionally I notice that some tasks no
>> > longer receive messages from the internal consumers. I don't get any
>> > errors from the log and the tasks seem to be initialised correctly,
>> > but some of them just don't process messages anymore. Normally, when
>> > I restart the connector again, the tasks read all the messages they
>> > skipped before. I'm executing Kafka Connect in distributed mode.
>> >
>> > Could the cleanup function invoked when closing the connector be
>> > causing a leak in consumer connections to the broker? Any ideas?
>> >
>> > Also, from the documentation I read that the connector saves the
>> > task offsets in a special topic in Kafka (the one specified via
>> > offset.storage.topic), but that topic is empty even though the
>> > connector processes messages. Is this normal?
>> >
>> > Thanks,
>> > Matteo
>> >
>>
>>
>>
>> --
>> Liquan Pei
>> Software Engineer, Confluent Inc
>>
>
>
>
> --
> Matteo Remo Luzzi
>



-- 
Matteo Remo Luzzi
