Hi,
I have the FileStreamSinkConnector working perfectly well in distributed mode 
as long as only good messages are sent to the input topic.

However, if I send a bad message - for example, one that is not valid JSON 
(I am using the JSON converter for both keys and values, as follows):
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

then the task that handles this message fails (which is expected) and goes into 
the FAILED state. (And if I have more than one task, they all eventually die, 
since each of them ends up reprocessing the same bad message once a rebalance 
happens.)
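
For what it's worth, this is easy to reproduce: piping any non-JSON line into the 
console producer is enough to bring the tasks down (the topic name below is just a 
placeholder for my input topic), and the FAILED state then shows up in the 
connector's status endpoint:

 echo 'this is not json' | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <my-input-topic>
 curl localhost:8083/connectors/local-file-sink/status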

So, I tried to restart the failed task as follows:
 curl -X POST localhost:8083/connectors/local-file-sink/tasks/0/restart
and the task does restart, but it immediately reads the same bad message and 
fails again.

It looks like the offset for the bad message never gets committed, so the Kafka 
consumer behind the task keeps re-reading it and failing, over and over.

Is there a way to restart the task from the latest offset, or to somehow 
instruct the FileStreamSinkConnector to skip bad messages?
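
One workaround I was considering is resetting the sink's consumer group offsets by 
hand - something like the sketch below, assuming I understand correctly that sink 
tasks use the consumer group "connect-<connector name>" (so connect-local-file-sink 
here), that the topic name is again a placeholder, and that the group has no active 
members at the time (e.g. after deleting the connector), since the reset tool 
refuses to run otherwise. But this feels more like a hack than a real solution:

 bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
   --group connect-local-file-sink \
   --topic <my-input-topic> \
   --reset-offsets --to-latest --execute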

I found an older post that describes exactly my situation:
https://groups.google.com/forum/#!topic/confluent-platform/hkyDslGym9g
The suggestion there was to add handling for these errors to the connector code; 
however, I am using the pre-packaged file sink connector and would rather not 
modify its code.

Thanks!!
Marina

