Hi everyone,

We’ve been experimenting recently with some limited use of Kafka Connect and are hoping to expand to wider use cases soon. However, we had some internal issues that gave us a well-timed preview of error-handling behavior in Kafka Connect. I think the fixes for this will require at least three different KIPs, but I want to share some thoughts to get an initial reaction from folks in the dev community. If these ideas seem reasonable, I can go ahead and create the required KIPs.
Specifically, here are the three things we ran into…

-----------

(1) Kafka Connect only retries tasks when certain exceptions are thrown

Currently, Kafka Connect only retries tasks when certain exceptions are thrown - I believe the logic checks whether the exception is specifically marked as “retryable” and, if not, fails the task. We’d like to bypass this behavior and implement a configurable exponential backoff for tasks regardless of the failure reason. This is probably two changes: one to implement exponential-backoff retries for tasks if they don’t already exist, and a second to introduce a RetryPolicy interface that evaluates the Exception to determine whether or not to retry. (There’s a rough sketch of what I mean in the P.S. at the bottom of this mail.)

(2) Kafka Connect doesn’t permit Connectors to smartly reposition after a rebalance

We’re using the S3 connector to dump files with a large number of records into an S3 bucket - about 100,000 records per file. Unfortunately, every time a task fails, the resulting consumer rebalance causes all partitions to get re-shuffled amongst the various tasks. To compensate, the connector gets stopped and restarted (from what I can tell from the logs?) and then picks up from the last consumer position that was committed to the brokers. This doesn’t work well if you’re batching records into large files for archival.

For the S3 connector, for example: let’s say I have two partitions and the connector has two tasks, one processing each partition. Task 0 has read 5,000 records since the last commit and Task 1 has read 70,000 records since the last commit. Then, boom, something goes wrong with Task 0 and it falls over. This triggers a rebalance and Task 1 has to take over the workload. At that point, Task 1 will discard the 70,000 records in its buffer and start again from the last commit point. This failure mode is brutal for the archival system we’re building.

There are two solutions I can think of:

(A) Provide an interface for connectors to define their own rebalance listener. This listener could compare the newly assigned list of partitions with the previously assigned list. For any partition the connector was already working on prior to the rebalance, it could manually seek to the last position it locally processed before resuming. So, in the scenario above, Task 1 could keep a local accounting file and seek past the first 70,000 records without reprocessing them. It would then wait until after it confirms the S3 upload to commit those offsets back to Kafka. This ensures that if the machine running Task 1 dies, a new consumer can take its place, but we still benefit from a local cache if one is present. (There’s a sketch of this in the P.S. as well.)

(B) Have Connect manually round-robin a topic’s partitions to tasks and never rebalance them automatically. If this were combined with better task retry semantics, I think this solution would be simpler.

(3) As far as I can tell, JMX metrics aren’t reporting the number of active tasks

This is arguably the simplest issue to resolve, but we’d like to alert if the number of active tasks isn’t what we expect it to be so that we can have a human investigate. (The P.S. shows the kind of check we’d want to run against such a metric.)

-----------

I would love thoughts on all of the above from anyone on this list.

Thanks,
Matt Farmer
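-----------

P.S. To make some of this more concrete, here are a few rough Java sketches. These are illustrations only: apart from types that already ship with the Kafka client libraries (TopicPartition, the standard JMX classes), every interface, class, and metric name below is made up and is not a proposal for a final API.

First, the RetryPolicy idea from (1): a policy the framework could consult to decide whether a failed task should be retried and how long to back off, with a default implementation that retries every failure using capped exponential backoff.

    // Hypothetical sketch only -- none of these names exist in Kafka Connect today.
    public interface RetryPolicy {

        /**
         * Decide whether a failed task should be retried.
         *
         * @param failure      the exception that caused the task to fail
         * @param attemptCount how many retries have already been attempted
         * @return true if the framework should restart the task
         */
        boolean shouldRetry(Throwable failure, int attemptCount);

        /** How long to wait before the next attempt. */
        long backoffMs(int attemptCount);
    }

    /** Default: retry every failure, regardless of exception type, with capped exponential backoff. */
    class ExponentialBackoffRetryPolicy implements RetryPolicy {
        private final long initialBackoffMs;
        private final long maxBackoffMs;
        private final int maxAttempts;

        ExponentialBackoffRetryPolicy(long initialBackoffMs, long maxBackoffMs, int maxAttempts) {
            this.initialBackoffMs = initialBackoffMs;
            this.maxBackoffMs = maxBackoffMs;
            this.maxAttempts = maxAttempts;
        }

        @Override
        public boolean shouldRetry(Throwable failure, int attemptCount) {
            // Deliberately ignores the exception type: retry everything up to the limit.
            return attemptCount < maxAttempts;
        }

        @Override
        public long backoffMs(int attemptCount) {
            // Double the wait on each attempt, capped at maxBackoffMs.
            return Math.min(maxBackoffMs, initialBackoffMs << Math.min(attemptCount, 30));
        }
    }

The backoff and attempt limits would presumably come from worker or connector configuration; the exact config key names are something the KIP would have to pin down.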
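Second, the rebalance-listener idea from (2)(A): a hypothetical hook a task could implement so that, after a rebalance, it can ask the framework to seek past records it has already buffered locally rather than rewinding to the last committed offset. Only TopicPartition is a real Kafka type here; the listener interface and example class are invented.

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    // Hypothetical interface -- nothing like this exists in Kafka Connect today.
    public interface ConnectorRebalanceListener {

        /**
         * Called on the task after a rebalance, before consumption resumes.
         * Any offsets returned are used by the framework to seek, overriding the
         * default "start from the last committed offset" behavior.
         */
        Map<TopicPartition, Long> onPartitionsAssigned(
                Collection<TopicPartition> previouslyAssigned,
                Collection<TopicPartition> newlyAssigned);
    }

    // How an S3-style sink task might use it: keep going from the position it has
    // already read into its local buffer/accounting file instead of re-reading
    // everything since the last commit.
    class LocalBufferAwareListener implements ConnectorRebalanceListener {

        // Positions this task has already read and buffered locally, per partition.
        private final Map<TopicPartition, Long> locallyProcessed = new HashMap<>();

        @Override
        public Map<TopicPartition, Long> onPartitionsAssigned(
                Collection<TopicPartition> previouslyAssigned,
                Collection<TopicPartition> newlyAssigned) {
            Map<TopicPartition, Long> seekTo = new HashMap<>();
            for (TopicPartition tp : newlyAssigned) {
                // Only skip ahead on partitions this task was already working on and
                // still has buffered locally; anything else starts from the committed
                // offset exactly as it does today.
                if (previouslyAssigned.contains(tp) && locallyProcessed.containsKey(tp)) {
                    seekTo.put(tp, locallyProcessed.get(tp));
                }
            }
            return seekTo;
        }
    }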
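Finally, for (3), the sort of external check we’d like to be able to run once such a metric exists: poll a worker’s JMX endpoint and alert when the reported task count differs from what we expect. The MBean and attribute names are placeholders for whatever the metric ends up being called.

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    // Usage: TaskCountCheck <worker-host:jmx-port> <expected-task-count>
    public class TaskCountCheck {
        public static void main(String[] args) throws Exception {
            int expectedTasks = Integer.parseInt(args[1]);
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://" + args[0] + "/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection mbsc = connector.getMBeanServerConnection();
                // Placeholder MBean/attribute names -- not an existing Connect metric.
                ObjectName name = new ObjectName("kafka.connect:type=connect-worker-metrics");
                Number taskCount = (Number) mbsc.getAttribute(name, "task-count");
                if (taskCount.intValue() != expectedTasks) {
                    System.err.println("ALERT: expected " + expectedTasks
                            + " active tasks but worker reports " + taskCount);
                }
            }
        }
    }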