Hey Jeremy,

Thanks for taking a look! Always nice to have input from connector developers, especially ones as prolific as you.

I was hoping to leave connector-defined transaction boundaries for future work, as the use cases for them were unclear. For example, with transactions in an upstream data source, if a connector developer wants to preserve those transactions downstream, their connector can produce source records representing those transaction boundaries and let users perform their own filtering based on those records. Alternatively, the connector can do that filtering itself by refusing to produce any records until the upstream transaction from which they are derived is committed.

Given that it's pretty much a sure thing at this point that producer transaction commits will not subdivide task-defined batches (where a batch is the collection of records returned in a single call to "SourceTask::poll"), I wonder if the Spooldir connector could follow that second approach: buffer all the records for a file and then provide them to Connect all in one batch.

And, if this is infeasible for large files, the question then becomes--how would we expect downstream applications to handle this? Much of the recent discussion here has centered on the problems posed by large transactions, both in terms of increased latency and heightened memory requirements for consumers (which would have to buffer all the records in that transaction locally until a commit marker is encountered, on a per-topic-partition basis).

As far as heightened latency goes, if you need whole files written at once, it seems like a reasonable tradeoff to make that probably won't ruffle a lot of feathers (although your input here to validate that assumption would be valuable). With regards to memory requirements--I guess it's not out of the question to suggest that users who truly need atomic transfer of entire files at a time be equipped with the right hardware to be able to support it.
In downstream applications this is unavoidable until/unless broker-side transaction filtering is implemented. However, if we allow connector-defined transaction boundaries, connector tasks wouldn't need the additional memory, as they would be able to send records to Connect as they read them, and Connect would respond by dispatching them to a producer on the fly. On the other hand, if tasks have to locally buffer records in order to be able to return them all in a single batch, then we inflict that new, possibly-painful memory requirement on Connect workers as well.

Okay, stream of consciousness over. I think I see a valid use case for connector-defined transaction boundaries. That leaves us with the task of designing an API for connector developers, and deciding on how to provide this option to users (some may not need or want to respect the transaction boundaries defined by their connector).

As far as the API goes, I think the approach you mentioned offline of using the source task context makes sense for the most part, but would introduce some small cross-compatibility issues where it would become more difficult to run newer connectors on older versions of Connect. Instead, we could take a page out of KIP-610's book and do something like this:

    public interface SourceTaskContext {
        // Existing methods and fields omitted in this snippet
        public TransactionContext transactionContext();
    }

A new TransactionContext interface would be introduced. Tasks can grab an instance of it on startup from the source task context and use it to communicate transaction boundaries to the worker over the course of their lifetime. If the worker is running an older version of Connect that doesn't support this API, the task will be met with a classloading error during the initial call to "transactionContext()", and will be able to deduce from it that it won't be able to define its own transaction boundaries.
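
To make the startup check concrete, here is a minimal sketch of how a task might probe for the API once and fall back gracefully. Everything in it is an assumption for illustration: the nested interfaces are stand-ins for the proposed API rather than real Connect classes, the names "probe", "olderWorker", and "newerWorker" are hypothetical, and it assumes the failure on an older worker surfaces as a NoSuchMethodError or NoClassDefFoundError.

```java
import java.util.Optional;

public class TransactionContextProbe {

    // Stand-in for the proposed interface (hypothetical, trimmed to two methods).
    public interface TransactionContext {
        void commitTransaction();
        void abortTransaction();
    }

    // Stand-in for the source task context that the worker hands to a task.
    public interface SourceTaskContext {
        TransactionContext transactionContext();
    }

    // Probe once, at task startup. If the worker predates the API, the call is
    // assumed to fail with a linkage error, which is translated into "empty".
    public static Optional<TransactionContext> probe(SourceTaskContext context) {
        try {
            return Optional.ofNullable(context.transactionContext());
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            return Optional.empty();
        }
    }

    // Simulates an older worker by throwing the error directly.
    public static SourceTaskContext olderWorker() {
        return () -> {
            throw new NoSuchMethodError("transactionContext");
        };
    }

    // Simulates a newer worker that supplies a no-op transaction context.
    public static SourceTaskContext newerWorker() {
        return () -> new TransactionContext() {
            @Override
            public void commitTransaction() { }

            @Override
            public void abortTransaction() { }
        };
    }

    public static void main(String[] args) {
        System.out.println("older worker supports it: " + probe(olderWorker()).isPresent());
        System.out.println("newer worker supports it: " + probe(newerWorker()).isPresent());
    }
}
```

A task could keep the returned Optional in a field and consult it wherever it would otherwise define a transaction boundary, so the error handling lives in exactly one place.
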
The benefit is that, if this is all done on task startup, the classloading error will only have to be caught once, instead of every time any of the transaction-related methods are invoked.

For the TransactionContext interface:

    public interface TransactionContext {
        public void commitTransaction();
        public void commitTransactionOn(SourceRecord record);
        public void abortTransaction();
        public void abortTransactionOn(SourceRecord record);
    }

There would be no API for defining when a transaction begins, since we can assume that transactions begin when the first record is provided and immediately after every committed or aborted transaction. The method variants that accept SourceRecord instances would allow connectors to completely decouple record batches from transaction boundaries; they could produce single record batches with multiple transactions inside them, or define transactions that start in the middle of one batch and end in the middle of another. Tasks can be made aware of when transactions are completed (either committed or aborted) via the "SourceTask::commitRecord" method. The no-arg method variants would establish a transaction commit/abort after the last record in the next (possibly-empty) record batch to be received from the task. This way, tasks processing data in the middle of a call to "SourceTask::poll" that requires a transaction commit/abort for the batch that they are about to return will be able to simply invoke "context.commitTransaction()" or "context.abortTransaction()", and then return that batch to Connect.

Finally, as far as exposing this to users goes--it sounds like there are now three different modes for defining transaction boundaries:

1. Explicitly connector-defined
2. Defined by user-configurable commit interval
3. One transaction for every call to "SourceTask::poll" (implicitly connector-defined)

One tricky thing here is that the first option won't be available for every connector, so we can't take the easy way out and just introduce a new connector config property with three different values representing the three different modes--if someone tells Connect to let connectors define their own transaction boundaries and then the connector doesn't commit or abort anything, that'd probably be a sub-par user experience. I'm hoping we can think of something cleaner, but my initial instinct to deal with that is to add a new method to the source connector API that will allow connectors to notify Connect whether they define their own transaction boundaries or not:

    public abstract class SourceConnector extends Connector {
        // Existing methods omitted from this snippet
        public boolean canDefineTransactionBoundaries() {
            return false;
        }
    }

We can then leverage that method during pre-flight config validation for a connector to prevent users from shooting themselves in the foot by requesting connector-defined transaction boundaries from a connector that doesn't do any of that.

If we go that route, then the three-mode connector config property approach could work. Might call it something like "transaction.boundary.definition" with values of "poll", "interval", and "connector". If the value is "interval", then we can also allow users to configure this interval directly using a "transaction.commit.interval.ms" property. We could default to "poll"--which would be the behavior proposed by Randall, to commit a transaction for every invocation of "SourceTask::poll".

Let me know what you think and I'll update the KIP if we get traction for this. Thanks again for chiming in!

Cheers,

Chris

On Wed, May 19, 2021 at 2:23 PM Jeremy Custenborder <jcustenbor...@gmail.com> wrote:

> Hey Chris!
>
> Nice work on this KIP!
>
> What are the thoughts about letting the connector developer control the boundaries of the transaction? For example kafka-connect-spooldir is used to parse and import files to kafka. It would be amazing if I could begin and end the transaction as I open and close files. This would allow me to guard from the dreaded exception at line 732 and wrap an entire file in a transaction. It either makes it or it doesn't.
>
> J
>
>
> On Tue, May 18, 2021 at 10:04 AM Chris Egerton <chr...@confluent.io.invalid> wrote:
> >
> > Hi Randall,
> >
> > Thanks for clarifying the issues with large transactions. I think we're starting to converge on an understanding of the problem space, if not quite on an approach to tackle it.
> >
> > I think my biggest issue with defining transaction boundaries on a per-task-poll basis is these points that you make:
> >
> > > First, it reuses something that connector implementations already employ, and which many connectors have configs to tune the behavior to maximize throughput. Second, it requires no additional configuration properties or connector-specific behavior. Third, high-throughput connectors are likely returning large batches, so write amplification for the extra offset record per source partition is likely to have a much smaller impact (assuming that most high throughput connectors have high records-per-source-partition ratios).
> >
> > I don't believe the first and third points are safe assumptions to make. There's actually very little, if any, performance benefit to writing source connectors whose tasks give Connect larger batches (at least, not as far as the framework logic goes). Every record is sequentially transformed, converted, and dispatched to the producer, regardless of whether it came from a batch of one or one million. So Connect does have the capability right now to support high-throughput, small-batch connectors.
> >
> > For a concrete example, Confluent's Datagen connector (https://github.com/confluentinc/kafka-connect-datagen), which is used to produce sample data for quickstarts and demos, performs just fine even though it does absolutely no batching whatsoever and returns only a single record per call to "SourceTask::poll". In some not-super-rigorous performance testing, I ran the connector three times against a local build of the 2.8.0 release of Connect, then modified the 2.8.0 release of Connect to perform a producer flush for every task-provided batch of records, then ran the connector three times against that. Each test run produced exactly one million records. The worst run out of the initial cases (against the unmodified local 2.8.0 build) took 15 seconds to complete. The best run out of the subsequent cases (against the modified local 2.8.0 build) took 2 minutes and 44 seconds, or 164 seconds--over a 10x slowdown. And this was against a single local broker with no replication factor.
> >
> > Of course, if it's a matter of accommodating this one (demo-oriented) connector, then it might still be worth it to put the onus on the developers of that connector to modify it appropriately to work with exactly-once support. But my point with this connector is more general--simply put, there don't appear to be safe grounds for the assumption that source tasks must produce large batches in order to achieve high throughput.
> >
> > > Yes, per-connector offset commit intervals is one approach that would be more explicit, though see my concerns earlier in this email about controlling the number of records in a transaction by a time-based config, even if set on a per-connector basis.
> >
> > I'll try to summarize the concerns presented; let me know if I'm missing something:
> >
> > 1. Large transaction sizes can inflate the memory requirements for consumers and possibly even overwhelm them with out-of-memory errors.
> > 2. High offset commit intervals can inflate the read latency of downstream applications.
> >
> > Combining both of these concerns with the high-throughput-small-batch scenario, it still seems worthwhile to provide users with a way to do some multi-batch transactions for their source tasks. This is analogous to consumer-side buffering; yes, by default you probably want events to be available downstream as soon as possible, but in some cases it's still necessary to introduce a small hit in latency in order to keep up with high throughput. And if each batch is relatively small, then heightened memory requirements for consumers shouldn't really be a problem.
> >
> > And for other scenarios, with per-connector offset commit intervals, users can always just set the interval to zero to get exactly the behavior that you're describing.
> >
> > > I'm not sure that setting up a separate Connect cluster is that practical for many Connect users, especially in larger organizations where one group manages the cluster and others manage connectors.
> >
> > I agree; I was mostly responding to what I perceived as the assumption that exactly-once support could be configured on a per-connector basis, which hopefully we've clarified by now is not the case.
> >
> > > Reducing throughput is a technical solution, especially if the only alternative is that the task doesn't run. But IMO it's an impractical technical solution as most users will want a real-time solution, and decreasing connector's throughput is the opposite of this.
> >
> > Are you proposing that this option not be provided to users?
> > I'm afraid if we don't do that then it'll just become an undocumented workaround that a handful of people are aware of (or, worse yet, have to discover for themselves), but most people aren't, even if they would benefit from hearing it. I don't think we lose out on much by providing this as a temporary solution to people who are in a pinch and just need to get data flowing again, especially if they don't have the capability to run their connector without exactly-once support.
> >
> > > The KIP says: "It may take longer than the transaction timeout for a task to flush all of its records to Kafka." Might another option be to reduce the number of records in the transaction?
> > > Is it possible to eliminate transaction timeouts by reducing the size of the transactions?
> >
> > Yes, that is an option. This can be accomplished either by reducing the offset commit interval (already noted in the KIP) or, if the offset commit interval is already zero, reconfiguring the connector to produce smaller batches.
> >
> >
> >
> > Considering the points that have been raised about transaction boundaries, I wonder if we can agree on enabling per-poll offset boundaries by default but providing users with the option to override this. This way, we get the lowest-latency, lowest-memory-footprint settings out of the box, but we don't force connector developers to switch to a batching model that has not been implicitly or explicitly required by Connect for high throughput, in order to play nicely with exactly-once support.
> > > > Cheers, > > > > Chris > > > > On Mon, May 17, 2021 at 7:24 PM Randall Hauch <rha...@gmail.com> wrote: > > > > > On Mon, May 10, 2021 at 10:26 AM Chris Egerton > <chr...@confluent.io.invalid > > > > > > > wrote: > > > > > > > RE transaction boundaries: > > > > > > > > > First, the offset commit interval is currently 60 seconds by > default, > > > and > > > > source connectors can produce a lot of records in that time. Until > > > someone > > > > operating a Connect cluster changed that to a much lower value, it's > > > > possible that source connector performance could be significantly > > > impacted. > > > > ... > > > > > At first blush, > > > > controlling transaction boundaries based upon batches seems like it > would > > > > work better for high throughput connectors. > > > > > > > > Sorry, what's the concern about performance here? Write throughput > for a > > > > transactional producer doesn't suffer as the number of records per > > > > transaction increases. I'm also worried about source connectors that > > > return > > > > small batches with high frequency; those would not fare well with a > lower > > > > offset commit interval. > > > > > > > > > > Recall that with Kafka transactions, a consumer in read_committed mode > > > buffers messages that are part of a transaction until that transaction > is > > > committed or aborted. When a consumer sees an aborted transaction > marker > > > for that transaction, the consumer discards all buffered messages > > > associated with that transaction. When a consumer sees a commit > transaction > > > marker, it then forwards those buffered messages that are associated > with > > > that transaction to its application. Note the transaction markers and > > > buffered messages are per topic partition. > > > > > > So I have two concerns with using time-based transaction boundaries. 
> > > > > > The first is easier to understand: consumers using read_committed mode > may > > > consume a lot of memory while buffering records that are part of active > > > transactions. In fact, the memory required is a function of the number > of > > > records in the transaction in the assigned topic partitions. However, > the > > > only way a Connect user can control the number of records in each > > > transaction is by reducing the offset commit interval used for _all > > > connectors_. This means that the Connect user cannot directly limit the > > > size of the transaction (and therefore the buffering memory required > by the > > > consumers) but can only limit the maximum duration of the transactions. > > > This seems challenging at best, because the size of the transaction is > a > > > function of the throughput and the offset commit interval. > > > > > > My second concern is that applications consuming the topics to which a > > > source connector writes will perceive those writes as having a > potentially > > > very high lag w/r/t the connector first seeing in the source system the > > > information for those records. The best case is that the source > connector > > > discovers some information, generates a source record and hands it to > > > Connect, and Connect writes that record to the topic just before the > offset > > > commit window closes and commits the transaction. A worse case is that > the > > > source connector gives the record to Connect just after the offset > commit > > > window closes (and the transaction is closed), and the transaction with > > > that record will not be committed for another commit interval. This > means > > > the worst case *perceived lag* could be at least the offset commit > > > interval. 
> > > > > > So if we agree that it will be difficult for some Connect users to > choose a > > > one-size-fits-all offset commit interval to work well for source > connectors > > > with a range of throughputs and different consumer application > > > requirements, we may need to consider the ability for each connector to > > > control the transaction boundaries, albeit still somewhat indirectly. > The > > > question then becomes how to specify the boundaries. Record counts > alone > > > are insufficient, since a transaction could be 1 record short for some > > > time, resulting in the entire transaction timing out. Time/duration is > also > > > not sufficient, since for low perceived lag this should be set as > small as > > > is feasible (and thus poor for high throughput). > > > > > > Using a separate transaction for each batch seems like a very > worthwhile > > > compromise. First, it reuses something that connector implementations > > > already employ, and which many connectors have configs to tune the > behavior > > > to maximize throughput. Second, it requires no additional configuration > > > properties or connector-specific behavior. Third, high-throughput > > > connectors are likely returning large batches, so write amplification > for > > > the extra offset record per source partition is likely to have a much > > > smaller impact (assuming that most high throughput connectors have high > > > records-per-source-partition ratios). Likewise, low-throughput > connectors > > > will either have very small or infrequent batches that will easily > handle > > > the higher write amplification (up to 2x: one offset record for every > > > record). Regardless of the throughput, infrequent batches would have > higher > > > lag anyway even without EOS, and EOS transactions coupled to those > batches > > > would add minimum overhead. 
> > > > > > > > > > > > > > > It's true we already have > > > > that constraint now, but transactions and more importantly > transaction > > > size > > > > and latency add a completely different aspect to the calculation of > how > > > to > > > > set the offset commit interval for a Connect cluster. > > > > ... > > > > > If we're committing transactions every 60 seconds (or even as > > > frequently > > > > as > > > > every 5 seconds), then the _perceived lag_ will be significantly > higher > > > > with transactions than without. > > > > > > > > That's a fair point. High-throughput connectors are likely to benefit > > > from > > > > higher commit intervals so that they have to pause to commit > offsets, and > > > > produce offset records, less frequently. Low-latency connectors are > > > likely > > > > to benefit from lower commit intervals in order to shorten the time > > > between > > > > source records being produced to Kafka and their transactions being > > > > committed. There's no reason to assume that there's a > one-size-fits-all > > > > offset commit interval for an entire cluster once offset commit > becomes a > > > > requirement for records to be visible to downstream consumers. > > > > > > > > > Have you considered having the worker create a separate > transaction for > > > > each batch of records returned by the connector? > > > > > > > > I had not considered this, but for reasons outlined above regarding > > > > performance with high-throughput connectors, I'm hesitant to put it > into > > > > play by default. > > > > > > > > Given the conflicting requirements of high-throughput and low-latency > > > > connectors with regards to offset commit, I do agree that for some > > > > connectors, the most useful default behavior would be to commit > offsets > > > as > > > > soon as possible, and committing once for every batch returned from > > > > "SourceTask::poll" is a great way to allow that. 
> > > > > > > > The most straightforward way to accommodate these conflicting cases > seems > > > > to be to just go ahead and allow per-connector offset commit > intervals. > > > > That way, the user gets to define an approximate upper bound on the > time > > > > between a record being produced and its transaction being > committed--and > > > if > > > > they want the upper bound to be zero, they can just configure their > > > > connector with an offset commit interval of zero. Thoughts? > > > > > > > > > > Yes, per-connector offset commit intervals is one approach that would > be > > > more explicit, though see my concerns earlier in this email about > > > controlling the number of records in a transaction by a time-based > config, > > > even if set on a per-connector basis. > > > > > > > > > > > > > > > It is true that low-throughput > > > > connectors would result in higher write amplification (worst case > being > > > 2x > > > > when each batch returns a single record), but for low-throughput > > > connectors > > > > this seems like this might be an acceptable tradeoff if EOS is really > > > > needed, or if not then EOS can be disabled for this connector. > > > > > > > > Just to be clear--disabling exactly-once support for a connector is > > > > technically an option but would require setting up a dedicated > > > > "non-exactly-once" Connect cluster, since exactly-once will only be > > > > configurable on a per-worker level and not a per-connector level. > > > > > > > > > > I'm not sure that setting up a separate Connect cluster is that > practical > > > for many Connect users, especially in larger organizations where one > group > > > manages the cluster and others manage connectors. > > > > > > > > > > RE other comments: > > > > > > > > > This seems to contradict the earlier quote. 
Specifically, the first > > > quote > > > > above states that transaction boundaries are dictated by offset > flushes, > > > > which is controlled by the `offset.flush.interval.ms` property. > However, > > > > the second quote says the `offset.flush.timeout.ms` property will be > > > > ignored for EOS source tasks. I must be missing something. > > > > > > > > One of these discusses the offset commit interval property ( > > > > > > > > > > > > https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.interval.ms > > > > ), > > > > and the other discusses the offset commit timeout property ( > > > > > > > > > > > > https://kafka.apache.org/27/documentation.html#connectconfigs_offset.flush.timeout.ms > > > > ). > > > > They are separate properties and control separate things; hope this > > > helps. > > > > > > > > > > > Okay, that makes sense. But boy is that subtle. > > > > > > > > > > > Does it seem realistic or practical to suggest to reduce the source > > > > connector's throughput? > > > > > > > > I believe so, yes. Obviously this option should be reserved for > last, but > > > > since the penalty for failure here is that the task is rendered > > > completely > > > > unusable, a slower task seems preferable to a failed one. I've > updated > > > this > > > > section to both add two more alternatives (of tuning the producer for > > > > higher throughput and using a smaller offset commit interval) and to > > > > reorder things in descending order of preference. > > > > > > > > > > Reducing throughput is a technical solution, especially if the only > > > alternative is that the task doesn't run. But IMO it's an impractical > > > technical solution as most users will want a real-time solution, and > > > decreasing connector's throughput is the opposite of this. > > > > > > The KIP says: "It may take longer than the transaction timeout for a > task > > > to flush all of its records to Kafka." 
Might another option be to > reduce > > > the number of records in the transaction? > > > Is it possible to eliminate transaction timeouts by reducing the size > of > > > the transactions? > > > > > > > > > > > I should also make it clear that this should occur very rarely in a > setup > > > > with a reasonably-high transaction timeout. Today, if a task's > producer > > > is > > > > unable to keep up with the throughput of the records generated by the > > > task, > > > > this leads to a situation where it becomes impossible to commit > offsets > > > for > > > > that task (due to https://issues.apache.org/jira/browse/KAFKA-12226); > in > > > > practice, I have never seen this happen with a healthy Kafka cluster > and > > > a > > > > sufficiently-provisioned (enough tasks, uncrowded workers, etc.) > > > connector. > > > > > > > > > Do you mean to use "task A" and "task B" here? Do they imply tasks > from > > > > the > > > > same connector? > > > > > > > > The scenario outlined here applies for both cases: the tasks could be > > > from > > > > the same connector, or from different connectors. > > > > > > > > > If so, then won't the offsets from both of those tasks > > > > still be written to the same connector-specific offset topic, and > suffer > > > > from the potential blocking issue mentioned above? While it's useful > to > > > > call this out, I'm not sure how that example helps support the > motivation > > > > for a separate per-connector offset topic. OTOH, if these were tasks > from > > > > _different_ connectors, then it becomes clear that the offsets from > one > > > > source connector using a connector-specific offsets topic will never > > > block > > > > the offsets from another connector using a different > connector-specific > > > > offsets topic. 
Thus, having each connector use separate > > > connector-specific > > > > offset topics at least avoids the problem of one connector's tasks > > > blocking > > > > the tasks of the other connector just because of transaction commit > > > timeout > > > > issues. > > > > > > > > I believe this is already touched on in the third paragraph of the > > > > "Per-connector offsets topics"/"Motivation" section: > > > > > > > > > This way, although tasks of the same connector may still interfere > with > > > > each other, they will at least not interfere with tasks of other > > > > connectors. > > > > > > > > > > > It's implied but not explicitly stated that this will result in > > > > duplicating > > > > the offsets for EOS source connectors: the workers will continue to > track > > > > all source offsets in its own offsets topic, and EOS-enabled source > > > > connectors will also track their own offsets in connector-specific > > > offsets > > > > topics. It might be worth making that more obvious, and which will be > > > used > > > > upon connector restarts. > > > > > > > > I believe the duplication is already touched on in the "Offset (and > > > record) > > > > writes" section: > > > > > > > > > Once an offset commit is complete, if the connector is (implicitly > or > > > > explicitly) configured with a separate offsets topic, the committed > > > offsets > > > > will also be written to the worker’s global offsets topic using a > > > > non-transactional producer and the worker’s principal. > > > > > > > > And the precedence of the various topics is described in detail in > the > > > > "Per-connector offsets topics"/"Smooth migration" section (TL;DR: if > an > > > > offset is present in a connector's separate offsets topic, it's > used; if > > > > it's not found, then the global offsets topic is searched as a > fallback). > > > > > > > > > Are you suggesting that the source connector and task > implementations > > > be > > > > responsible for this? 
Or are you suggesting that the worker will be > > > > responsible for creating the offsets topics on behalf of source > connector > > > > and task implementations (using an admin client per the > configurations), > > > > just like the worker currently does so for the cluster's offsets > topic? > > > > Please clarify this in the KIP. > > > > > > > > Sure, done. > > > > > > > > > Do you think it's worthwhile adding a parenthetical clause after > this > > > > statement that says "(either `exactly.once.source.enabled=false` or > an > > > > older Connect worker that does not have EOS support for source > > > > connectors)"? Yes, those are called out above, but IMO this would > help > > > > clarify exactly what this paragraph means for operators -- and how > it is > > > > not a burden for them to avoid this condition. > > > > > > > > Sure, done. > > > > > > > > > In the "Connector principal permissions" section, the > "IdempotentWrite" > > > > row > > > > says "Kafka cluster targeted by the Connect cluster" for the resource > > > name > > > > column. Is this right? Should that instead be "Kafka cluster to > which the > > > > Connector writes"? > > > > > > > > Good catch, done. > > > > > > > > > In the "Rolling upgrade(s)" section, should the first and second > > > rolling > > > > upgrades be completed before enabling EOS on any source connector? > If so, > > > > should this section call this out in more detail? > > > > > > > > I'm not sure I understand the question? Exactly-once source support > is > > > only > > > > configurable on a per-worker basis and, when enabled, will apply to > all > > > > connectors. So the steps provided in the "Rolling upgrade(s)" > section are > > > > sufficient to get every connector on your cluster running with > > > exactly-once > > > > enabled, and there is no need or way to enable exactly-once on a > > > > per-connector basis. > > > > > > > > > How does this work with a rolling "downgrade"? 
The "Heterogeneous > > > > clusters" > > > > subsection of the "Permitted failure scenarios" section talks about > (I > > > > think) how setting `exactly.once.source.enabled=false` for one worker > > > > effectively breaks the EOS delivery guarantees for the whole Connect > > > > cluster. If that is the case, how does this rolling downgrade work? > Would > > > > it simply require that all source connectors configured to use EOS be > > > > changed to not use EOS, and then to do a rolling downgrade of the > > > worker's > > > > configuration to disable EOS? If this is the case, let's be more > explicit > > > > in this section on downgrading. > > > > > In the "Soft downgrade" section, it sounds like this is effectively > > > not a > > > > rolling downgrade. Nothing in the "Downgrades" section really implies > > > that > > > > rolling downgrades are not supported, but at the same time it's not > > > > terribly obvious. > > > > Overall, it might be worth considering (if you haven't already) > modifying > > > > the Downgrade section to be a bit more prescriptive in terms of steps > > > that > > > > should be taken. > > > > > > > > I've touched up the sections that touch on downgrades, to make it > clear > > > how > > > > to safely perform a rolling downgrade for both "hard" and "soft" > > > > downgrades, and what the consequences of performing an unsafe rolling > > > > "hard" downgrade are and how to deal with the consequences. Hopefully > > > this, > > > > coupled with the knowledge that exactly-once is not going to be > > > > configurable on a per-connector basis, should make things clear. > > > > > > > > > Finally, do we need to expose any metrics that are related to > maybe the > > > > average number of records in each transaction? Or should we rely > solely > > > > upon existing producer metrics? 
> > > > > > > > That's a great point--after perusing > > > > https://kafka.apache.org/28/documentation.html#producer_monitoring > it > > > > doesn't look like there are any metrics on producer transaction size. > > > I've > > > > updated the KIP to add source task metrics for minimum, maximum, and > > > > average transaction size. I think that this, in conjunction with > existing > > > > metrics for poll time, commit time, batch size (per call to > > > > "SourceTask::poll"), and active record count, should cover our bases. > > > > > > > > > > > Thanks again! > > > > > > Randall > > > >