Hey Chris! Nice work on this KIP!
What are your thoughts about letting the connector developer control the boundaries of the transaction? For example, kafka-connect-spooldir is used to parse files and import them to Kafka. It would be amazing if I could begin and end the transaction as I open and close files. This would allow me to guard against the dreaded exception at line 732 and wrap an entire file in a transaction. It either makes it or it doesn't. :)

On Tue, May 18, 2021 at 10:04 AM Chris Egerton <chr...@confluent.io.invalid> wrote: > > Hi Randall, > > Thanks for clarifying the issues with large transactions. I think we're > starting to converge on an understanding of the problem space, if not quite > on an approach to tackle it. > > I think my biggest issue with defining transaction boundaries on a > per-task-poll basis is these points that you make: > > > First, it reuses something that connector implementations > already employ, and which many connectors have configs to tune the behavior > to maximize throughput. Second, it requires no additional configuration > properties or connector-specific behavior. Third, high-throughput > connectors are likely returning large batches, so write amplification for > the extra offset record per source partition is likely to have a much > smaller impact (assuming that most high throughput connectors have high > records-per-source-partition ratios). > > I don't believe the first and third points are safe assumptions to make. > There's actually very little, if any, performance benefit to writing source > connectors whose tasks give Connect larger batches (at least, not as far as > the framework logic goes). Every record is sequentially transformed, > converted, and dispatched to the producer, regardless of whether it came > from a batch of one or one million. So Connect does have the capability > right now to support high-throughput, small-batch connectors. > > For a concrete example, Confluent's Datagen connector ( > https://github.com/confluentinc/kafka-connect-datagen), which is used to > produce sample data for quickstarts and demos, performs just fine even > though it does absolutely no batching whatsoever and returns only a single > record per call to "SourceTask::poll". In some not-super-rigorous > performance testing, I ran the connector three times against a local build > of the 2.8.0 release of Connect, then modified the 2.8.0 release of Connect > to perform a producer flush for every task-provided batch of records, then > ran the connector three times against that. Each test run produced exactly > one million records. The worst run out of the initial cases (against the > unmodified local 2.8.0 build) took 15 seconds to complete. The best run out > of the subsequent cases (against the modified local 2.8.0 build) took 2 > minutes and 44 seconds, or 164 seconds--over a 10x slowdown. And this was > against a single local broker with no replication. > > Of course, if it's a matter of accommodating this one (demo-oriented) > connector, then it might still be worth it to put the onus on the > developers of that connector to modify it appropriately to work with > exactly-once support. But my point with this connector is more > general--simply put, there don't appear to be safe grounds for the > assumption that source tasks must produce large batches in order to achieve > high throughput.
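(Interjecting a quick illustration here: a single-record-per-poll task really can be this bare-bones. This is a made-up sketch in the spirit of the Datagen example above, not its actual code; the class name and the "topic" property are invented.)

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.source.SourceRecord;
    import org.apache.kafka.connect.source.SourceTask;

    // Hypothetical task that hands Connect exactly one record per poll.
    public class SingleRecordSourceTask extends SourceTask {

        private String topic;
        private long sequence = 0L;

        @Override
        public void start(Map<String, String> props) {
            topic = props.get("topic");
        }

        @Override
        public List<SourceRecord> poll() {
            // No batching: the framework transforms, converts, and dispatches this
            // one record to the producer just as it would a batch of thousands.
            Map<String, ?> sourcePartition = Collections.singletonMap("source", "demo");
            Map<String, ?> sourceOffset = Collections.singletonMap("sequence", sequence);
            SourceRecord record = new SourceRecord(
                    sourcePartition, sourceOffset, topic,
                    Schema.STRING_SCHEMA, "record-" + sequence++);
            return Collections.singletonList(record);
        }

        @Override
        public void stop() {
        }

        @Override
        public String version() {
            return "0.0.1";
        }
    }

The only per-record cost today is the framework overhead Chris describes; it is only once a producer flush (or transaction commit) is forced for every batch that the batch-of-one model falls off a cliff.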
> > > Yes, per-connector offset commit intervals is one approach that would be > more explicit, though see my concerns earlier in this email about > controlling the number of records in a transaction by a time-based config, > even if set on a per-connector basis. > > I'll try to summarize the concerns presented; let me know if I'm missing > something: > > 1. Large transaction sizes can inflate the memory requirements for > consumers and possibly even overwhelm them with out-of-memory errors. > 2. High offset commit intervals can inflate the read latency of downstream > applications. > > Combining both of these concerns with the high-throughput-small-batch > scenario, it still seems worthwhile to provide users with a way to do some > multi-batch transactions for their source tasks. This is analogous to > consumer-side buffering; yes, by default you probably want events to be > available downstream as soon as possible, but in some cases it's still > necessary to introduce a small hit in latency in order to keep up with high > throughput. And if each batch is relatively small, then heightened memory > requirements for consumers shouldn't really be a problem. > > And for other scenarios, with per-connector offset commit intervals, users > can always just set the interval to zero to get exactly the behavior that > you're describing. > > > I'm not sure that setting up a separate Connect cluster is that practical > for many Connect users, especially in larger organizations where one group > manages the cluster and others manage connectors. > > I agree; I was mostly responding to what I perceived as the assumption that > exactly-once support could be configured on a per-connector basis, which > hopefully we've clarified by now is not the case. > > > Reducing throughput is a technical solution, especially if the only > alternative is that the task doesn't run. But IMO it's an impractical > technical solution as most users will want a real-time solution, and > decreasing connector's throughput is the opposite of this. > > Are you proposing that this option not be provided to users? I'm afraid if > we don't do that then it'll just become an undocumented workaround that a > handful of people are aware of (or, worse yet, have to discover for > themselves), but most people aren't, even if they would benefit from > hearing it. I don't think we lose out on much by providing this as a > temporary solution to people who are in a pinch and just need to get data > flowing again, especially if they don't have the capability to run their > connector without exactly-once support. > > > The KIP says: "It may take longer than the transaction timeout for a task > to flush all of its records to Kafka." Might another option be to reduce > the number of records in the transaction? > Is it possible to eliminate transaction timeouts by reducing the size of > the transactions? > > Yes, that is an option. This can be accomplished either by reducing the > offset commit interval (already noted in the KIP) or, if the offset commit > interval is already zero, reconfiguring the connector to produce smaller > batches. > > > > Considering the points that have been raised about transaction boundaries, > I wonder if we can agree on enabling per-poll offset boundaries by default > but providing users with the option to override this. 
This way, we get the > lowest-latency, lowest-memory-footprint settings out of the box, but we > don't force connector developers to switch to a batching model that has not > been implicitly or explicitly required by Connect for high throughput, in > order to play nicely with exactly-once support. > > Cheers, > > Chris > > On Mon, May 17, 2021 at 7:24 PM Randall Hauch <rha...@gmail.com> wrote: > > > On Mon, May 10, 2021 at 10:26 AM Chris Egerton <chr...@confluent.io.invalid > > > > > wrote: > > > > > RE transaction boundaries: > > > > > > > First, the offset commit interval is currently 60 seconds by default, > > and > > > source connectors can produce a lot of records in that time. Until > > someone > > > operating a Connect cluster changed that to a much lower value, it's > > > possible that source connector performance could be significantly > > impacted. > > > ... > > > > At first blush, > > > controlling transaction boundaries based upon batches seems like it would > > > work better for high throughput connectors. > > > > > > Sorry, what's the concern about performance here? Write throughput for a > > > transactional producer doesn't suffer as the number of records per > > > transaction increases. I'm also worried about source connectors that > > return > > > small batches with high frequency; those would not fare well with a lower > > > offset commit interval. > > > > > > > Recall that with Kafka transactions, a consumer in read_committed mode > > buffers messages that are part of a transaction until that transaction is > > committed or aborted. When a consumer sees an aborted transaction marker > > for that transaction, the consumer discards all buffered messages > > associated with that transaction. When a consumer sees a commit transaction > > marker, it then forwards those buffered messages that are associated with > > that transaction to its application. Note the transaction markers and > > buffered messages are per topic partition. > > > > So I have two concerns with using time-based transaction boundaries. > > > > The first is easier to understand: consumers using read_committed mode may > > consume a lot of memory while buffering records that are part of active > > transactions. In fact, the memory required is a function of the number of > > records in the transaction in the assigned topic partitions. However, the > > only way a Connect user can control the number of records in each > > transaction is by reducing the offset commit interval used for _all > > connectors_. This means that the Connect user cannot directly limit the > > size of the transaction (and therefore the buffering memory required by the > > consumers) but can only limit the maximum duration of the transactions. > > This seems challenging at best, because the size of the transaction is a > > function of the throughput and the offset commit interval. > > > > My second concern is that applications consuming the topics to which a > > source connector writes will perceive those writes as having a potentially > > very high lag w/r/t the connector first seeing in the source system the > > information for those records. The best case is that the source connector > > discovers some information, generates a source record and hands it to > > Connect, and Connect writes that record to the topic just before the offset > > commit window closes and commits the transaction. 
A worse case is that the > > source connector gives the record to Connect just after the offset commit > > window closes (and the transaction is closed), and the transaction with > > that record will not be committed for another commit interval. This means > > the worst case *perceived lag* could be at least the offset commit > > interval. > > > > So if we agree that it will be difficult for some Connect users to choose a > > one-size-fits-all offset commit interval to work well for source connectors > > with a range of throughputs and different consumer application > > requirements, we may need to consider the ability for each connector to > > control the transaction boundaries, albeit still somewhat indirectly. The > > question then becomes how to specify the boundaries. Record counts alone > > are insufficient, since a transaction could be 1 record short for some > > time, resulting in the entire transaction timing out. Time/duration is also > > not sufficient, since for low perceived lag this should be set as small as > > is feasible (and thus poor for high throughput). > > > > Using a separate transaction for each batch seems like a very worthwhile > > compromise. First, it reuses something that connector implementations > > already employ, and which many connectors have configs to tune the behavior > > to maximize throughput. Second, it requires no additional configuration > > properties or connector-specific behavior. Third, high-throughput > > connectors are likely returning large batches, so write amplification for > > the extra offset record per source partition is likely to have a much > > smaller impact (assuming that most high throughput connectors have high > > records-per-source-partition ratios). Likewise, low-throughput connectors > > will either have very small or infrequent batches that will easily handle > > the higher write amplification (up to 2x: one offset record for every > > record). Regardless of the throughput, infrequent batches would have higher > > lag anyway even without EOS, and EOS transactions coupled to those batches > > would add minimum overhead. > > > > > > > > > > > It's true we already have > > > that constraint now, but transactions and more importantly transaction > > size > > > and latency add a completely different aspect to the calculation of how > > to > > > set the offset commit interval for a Connect cluster. > > > ... > > > > If we're committing transactions every 60 seconds (or even as > > frequently > > > as > > > every 5 seconds), then the _perceived lag_ will be significantly higher > > > with transactions than without. > > > > > > That's a fair point. High-throughput connectors are likely to benefit > > from > > > higher commit intervals so that they have to pause to commit offsets, and > > > produce offset records, less frequently. Low-latency connectors are > > likely > > > to benefit from lower commit intervals in order to shorten the time > > between > > > source records being produced to Kafka and their transactions being > > > committed. There's no reason to assume that there's a one-size-fits-all > > > offset commit interval for an entire cluster once offset commit becomes a > > > requirement for records to be visible to downstream consumers. > > > > > > > Have you considered having the worker create a separate transaction for > > > each batch of records returned by the connector? 
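(To ground the batch-per-transaction idea in code: on the worker side it would presumably reduce to the standard transactional-producer pattern sketched below. The topic names, the transactional ID, and the stand-in "offset record" are placeholders of mine, not anything specified by the KIP.)

    import java.util.List;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class PerBatchTransactionSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // One transactional ID per task so zombie instances get fenced.
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-connector-task-0");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();

                // Stand-in for one batch returned from SourceTask::poll.
                List<String> batch = List.of("record-1", "record-2", "record-3");

                // One transaction per batch: the data records and the offset record
                // become visible to read_committed consumers together, or not at all.
                producer.beginTransaction();
                for (String value : batch) {
                    producer.send(new ProducerRecord<>("data-topic", value));
                }
                producer.send(new ProducerRecord<>("offsets-topic", "task-0-partition", "offset-after-batch"));
                producer.commitTransaction();
            }
        }
    }

A read_committed consumer buffers the data records until the commit marker arrives, which is where the memory and perceived-lag concerns above come from; and with a batch of one, the trailing offset record is the 2x write-amplification worst case mentioned.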
> > > > > > I had not considered this, but for reasons outlined above regarding > > > performance with high-throughput connectors, I'm hesitant to put it into > > > play by default. > > > > > > Given the conflicting requirements of high-throughput and low-latency > > > connectors with regards to offset commit, I do agree that for some > > > connectors, the most useful default behavior would be to commit offsets > > as > > > soon as possible, and committing once for every batch returned from > > > "SourceTask::poll" is a great way to allow that. > > > > > > The most straightforward way to accommodate these conflicting cases seems > > > to be to just go ahead and allow per-connector offset commit intervals. > > > That way, the user gets to define an approximate upper bound on the time > > > between a record being produced and its transaction being committed--and > > if > > > they want the upper bound to be zero, they can just configure their > > > connector with an offset commit interval of zero. Thoughts? > > > > > > > Yes, per-connector offset commit intervals is one approach that would be > > more explicit, though see my concerns earlier in this email about > > controlling the number of records in a transaction by a time-based config, > > even if set on a per-connector basis. > > > > > > > > > > > It is true that low-throughput > > > connectors would result in higher write amplification (worst case being > > 2x > > > when each batch returns a single record), but for low-throughput > > connectors > > > this seems like this might be an acceptable tradeoff if EOS is really > > > needed, or if not then EOS can be disabled for this connector. > > > > > > Just to be clear--disabling exactly-once support for a connector is > > > technically an option but would require setting up a dedicated > > > "non-exactly-once" Connect cluster, since exactly-once will only be > > > configurable on a per-worker level and not a per-connector level. > > > > > > > I'm not sure that setting up a separate Connect cluster is that practical > > for many Connect users, especially in larger organizations where one group > > manages the cluster and others manage connectors. > > > > > > > RE other comments: > > > > > > > This seems to contradict the earlier quote. Specifically, the first > > quote > > > above states that transaction boundaries are dictated by offset flushes, > > > which is controlled by the `offset.flush.interval.ms` property. However, > > > the second quote says the `offset.flush.timeout.ms` property will be > > > ignored for EOS source tasks. I must be missing something. > > > > > > One of these discusses the offset commit interval property ( > > > > > > > > https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.interval.ms > > > ), > > > and the other discusses the offset commit timeout property ( > > > > > > > > https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.timeout.ms > > > ). > > > They are separate properties and control separate things; hope this > > helps. > > > > > > > > Okay, that makes sense. But boy is that subtle. > > > > > > > > Does it seem realistic or practical to suggest to reduce the source > > > connector's throughput? > > > > > > I believe so, yes. Obviously this option should be reserved for last, but > > > since the penalty for failure here is that the task is rendered > > completely > > > unusable, a slower task seems preferable to a failed one. 
I've updated > > this > > > section to both add two more alternatives (of tuning the producer for > > > higher throughput and using a smaller offset commit interval) and to > > > reorder things in descending order of preference. > > > > > > > Reducing throughput is a technical solution, especially if the only > > alternative is that the task doesn't run. But IMO it's an impractical > > technical solution as most users will want a real-time solution, and > > decreasing connector's throughput is the opposite of this. > > > > The KIP says: "It may take longer than the transaction timeout for a task > > to flush all of its records to Kafka." Might another option be to reduce > > the number of records in the transaction? > > Is it possible to eliminate transaction timeouts by reducing the size of > > the transactions? > > > > > > > > I should also make it clear that this should occur very rarely in a setup > > > with a reasonably-high transaction timeout. Today, if a task's producer > > is > > > unable to keep up with the throughput of the records generated by the > > task, > > > this leads to a situation where it becomes impossible to commit offsets > > for > > > that task (due to https://issues.apache.org/jira/browse/KAFKA-12226); in > > > practice, I have never seen this happen with a healthy Kafka cluster and > > a > > > sufficiently-provisioned (enough tasks, uncrowded workers, etc.) > > connector. > > > > > > > Do you mean to use "task A" and "task B" here? Do they imply tasks from > > > the > > > same connector? > > > > > > The scenario outlined here applies for both cases: the tasks could be > > from > > > the same connector, or from different connectors. > > > > > > > If so, then won't the offsets from both of those tasks > > > still be written to the same connector-specific offset topic, and suffer > > > from the potential blocking issue mentioned above? While it's useful to > > > call this out, I'm not sure how that example helps support the motivation > > > for a separate per-connector offset topic. OTOH, if these were tasks from > > > _different_ connectors, then it becomes clear that the offsets from one > > > source connector using a connector-specific offsets topic will never > > block > > > the offsets from another connector using a different connector-specific > > > offsets topic. Thus, having each connector use separate > > connector-specific > > > offset topics at least avoids the problem of one connector's tasks > > blocking > > > the tasks of the other connector just because of transaction commit > > timeout > > > issues. > > > > > > I believe this is already touched on in the third paragraph of the > > > "Per-connector offsets topics"/"Motivation" section: > > > > > > > This way, although tasks of the same connector may still interfere with > > > each other, they will at least not interfere with tasks of other > > > connectors. > > > > > > > > It's implied but not explicitly stated that this will result in > > > duplicating > > > the offsets for EOS source connectors: the workers will continue to track > > > all source offsets in its own offsets topic, and EOS-enabled source > > > connectors will also track their own offsets in connector-specific > > offsets > > > topics. It might be worth making that more obvious, and which will be > > used > > > upon connector restarts. 
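(One illustrative aside on the connector-specific offsets topics, since they keep coming up: whichever component ends up owning it, creating such a topic is just a compacted-topic creation through the admin client, along these lines. The topic name, partition count, and replication factor below are made up.)

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class ConnectorOffsetsTopicSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

            try (Admin admin = Admin.create(props)) {
                // Hypothetical per-connector offsets topic; like the worker's global
                // offsets topic, it must be compacted so the latest offset per source
                // partition is retained.
                NewTopic offsetsTopic = new NewTopic("connect-offsets-my-connector", 5, (short) 3)
                        .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG,
                                        TopicConfig.CLEANUP_POLICY_COMPACT));
                admin.createTopics(Collections.singleton(offsetsTopic)).all().get();
            }
        }
    }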
> > > > > > I believe the duplication is already touched on in the "Offset (and > > record) > > > writes" section: > > > > > > > Once an offset commit is complete, if the connector is (implicitly or > > > explicitly) configured with a separate offsets topic, the committed > > offsets > > > will also be written to the worker’s global offsets topic using a > > > non-transactional producer and the worker’s principal. > > > > > > And the precedence of the various topics is described in detail in the > > > "Per-connector offsets topics"/"Smooth migration" section (TL;DR: if an > > > offset is present in a connector's separate offsets topic, it's used; if > > > it's not found, then the global offsets topic is searched as a fallback). > > > > > > > Are you suggesting that the source connector and task implementations > > be > > > responsible for this? Or are you suggesting that the worker will be > > > responsible for creating the offsets topics on behalf of source connector > > > and task implementations (using an admin client per the configurations), > > > just like the worker currently does so for the cluster's offsets topic? > > > Please clarify this in the KIP. > > > > > > Sure, done. > > > > > > > Do you think it's worthwhile adding a parenthetical clause after this > > > statement that says "(either `exactly.once.source.enabled=false` or an > > > older Connect worker that does not have EOS support for source > > > connectors)"? Yes, those are called out above, but IMO this would help > > > clarify exactly what this paragraph means for operators -- and how it is > > > not a burden for them to avoid this condition. > > > > > > Sure, done. > > > > > > > In the "Connector principal permissions" section, the "IdempotentWrite" > > > row > > > says "Kafka cluster targeted by the Connect cluster" for the resource > > name > > > column. Is this right? Should that instead be "Kafka cluster to which the > > > Connector writes"? > > > > > > Good catch, done. > > > > > > > In the "Rolling upgrade(s)" section, should the first and second > > rolling > > > upgrades be completed before enabling EOS on any source connector? If so, > > > should this section call this out in more detail? > > > > > > I'm not sure I understand the question? Exactly-once source support is > > only > > > configurable on a per-worker basis and, when enabled, will apply to all > > > connectors. So the steps provided in the "Rolling upgrade(s)" section are > > > sufficient to get every connector on your cluster running with > > exactly-once > > > enabled, and there is no need or way to enable exactly-once on a > > > per-connector basis. > > > > > > > How does this work with a rolling "downgrade"? The "Heterogeneous > > > clusters" > > > subsection of the "Permitted failure scenarios" section talks about (I > > > think) how setting `exactly.once.source.enabled=false` for one worker > > > effectively breaks the EOS delivery guarantees for the whole Connect > > > cluster. If that is the case, how does this rolling downgrade work? Would > > > it simply require that all source connectors configured to use EOS be > > > changed to not use EOS, and then to do a rolling downgrade of the > > worker's > > > configuration to disable EOS? If this is the case, let's be more explicit > > > in this section on downgrading. > > > > In the "Soft downgrade" section, it sounds like this is effectively > > not a > > > rolling downgrade. 
Nothing in the "Downgrades" section really implies > > that > > > rolling downgrades are not supported, but at the same time it's not > > > terribly obvious. > > > Overall, it might be worth considering (if you haven't already) modifying > > > the Downgrade section to be a bit more prescriptive in terms of steps > > that > > > should be taken. > > > > > > I've touched up the sections that touch on downgrades, to make it clear > > how > > > to safely perform a rolling downgrade for both "hard" and "soft" > > > downgrades, and what the consequences of performing an unsafe rolling > > > "hard" downgrade are and how to deal with the consequences. Hopefully > > this, > > > coupled with the knowledge that exactly-once is not going to be > > > configurable on a per-connector basis, should make things clear. > > > > > > > Finally, do we need to expose any metrics that are related to maybe the > > > average number of records in each transaction? Or should we rely solely > > > upon existing producer metrics? > > > > > > That's a great point--after perusing > > > https://kafka.apache.org/28/documentation.html#producer_monitoring it > > > doesn't look like there are any metrics on producer transaction size. > > I've > > > updated the KIP to add source task metrics for minimum, maximum, and > > > average transaction size. I think that this, in conjunction with existing > > > metrics for poll time, commit time, batch size (per call to > > > "SourceTask::poll"), and active record count, should cover our bases. > > > > > > > > Thanks again! > > > > Randall > >