Thank you, Dhawan!
You have basically validated our approach as well....
We have been using our own "connector"-like service to batch events from Kafka
and index them into Elasticsearch
(https://github.com/BigDataDevs/kafka-elasticsearch-consumer)
for a while now. The main reason we did this instead of using connectors or
bigger frameworks like Logstash is that we wanted direct control over which
exceptions we treat as "recoverable" and which as non-recoverable.
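
To make "direct control" a bit more concrete, this is roughly the kind of
classification we do in the poll loop. It is only a simplified sketch - the
class names, exception names and topic name below are made up for
illustration, not the actual code from the repo above:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class IndexingLoopSketch {

    // Stand-ins for the real indexing code -- names are made up:
    static class RecoverableIndexException extends RuntimeException {}
    static class NonRecoverableIndexException extends RuntimeException {}
    interface EsIndexer { void index(String json); }

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "es-indexer");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        EsIndexer indexer = json -> { /* bulk-index into Elasticsearch here */ };

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : batch) {
                    boolean handled = false;
                    while (!handled) {
                        try {
                            indexer.index(record.value());
                            handled = true;
                        } catch (RecoverableIndexException e) {
                            // e.g. ES node temporarily unreachable:
                            // back off and retry the same event
                            Thread.sleep(1000);
                        } catch (NonRecoverableIndexException e) {
                            // e.g. parse/mapping error ("poison" message):
                            // log it (or send to an error topic) and move on
                            System.err.printf("Skipping bad event %s-%d@%d: %s%n",
                                    record.topic(), record.partition(),
                                    record.offset(), e.getMessage());
                            handled = true;
                        }
                    }
                }
                // commit only after the whole batch has been handled
                consumer.commitSync();
            }
        }
    }
}

The idea is simply that recoverable failures block the commit and get retried,
while "poison" messages are logged or routed aside and skipped, so the
consumer never gets stuck.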

I have been keeping an eye on the connectors space recently and was hoping it
might be time to give them a try, but it looks like they do not offer the same
level of direct control over exception handling yet (which is normal for more
high-level products, of course - the higher the level of abstraction, the less
control over the details you have...)

Thank you!
Marina

Sent with [ProtonMail](https://protonmail.com) Secure Email.

> -------- Original Message --------
> Subject: Re: Kafka FileStreamSinkConnector handling of bad messages
> Local Time: October 18, 2017 5:36 PM
> UTC Time: October 18, 2017 9:36 PM
> From: dhawan.gajend...@datavisor.com
> To: users@kafka.apache.org, Marina Popova <ppin...@protonmail.com>
>
> Hi Marina,
>
> We hit a similar problem with our S3 connectors. We added a level of
> indirection, a JSON-validating microservice, before publishing to the Kafka
> topic. The microservice publishes non-JSON-formatted messages to a separate
> Kafka topic called error-jsons, and we flush those messages using a custom
> consumer that handles all messages as binaries. Hope that helps.
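>
> A rough sketch of that validation step, in case it is useful. The error
> topic error-jsons is the one mentioned above; the "events" topic name and
> the class name are just illustrative:
>
> import java.nio.charset.StandardCharsets;
> import java.util.Properties;
>
> import com.fasterxml.jackson.databind.ObjectMapper;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class ValidatingPublisher {
>
>     private static final ObjectMapper MAPPER = new ObjectMapper();
>
>     public static void main(String[] args) {
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "localhost:9092");
>         props.put("key.serializer",
>                 "org.apache.kafka.common.serialization.ByteArraySerializer");
>         props.put("value.serializer",
>                 "org.apache.kafka.common.serialization.ByteArraySerializer");
>
>         byte[] payload = "{ this is not valid json".getBytes(StandardCharsets.UTF_8);
>
>         try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
>             // valid JSON goes to the real topic, everything else to error-jsons
>             String topic = isValidJson(payload) ? "events" : "error-jsons";
>             producer.send(new ProducerRecord<>(topic, payload));
>         }
>     }
>
>     static boolean isValidJson(byte[] payload) {
>         try {
>             MAPPER.readTree(payload);  // parses the payload or throws
>             return true;
>         } catch (Exception e) {
>             return false;
>         }
>     }
> }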
>
> Thanks,
> Dhawan
>
> On Wed, Oct 18, 2017 at 10:39 AM, Marina Popova <ppin...@protonmail.com> 
> wrote:
>
>> Hi,
>> I wanted to give this question a second try.... as I feel it is very
>> important to understand how to control error cases with Connectors.
>> Any advice on how to control the handling of "poison" messages in the case
>> of connectors?
>>
>> Thanks!
>> Marina
>>
>>> Hi,
>>> I have the FileStreamSinkConnector working perfectly fine in distributed
>>> mode when only good messages are being sent to the input event topic.
>>>
>>> However, if I send a bad message - for example, one that is not in correct
>>> JSON format - and I am using the JSON converter for keys/values as follows:
>>> key.converter=org.apache.kafka.connect.json.JsonConverter
>>> value.converter=org.apache.kafka.connect.json.JsonConverter
>>>
>>> then the task that handles this message fails (which is expected) and goes
>>> into the FAILED status. (And if I have more than one task, they all
>>> eventually get killed, since each of them gets this bad message to
>>> reprocess once a rebalance happens....)
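>>>
>>> Just to show what I mean by a "bad" message - the converter rejects it
>>> even outside of Connect. A quick standalone check (class name and topic
>>> name are made up):
>>>
>>> import java.nio.charset.StandardCharsets;
>>> import java.util.Collections;
>>>
>>> import org.apache.kafka.connect.errors.DataException;
>>> import org.apache.kafka.connect.json.JsonConverter;
>>>
>>> public class ConverterCheck {
>>>     public static void main(String[] args) {
>>>         JsonConverter converter = new JsonConverter();
>>>         converter.configure(Collections.singletonMap("schemas.enable", "false"), false);
>>>
>>>         byte[] bad = "this is not json".getBytes(StandardCharsets.UTF_8);
>>>         try {
>>>             converter.toConnectData("event-topic", bad);
>>>         } catch (DataException e) {
>>>             // this is the exception that fails the sink task
>>>             System.out.println("Converter rejected the message: " + e.getMessage());
>>>         }
>>>     }
>>> }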
>>>
>>> So, I tried to restart the failed task as follows:
>>>  curl -X POST localhost:8083/connectors/local-file-sink/tasks/0/restart
>>> and it does restart, but it immediately gets the same bad message and
>>> kills itself again.....
>>>
>>> It looks like the offset for the bad message never gets committed, and the
>>> Kafka consumer (the task) keeps re-reading it and killing itself forever....
>>>
>>> Is there a way to start the Task from the LATEST offset, or somehow 
>>> instruct the FileStreamSinkConnector to skip bad messages?
>>>
>>> I found this older post that describes exactly my situation:
>>> https://groups.google.com/forum/#!topic/confluent-platform/hkyDslGym9g
>>> - and the suggestion there was to add handling of these errors to the
>>> connector code..... However, I am using the pre-packaged file sink
>>> connector and would rather not modify its code....
>>>
>>> Thanks!!
>>> Marina
>>>
>>> Sent with [ProtonMail](https://protonmail.com) Secure Email.
