Any other thoughts on this?
Thanks,
Matteo

2016-05-12 13:09 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:

> I found also this suspicious log snippet that might be revelant. The task
> executed by thread 134 is the one that won't receive messages
>
> INFO Attempt to heart beat failed since the group is rebalancing, try to
> re-join group.
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:633)
> [2016-05-12 10:27:09,623] INFO
> [kafka-connect-topic-0, kafka-connect-topic-1, kafka-connect-topic-2, 
> kafka-connect-topic-3, kafka-connect-topic-4, kafka-connect-topic-5]
> topic-partitions are revoked
> from this thread task 134 
> (com.connect.elasticsearch_kafka_connector.task.ElasticsearchTask:99)
> [2016-05-12 10:27:09,623] INFO
> org.apache.kafka.connect.runtime.WorkerSinkTask@5c29784c Committing
> offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:187)
> [2016-05-12 10:27:09,634] INFO [kafka-connect-topic-0] topic-partitions
> are assigned from this thread task 134
> (com.connect.elasticsearch_kafka_connector.task.ElasticsearchTask:92)
>
> 2016-05-12 8:58 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
>
>> Hi, Liquan
>>
>> I run the two workers inside docker containers and a connector having 6
>> tasks. They read from a topic having 6 partitions Then I kill one of the
>> two containers using docker kill or docker restart command. When the
>> container is up again a rebalance happens and sometimes few tasks don't
>> consume messages anymore even thought the onPartitionAssigned functions
>> says that they are handling a partition of the topic. Let me know if you
>> need other information
>> I use Kafka 0.9.0.
>>
>> Thanks for the help,
>> Matteo
>>
>> 2016-05-11 22:57 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
>>
>>> Hi Matteo,
>>>
>>> I am not completely follow the steps.  Can you share the exact command to
>>> reproduce the issue? What kind of commands did you use to restart the
>>> connector? Which version of Kafka are you using?
>>>
>>> Thanks,
>>> Liquan
>>>
>>> On Wed, May 11, 2016 at 4:40 AM, Matteo Luzzi <matteo.lu...@gmail.com>
>>> wrote:
>>>
>>> > Hi again, I was able to reproduce the bug in the same scenario (two
>>> workers
>>> > on separate machines) just by deleting the connector from the Rest API
>>> and
>>> > then restarting it again.
>>> > I also got this error on one of the workers :
>>> > [2016-05-11 11:29:47,034] INFO 172.17.42.1 - - [11/May/2016:11:29:45
>>> +0000]
>>> > "DELETE /connectors/kafka-sink-connector HTTP/1.1" 204 -  1171
>>> > (org.apache.kafka.connect.runtime.rest.RestServer:60)
>>> > [2016-05-11 11:29:52,034] INFO Forcing shutdown of thread
>>> > WorkerSinkTask-kafka-sink-connector-1
>>> > (org.apache.kafka.connect.util.ShutdownableThread:141)
>>> > [2016-05-11 11:29:52,050] ERROR Graceful stop of task
>>> > org.apache.kafka.connect.runtime.WorkerSinkTask@7761a73c failed.
>>> > (org.apache.kafka.connect.runtime.Worker:312)
>>> > [2016-05-11 11:29:52,051] ERROR Uncaught exception in herder work
>>> thread,
>>> > exiting:
>>> >  (org.apache.kafka.connect.runtime.distributed.DistributedHerder:166)
>>> > java.util.ConcurrentModificationException: KafkaConsumer is not safe
>>> for
>>> > multi-threaded access
>>> > at
>>> >
>>> >
>>> org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1282)
>>> > at
>>> >
>>> >
>>> org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:1213)
>>> > at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.WorkerSinkTask.close(WorkerSinkTask.java:128)
>>> > at org.apache.kafka.connect.runtime.Worker.stopTask(Worker.java:313)
>>> > at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.distributed.DistributedHerder$14.onRevoked(DistributedHerder.java:898)
>>> > at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.onJoinPrepare(WorkerCoordinator.java:238)
>>> > at
>>> >
>>> >
>>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:209)
>>> > at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.distributed.WorkerGroupMember.ensureActive(WorkerGroupMember.java:132)
>>> > at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:182)
>>> > at
>>> >
>>> >
>>> org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:159)
>>> > at java.lang.Thread.run(Thread.java:745)
>>> >
>>> > On the successive restart, 2 out of 6 tasks were not receiving messages
>>> > anymore
>>> >
>>> > 2016-05-11 11:41 GMT+02:00 Matteo Luzzi <matteo.lu...@gmail.com>:
>>> >
>>> > > Hi Liquan,
>>> > > thanks for the fast response.
>>> > > I'm able to reproduce the error by having two workers running on two
>>> > > different machines. If I restart one of the two worker, the failover
>>> > logic
>>> > > correctly detects the failure and shut down the tasks on the healthy
>>> > worker
>>> > > for rebalancing. When the failed worker is up again the tasks are
>>> > > distributed correctly among the two workers but some tasks don't get
>>> new
>>> > > messages anymore. How can I check that actually all the input topic
>>> > > partitions are correctly reassigned?
>>> > >
>>> > > Matteo
>>> > >
>>> > > 2016-05-11 10:44 GMT+02:00 Liquan Pei <liquan...@gmail.com>:
>>> > >
>>> > >> Hi Matteo,
>>> > >>
>>> > >> Glad to hear that you are building a connector. To better
>>> understand the
>>> > >> issue, can you provide the exact steps to re-produce the issue? One
>>> > thing
>>> > >> I
>>> > >> am confused is that when one worker is shutdown, you don't need to
>>> > restart
>>> > >> the connector through the rest API, the failover logic should
>>> handle the
>>> > >> connector and tasks shutdown and start up.
>>> > >>
>>> > >> The offset storage topic is used for storing offset for source
>>> > connectors.
>>> > >> For sink connector, the offset is simply Kafka offset and will be
>>> stored
>>> > >> in
>>> > >> the __consumer_offset topic.
>>> > >>
>>> > >> Thanks,
>>> > >> Liquan
>>> > >>
>>> > >> On Wed, May 11, 2016 at 1:31 AM, Matteo Luzzi <
>>> matteo.lu...@gmail.com>
>>> > >> wrote:
>>> > >>
>>> > >> > Hi,
>>> > >> > I'm working on a custom implementation of a sink connector for
>>> Kafka
>>> > >> > Connect framework. I'm testing the connector for fault tolerance
>>> by
>>> > >> killing
>>> > >> > the worker process  and restarting the connector through the Rest
>>> API
>>> > >> and
>>> > >> > occasionally I notice that some tasks don't receive anymore
>>> messages
>>> > >> from
>>> > >> > the internal consumers. I don't get any errors from the log and
>>> the
>>> > >> tasks
>>> > >> > seem to be initialised correctly but some of them just don't
>>> process
>>> > >> > messages anymore. Normally when I restart again the connector, the
>>> > tasks
>>> > >> > read all the messages skipped before. I'm executing Kafka Connect
>>> in
>>> > >> > distributed mode.
>>> > >> >
>>> > >> > Could it be a problem of the cleanup function invoked when
>>> closing the
>>> > >> > connector causing a leak in consumer connections with the broker?
>>> Any
>>> > >> > ideas?
>>> > >> >
>>> > >> > And also, from the documentation I read that the connector save
>>> the
>>> > >> offset
>>> > >> > of the tasks in a special topic in Kafka (the one specified via
>>> > >> > offset.storage.topic) but it is empty even though the connector
>>> > process
>>> > >> > messages. Is it normal?
>>> > >> >
>>> > >> > Thanks,
>>> > >> > Matteo
>>> > >> >
>>> > >>
>>> > >>
>>> > >>
>>> > >> --
>>> > >> Liquan Pei
>>> > >> Software Engineer, Confluent Inc
>>> > >>
>>> > >
>>> > >
>>> > >
>>> > > --
>>> > > Matteo Remo Luzzi
>>> > >
>>> >
>>> >
>>> >
>>> > --
>>> > Matteo Remo Luzzi
>>> >
>>>
>>>
>>>
>>> --
>>> Liquan Pei
>>> Software Engineer, Confluent Inc
>>>
>>
>>
>>
>> --
>> Matteo Remo Luzzi
>>
>
>
>
> --
> Matteo Remo Luzzi
>



-- 
Matteo Remo Luzzi

Reply via email to