Thank you, Dhawan! You have basically validated our approach as well... We have been using our own "connector"-like service to batch events from Kafka and index them into Elasticsearch (https://github.com/BigDataDevs/kafka-elasticsearch-consumer) for a while now. The main reason we did this instead of using connectors or bigger frameworks like Logstash was that we wanted direct control over which exceptions we treat as "recoverable" and which as non-recoverable.
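(For illustration only - the function and exception names below are hypothetical, not taken from the kafka-elasticsearch-consumer codebase - the recoverable vs. non-recoverable split we rely on might be sketched like this:)

```python
# Hypothetical sketch of classifying failures from an indexing attempt.
# Transient errors are retried; malformed events are skipped/routed aside;
# anything unknown is treated as fatal so a human can investigate.

RECOVERABLE = (ConnectionError, TimeoutError)   # transient: retry the batch
NON_RECOVERABLE = (ValueError, KeyError)        # malformed event: skip it

def classify_failure(exc):
    """Return 'retry' for transient errors, 'skip' for poison messages,
    'fatal' for anything unexpected."""
    if isinstance(exc, RECOVERABLE):
        return "retry"
    if isinstance(exc, NON_RECOVERABLE):
        return "skip"
    return "fatal"
```

The point is simply that this decision lives in our own code, so we choose the retry/skip/fail policy per exception type.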
I was monitoring the Kafka Connect space recently and was hoping it might be time to give connectors a try, but it looks like they do not yet offer the same level of direct control over exception handling (which is normal for higher-level products, of course: the higher the level of abstraction, the less control over the details you have...)

Thank you!
Marina

Sent with [ProtonMail](https://protonmail.com) Secure Email.

> -------- Original Message --------
> Subject: Re: Kafka FileStreamSinkConnector handling of bad messages
> Local Time: October 18, 2017 5:36 PM
> UTC Time: October 18, 2017 9:36 PM
> From: dhawan.gajend...@datavisor.com
> To: users@kafka.apache.org, Marina Popova <ppin...@protonmail.com>
>
> Hi Marina,
>
> We hit a similar problem with our S3 connectors. We added a level of
> indirection, a JSON-validating microservice, before publishing to the Kafka
> topic. The microservice published non-JSON-formatted messages to a separate
> Kafka topic called error-jsons, and we flushed those messages using a custom
> consumer that handles all messages as binaries. Hope that helps.
>
> Thanks,
> Dhawan
>
> On Wed, Oct 18, 2017 at 10:39 AM, Marina Popova <ppin...@protonmail.com>
> wrote:
>
>> Hi,
>> I wanted to give this question a second try... as I feel it is very
>> important to understand how to control error cases with connectors.
>> Any advice on how to control the handling of "poison" messages in the
>> case of connectors?
>>
>> Thanks!
>> Marina
>>
>>> Hi,
>>> I have the FileStreamSinkConnector working perfectly fine in distributed
>>> mode when only good messages are being sent to the input event topic.
>>>
>>> However, if I send a bad message - for example, one not in correct
>>> JSON format - while using the JSON converter for keys/values, as follows:
>>> key.converter=org.apache.kafka.connect.json.JsonConverter
>>> value.converter=org.apache.kafka.connect.json.JsonConverter
>>>
>>> then the task that handles this message fails (which is expected) and gets
>>> into the FAILED status. (And if I have more than one task, they all
>>> eventually get killed, since they all get this bad message to
>>> reprocess once a rebalance happens...)
>>>
>>> So, I tried to restart the failed task as follows:
>>> curl -X POST localhost:8083/connectors/local-file-sink/tasks/0/restart
>>> and it does restart, but it immediately gets the same bad message and
>>> kills itself again...
>>>
>>> It looks like the offset of the bad message never gets committed, and the
>>> Kafka consumer (the task) keeps reading it and killing itself forever...
>>>
>>> Is there a way to start the task from the LATEST offset, or somehow
>>> instruct the FileStreamSinkConnector to skip bad messages?
>>>
>>> I found this older post that describes exactly my situation:
>>> https://groups.google.com/forum/#!topic/confluent-platform/hkyDslGym9g
>>> - the suggestion there was to add handling of these errors in the connector
>>> code... however, I am using the pre-packaged file sink connector and
>>> would rather not modify its code...
>>>
>>> Thanks!!
>>> Marina
>>>
>>> Sent with [ProtonMail](https://protonmail.com) Secure Email.
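(Editorial note on the thread above: the routing logic of the JSON-validating microservice Dhawan describes could be sketched roughly as below. The error-jsons topic name comes from the thread; the main topic name and the function are hypothetical, and the actual Kafka producer wiring is assumed, not shown.)

```python
import json

MAIN_TOPIC = "events"          # hypothetical name for the real input topic
ERROR_TOPIC = "error-jsons"    # error topic name mentioned in the thread

def route_message(raw_bytes):
    """Decide which topic a raw message should be published to:
    valid JSON goes to the main topic, anything else to error-jsons,
    where a binary-safe consumer can drain it later."""
    try:
        json.loads(raw_bytes.decode("utf-8"))
        return MAIN_TOPIC
    except (UnicodeDecodeError, ValueError):
        return ERROR_TOPIC
```

Because validation happens before the message ever reaches the connector's input topic, the JsonConverter in the sink never sees a poison message.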