Responses inline.

On Mon, Mar 19, 2018 at 3:02 PM, Matt Farmer <m...@frmr.me> wrote:
> Hi everyone,
>
> We’ve been experimenting recently with some limited use of Kafka Connect and are hoping to expand to wider use cases soon. However, we had some internal issues that gave us a well-timed preview of error handling behavior in Kafka Connect. I think the fixes for this will require at least three different KIPs, but I want to share some thoughts to get the initial reaction from folks in the dev community. If these ideas seem reasonable, I can go ahead and create the required KIPs.
>
> Here are the three things specifically we ran into…
>
> -----------
>
> (1) Kafka Connect only retries tasks when certain exceptions are thrown
>
> Currently, Kafka Connect only retries tasks when certain exceptions are thrown - I believe the logic checks to see if the exception is specifically marked as “retryable” and, if not, fails. We’d like to bypass this behavior and implement a configurable exponential backoff for tasks regardless of the failure reason. This is probably two changes: one to implement exponential backoff retries for tasks if they don’t already exist, and a change to implement a RetryPolicy interface that evaluates the Exception to determine whether or not to retry.

This has definitely come up before. The likely "fix" for this is to provide general "bad data handling" options within the framework itself. The obvious set would be:

1. fail fast, which is what we do today (assuming the connector actually fails and doesn't eat errors)
2. retry (possibly with configs to limit it)
3. drop the data and move on
4. dead letter queue

This needs to be addressed in a way that handles errors from:

1. the connector itself (e.g. connectivity issues to the other system)
2. converters/serializers (bad data, unexpected format, etc.)
3. SMTs
4. ideally the framework as well (though I don't think we have any known bugs where this would be a problem, and we'd be inclined to just fix them anyway)

I think we understand the space of problems and how to address them pretty well already; this issue is really just a matter of someone finding the time to KIP, implement, and review/commit. (And that review/commit step realistically means we need multiple people's time.) Happy to guide anyone interested on next steps. If not addressed by the general community, Confluent will get to this at some point, but I couldn't say when that would be -- Randall might know better than I would.
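For concreteness, here's a rough sketch of the kind of RetryPolicy plus exponential backoff described in (1). Nothing like this exists in Connect today; the interface and every name in it are hypothetical placeholders, just to show the shape of the proposal:

// Hypothetical sketch only -- not an existing Connect API. The idea is that the
// framework would consult a pluggable policy after *any* task failure (not just
// RetriableException) and sleep backoffMs() before restarting the task.
public interface RetryPolicy {
    // Decide whether a failed task should be retried. `error` is whatever the task,
    // converter, or SMT threw; `attempt` is how many retries have already happened.
    boolean shouldRetry(Throwable error, int attempt);

    // Delay before the next retry attempt, e.g. capped exponential backoff.
    long backoffMs(int attempt);
}

// One possible implementation: retry any failure, with capped exponential backoff.
class ExponentialBackoffRetryPolicy implements RetryPolicy {
    private final int maxRetries;
    private final long initialBackoffMs;
    private final long maxBackoffMs;

    ExponentialBackoffRetryPolicy(int maxRetries, long initialBackoffMs, long maxBackoffMs) {
        this.maxRetries = maxRetries;
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    @Override
    public boolean shouldRetry(Throwable error, int attempt) {
        return attempt < maxRetries;
    }

    @Override
    public long backoffMs(int attempt) {
        // 1x, 2x, 4x, ... the initial backoff, capped at maxBackoffMs
        return Math.min(maxBackoffMs, initialBackoffMs * (1L << Math.min(attempt, 30)));
    }
}

Whether this ends up as a user-pluggable interface or just a few framework configs covering the options listed above is exactly the kind of thing the KIP would need to settle.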
> (2) Kafka Connect doesn’t permit Connectors to smartly reposition after rebalance
>
> We’re using the S3 connector to dump files with a large number of records into an S3 bucket. About 100,000 records per file. Unfortunately, every time a task fails, the consumer rebalance causes all partitions to get re-shuffled amongst the various tasks. To compensate for this, the connector gets stopped and started (from what I can tell from the logs?) and then picks up from the last consumer position that was committed to the brokers.
>
> This doesn’t work great if you’re batching things into large numbers for archival.
>
> For the S3 connector, for example: let’s say I have two partitions and the connector has two tasks to process each of those. Task 0 is at 5,000 records read from the last commit and Task 1 is at 70,000 records read from the last commit. Then, boom, something goes wrong with Task 0 and it falls over. This triggers a rebalance and Task 1 has to take over the workload. Task 1 will, at this point, discard the 70,000 records in its buffer and start from the last commit point. This failure mode is brutal for the archival system we’re building.

Yes, this is a known pain point. Usually it shows up as more of an issue when running a lot of connectors (where you don't want one task's failure to unnecessarily affect unrelated work), but the concern for connectors which do relatively infrequent commits is valid as well. I'll make a point on the first solution, then see below for the more complete answer.

> There are two solutions that I can think of to this:
>
> (A) Provide an interface for connectors to define their own rebalance listener. This listener could compare the newly assigned list of partitions with a previously assigned list. For all partitions that this connector was already working on prior to the rebalance, it could manually seek to the last position it locally processed before resuming. So, in the scenario above, Task 1 could keep an accounting file locally and seek over the first 70,000 records without reprocessing them. It would then wait until after it confirms the S3 upload to commit those offsets back to Kafka. This ensures that if the machine running Task 1 dies a new consumer can take its place, but we’ll still benefit from a local cache if one is present.

For sink tasks, this actually already exists -- see http://kafka.apache.org/10/javadoc/org/apache/kafka/connect/sink/SinkTask.html#open-java.util.Collection- (open() and close() are the relevant APIs in there). It wasn't necessarily meant for this purpose originally, but you could take advantage of it to benefit from any coincidental overlap between the previous and new partition sets.
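To make that concrete, here's a rough sketch of how a sink task could combine open()/close() with SinkTaskContext.offset() to skip records it has already buffered locally. open(), close(), and context.offset() are real Connect APIs; all of the local bookkeeping (the in-memory map standing in for the accounting file, and how/where it gets persisted) is hypothetical and omitted:

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class LocallyCachingSinkTask extends SinkTask {

    // Last offset buffered locally per partition. A real connector would back this
    // with the local "accounting file" described above so it survives a task restart
    // on the same machine; a plain map keeps the sketch compilable.
    private final Map<TopicPartition, Long> locallyBuffered = new HashMap<>();

    @Override
    public void start(Map<String, String> props) {
        // load any previously persisted local offsets here (omitted)
    }

    @Override
    public void open(Collection<TopicPartition> partitions) {
        // Called when partitions are (re)assigned after a rebalance. For any partition
        // we already have local progress for, ask the framework to seek past the
        // records we buffered instead of replaying from the last committed offset.
        for (TopicPartition tp : partitions) {
            Long lastBuffered = locallyBuffered.get(tp);
            if (lastBuffered != null) {
                context.offset(tp, lastBuffered + 1);
            }
        }
    }

    @Override
    public void close(Collection<TopicPartition> partitions) {
        // Partitions being revoked: persist local progress so it can be reused if we
        // get (some of) these partitions back after the rebalance (omitted).
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // buffer the record for the next S3 upload (omitted), then track progress
            locallyBuffered.put(new TopicPartition(record.topic(), record.kafkaPartition()),
                                record.kafkaOffset());
        }
    }

    // Offsets would only be committed back to Kafka once an upload is confirmed,
    // e.g. by overriding preCommit() to return only offsets that are safely in S3.

    @Override
    public void stop() { }

    @Override
    public String version() {
        return "0.0";
    }
}

Note this only helps when a task happens to get back partitions it was already handling -- the "coincidental overlap" caveat above -- which is exactly why the more complete answer below is about making assignments sticky in the first place.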
> (B) Have connect manually round robin partitions on a topic to tasks and never rebalance them automatically. If this were combined with better task retry semantics, I think this solution would be simpler.

The real answer here is to get rid of the global rebalance and/or try to achieve some level of stickiness for tasks. The global rebalance was something we knew could become a scalability issue, and when we extended the group protocol to support Connect in addition to consumers (or really any new layer you want to add), we specifically made sure we had a plan to extend and upgrade that protocol, and that we'd be able to implement it such that you could do the upgrade with no downtime. I've had proposals for doing this floating around my head since the first version of Connect, and I've got a draft of some options that I wrote up internally.

This particular issue is actually a lot more general than just Connect. It makes things more complicated (a good reason to be conservative in adding a solution!), but Kafka Streams nodes that have large state stores can also benefit from a better rebalance protocol, as can any consumer that has to manage significant local state.

The good news is that the implementations here that get you the biggest bang for your implementation-cost buck aren't that complicated. The bad news is that we have to do it separately for the normal consumer and for Connect. Again, this mostly boils down to finding time to KIP and implement, but the basic ideas for partial, sticky, incremental, and deferred rebalancing and their implications are pretty well understood by the core team of Connect developers now.

Let me see if I can get copies of some wikis that got written up internally at Confluent ported to the Kafka wiki. No real reason we haven't put it there; it just happened to be created internally and we didn't think to copy it over. Probably both would be useful if anyone in the community wants to tackle these problems.

> (3) As far as I can tell, JMX metrics aren’t reporting the number of active tasks
>
> This one is arguably the simplest issue to resolve, but we’d like to alert if the number of active tasks isn’t what we expect it to be so that we can have a human investigate.

Interesting. I think you're right that we're probably just reporting *assigned* tasks with the task-count metric rather than active tasks. I think an active tasks metric would be reasonable, though since you really need to look at the aggregate across workers, I'm not sure it's the best for alerting. Maybe an unhealthy/dead tasks count metric would be better? You can alert on that directly without having to aggregate across workers.

-Ewen

> -----------
>
> I would love thoughts on all of the above from anyone on this list.
>
> Thanks,
>
> Matt Farmer