Hi Randall,

That's a fair assessment; if a user upgrades their cluster to 3.0 with no changes to worker or connector configs, it's possible that their cluster will break if their worker principal(s) lack the necessary ACLs on the Kafka cluster that hosts the config topic.
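To make that concrete, here's roughly what the required grants look like via the Java Admin API; the principal and transactional ID below are placeholders, and the actual transactional ID depends on the worker config:

    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.common.acl.AccessControlEntry;
    import org.apache.kafka.common.acl.AclBinding;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.acl.AclPermissionType;
    import org.apache.kafka.common.resource.PatternType;
    import org.apache.kafka.common.resource.ResourcePattern;
    import org.apache.kafka.common.resource.ResourceType;

    import java.util.List;
    import java.util.Map;

    public class GrantWorkerTransactionAcls {
        public static void main(String[] args) throws Exception {
            String principal = "User:connect-worker"; // placeholder worker principal
            try (Admin admin = Admin.create(Map.of("bootstrap.servers", "localhost:9092"))) {
                // Write/describe on the worker's (fixed) transactional ID:
                ResourcePattern txnId = new ResourcePattern(
                        ResourceType.TRANSACTIONAL_ID, "connect-cluster-txn-id", PatternType.LITERAL);
                // Idempotent write on the cluster itself:
                ResourcePattern cluster = new ResourcePattern(
                        ResourceType.CLUSTER, "kafka-cluster", PatternType.LITERAL);
                admin.createAcls(List.of(
                        new AclBinding(txnId, entry(principal, AclOperation.WRITE)),
                        new AclBinding(txnId, entry(principal, AclOperation.DESCRIBE)),
                        new AclBinding(cluster, entry(principal, AclOperation.IDEMPOTENT_WRITE))
                )).all().get();
            }
        }

        private static AccessControlEntry entry(String principal, AclOperation op) {
            return new AccessControlEntry(principal, "*", op, AclPermissionType.ALLOW);
        }
    }

The same three bindings can be created with the kafka-acls.sh tool, of course; the point is just that without them, the upgraded worker can't bring up its producer for the config topic.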
If we wanted to take a more conservative approach, we could allow users to opt in to the use of a transactional producer by their cluster's leader through some worker configuration property. The rolling upgrade process from some pre-3.0 cluster to a 3.0+ cluster with exactly-once source support enabled would become:

1. Upgrade the cluster to 3.0 (or a later version, if one is available)
2. Enable the use of a transactional producer by the cluster's leader
3. Enable exactly-once source support

Since steps 1 and 2 could take place within the same rolling upgrade, the number of rolling upgrades for this new approach would be the same as the current approach: two. The only downsides would be additional configuration complexity for the worker and an upgrade process that is a little trickier (and potentially more error-prone) for users.

To reduce the added configuration complexity as much as possible, we could expose this intermediate state (workers are on 3.0 and the leader uses a transactional producer, but exactly-once source support is not enabled) by renaming the "exactly.once.source.enabled" property to "exactly.once.source.support" and permitting values of "disabled" (default), "preparing", and "enabled". The "preparing" and "enabled" values would provide the same behavior as the current proposal with "exactly.once.source.enabled" set to "false" and "true", respectively, and "disabled" would have the same behavior as the current proposal, except without the use of a transactional producer by the leader.

I'll update the proposal with this new behavior shortly. Thanks for the review!

Cheers,

Chris

On Wed, Jun 9, 2021 at 1:02 PM Randall Hauch <rha...@gmail.com> wrote:

> Chris,
>
> Sorry for the late question/comment, but the "Breaking Changes" section concerns me. IIUC, when a user upgrades their 1.x or 2.x Connect cluster and restarts their worker(s) on 3.0, the workers will fail due to this producer requirement even if they make no changes to their worker configs or connector configs. Is this correct?
>
> If so, I'm concerned about this. Even though the additional producer ACLs are seemingly minor and easy to change, it is likely that users will not read the docs before they upgrade, causing their simple upgrade to fail. And even though we could allow ourselves breaking changes in 3.0, a major release, I personally would prefer we not have any such breaking changes.
>
> Given that, what would be required for us to eliminate that breaking change, or change it from a breaking change to a prerequisite for enabling EOS support in their cluster?
>
> Thanks,
>
> Randall
>
> On Wed, Jun 2, 2021 at 8:42 AM Chris Egerton <chr...@confluent.io.invalid> wrote:

> > Hi Tom,
> >
> > I do agree that it'd be safer to default to "required", but since at the time of the 3.0 release no existing connectors will have implemented the "SourceConnector::exactlyOnceSupport" method, it'd require all Connect users to downgrade to "requested" anyways in order to enable exactly-once support on their workers. The friction there seems a little excessive; we might consider changing the default from "requested" to "required" later on down the line, after connector developers have had enough time to put out new connector versions that implement the new API. Thoughts?
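> > For concreteness, here's a rough sketch of the connector-facing side of that check; the signature isn't final and the stub below is hypothetical:
> >
> >     import java.util.Map;
> >     // SourceConnector and ExactlyOnceSupport as proposed in this KIP.
> >
> >     public class MySourceConnector extends SourceConnector {
> >         @Override
> >         public ExactlyOnceSupport exactlyOnceSupport(Map<String, String> connectorConfig) {
> >             // Advertise that this connector can provide exactly-once guarantees.
> >             return ExactlyOnceSupport.SUPPORTED;
> >         }
> >         // (other SourceConnector methods omitted for brevity)
> >     }
> >
> > With "requested" as the default, a connector that hasn't implemented this method still runs; with "required", the worker would fail the connector during preflight validation unless the method reports support.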
> > Cheers,
> >
> > Chris
> >
> > On Wed, Jun 2, 2021 at 8:49 AM Tom Bentley <tbent...@redhat.com> wrote:
> >
> > > Hi Chris,
> > >
> > > Just a minor question: I can see why the default for exactly.once.support is requested (you want a good first-run experience, I assume), but it's a little like engineering a safety catch and then not enabling it. Wouldn't it be safer to default to required, so that there's no way someone can mistakenly fail to get EoS without explicitly having configured it?
> > >
> > > Thanks,
> > >
> > > Tom
> > >
> > > On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton <chr...@confluent.io.invalid> wrote:
> > >
> > > > Hi Gunnar,
> > > >
> > > > Thanks for taking a look! I've addressed the low-hanging fruit in the KIP; responses to other comments inline here:
> > > >
> > > > > * TransactionContext: What's the use case for the methods accepting a source record (commitTransaction(SourceRecord record), abortTransaction(SourceRecord record))?
> > > >
> > > > This allows developers to decouple transaction boundaries from record batches. If a connector has a configuration that dictates how often it returns from "SourceTask::poll", for example, it may be easier to define multiple transactions within a single batch, or a single transaction across several batches, than to retrofit the connector's poll logic to work with transaction boundaries.
> > > >
> > > > > * SourceTaskContext: Instead of guarding against NSME, is there a way for a connector to query the KC version and thus derive its capabilities? Going forward, a generic API for querying capabilities could be nice, so a connector can query for capabilities of the runtime in a safe and compatible way.
> > > >
> > > > This would be a great quality-of-life improvement for connector and framework developers alike, but I think it may be best left for a separate KIP. The current approach, clunky though it may be, seems like a nuisance at worst. It's definitely worth addressing, but I'm not sure we have the time to think through all the details thoroughly enough before the upcoming KIP freeze.
> > > >
> > > > > * SourceConnector: Would it make sense to merge the two methods perhaps and return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }?
> > > >
> > > > Hmm... at first glance I like the idea of merging the two methods a lot. The one thing that gives me pause is that there may be connectors that would like to define their own transaction boundaries without providing exactly-once guarantees. We could add UNSUPPORTED_WITH_BOUNDARIES to accommodate that, but then it might actually be simpler to keep the two methods separate, in case we add some third variable to the mix that would also have to be reflected in the possible ExactlyOnceSupport enum values.
> > > >
> > > > > Or, alternatively return an enum from canDefineTransactionBoundaries(), too; even if it only has two values now, that'd allow for extension in the future
> > > >
> > > > This is fine by me; we just have to figure out exactly which enum values would be suitable. It's a little clunky, but right now I'm toying with something like "ConnectorDefinedTransactionBoundaries" with values of "SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we need more granularity in the future then we can deprecate one or both of them and add new values. Thoughts?
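> > > > In code form, that'd be something like this (provisional; neither the name nor the values are final):
> > > >
> > > >     // Provisional sketch only.
> > > >     public enum ConnectorDefinedTransactionBoundaries {
> > > >         SUPPORTED,    // the connector defines its own transaction boundaries
> > > >         NOT_SUPPORTED // the default: boundaries are defined by the framework
> > > >     }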
> > > > > And one general question: in Debezium, we have some connectors that produce records "out-of-band" to a schema history topic via their own custom producer. Is there any way envisionable where such a producer would participate in the transaction managed by the KC runtime environment?
> > > >
> > > > To answer the question exactly as asked: no; transactions cannot be shared across producers, and until/unless that is changed (which seems unlikely) this won't be possible. However, I'm curious why a source connector would spin up its own producer instead of using "SourceTask::poll" to provide records to Connect. Is it easier to consume from that topic when the connector can define its own (de)serialization format? I'm optimistic that if we understand the use case for the separate producer, we may still be able to help bridge the gap here, one way or another.
> > > >
> > > > > One follow-up question after thinking some more about this; is there any limit in terms of duration or size of in-flight, connector-controlled transactions? In case of Debezium for instance, there may be cases where we tail the TX log from an upstream source database, not knowing whether the events we receive belong to a committed or aborted transaction. Would it be valid to emit all these events via a transactional task, and in case we receive a ROLLBACK event eventually, to abort the pending Kafka transaction? Such source transactions could be running for a long time potentially, e.g. hours or days (at least in theory). Or would this sort of usage not be considered a reasonable one?
> > > >
> > > > I think the distinction between reasonable and unreasonable usage here is likely dependent on the use cases that people are trying to satisfy with their connector, but if I had to guess, I'd say that a different approach is probably warranted in most cases if the transaction spans entire days at a time. That said, it would technically be possible, as long as there's no concern about data not being visible to downstream consumers until its transaction is committed, and as long as the number of records in the transaction isn't so large that the amount of memory required to buffer them all locally on a consumer before delivering them to the downstream application becomes unreasonable.
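> > > > As a very rough sketch of that pattern (the upstream-event types and helper methods are hypothetical, and the TransactionContext API is still subject to change):
> > > >
> > > >     import java.util.ArrayList;
> > > >     import java.util.List;
> > > >     import java.util.Map;
> > > >     import org.apache.kafka.connect.source.SourceRecord;
> > > >     import org.apache.kafka.connect.source.SourceTask;
> > > >     import org.apache.kafka.connect.source.TransactionContext; // proposed in this KIP
> > > >
> > > >     public abstract class TxLogSourceTask extends SourceTask {
> > > >         // Hypothetical view of an upstream change event.
> > > >         public interface UpstreamEvent {
> > > >             boolean isCommit();
> > > >             boolean isRollback();
> > > >             SourceRecord toSourceRecord();
> > > >         }
> > > >
> > > >         private TransactionContext transactionContext;
> > > >
> > > >         // Connector-specific: read the next chunk of the upstream TX log.
> > > >         protected abstract List<UpstreamEvent> readNextEvents();
> > > >
> > > >         @Override
> > > >         public void start(Map<String, String> props) {
> > > >             // Presumably only non-null when connector-defined boundaries are enabled.
> > > >             transactionContext = context.transactionContext();
> > > >         }
> > > >
> > > >         @Override
> > > >         public List<SourceRecord> poll() {
> > > >             List<SourceRecord> records = new ArrayList<>();
> > > >             for (UpstreamEvent event : readNextEvents()) {
> > > >                 if (event.isRollback()) {
> > > >                     // Upstream ROLLBACK: request an abort of the pending Kafka
> > > >                     // transaction, discarding the records emitted for it so far.
> > > >                     transactionContext.abortTransaction();
> > > >                 } else if (event.isCommit()) {
> > > >                     // Upstream COMMIT: request a commit of the pending Kafka transaction.
> > > >                     transactionContext.commitTransaction();
> > > >                 } else {
> > > >                     records.add(event.toSourceRecord());
> > > >                 }
> > > >             }
> > > >             return records;
> > > >         }
> > > >         // version(), stop(), etc. left to the concrete task.
> > > >     }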
> > > > Connect users would have to be mindful of the following:
> > > >
> > > > - A separate offsets topic for the connector would be highly recommended in order to avoid crippling other connectors with hanging transactions
> > > > - The producer-level transaction.timeout.ms property (https://kafka.apache.org/28/documentation.html#producerconfigs_transaction.timeout.ms), which can be configured for connectors via either the worker-level producer.transaction.timeout.ms property or the connector-level producer.override.transaction.timeout.ms property, would have to be high enough to allow for transactions that stay open for long periods of time (the default is 1 minute, so this would almost certainly have to be adjusted)
> > > > - The broker-level transaction.max.timeout.ms property (https://kafka.apache.org/28/documentation.html#brokerconfigs_transaction.max.timeout.ms) would have to be at least as high as the transaction timeout necessary for the task (the default is 15 minutes, so this would probably need to be adjusted)
> > > > - The broker-level transactional.id.expiration.ms property (https://kafka.apache.org/28/documentation.html#brokerconfigs_transactional.id.expiration.ms) would have to be high enough to not automatically expire the task's producer if there were a long enough period without new records (the default is 7 days, so this would probably be fine in most scenarios)
> > > >
> > > > Thanks again for taking a look; insight from connector developers is tremendously valuable here!
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Thu, May 27, 2021 at 6:35 PM Gunnar Morling <gunnar.morl...@googlemail.com.invalid> wrote:
> > > >
> > > > > Chris,
> > > > >
> > > > > One follow-up question after thinking some more about this; is there any limit in terms of duration or size of in-flight, connector-controlled transactions? In case of Debezium for instance, there may be cases where we tail the TX log from an upstream source database, not knowing whether the events we receive belong to a committed or aborted transaction. Would it be valid to emit all these events via a transactional task, and in case we receive a ROLLBACK event eventually, to abort the pending Kafka transaction? Such source transactions could be running for a long time potentially, e.g. hours or days (at least in theory). Or would this sort of usage not be considered a reasonable one?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > --Gunnar
> > > > >
> > > > > On Thu, May 27, 2021 at 11:15 PM Gunnar Morling <gunnar.morl...@googlemail.com> wrote:
> > > > >
> > > > > > Chris, all,
> > > > > >
> > > > > > I've just read KIP-618, and let me congratulate you first of all for this impressive piece of work! Here are a few small suggestions and questions I had while reading:
> > > > > >
> > > > > > * TransactionContext: What's the use case for the methods accepting a source record (commitTransaction(SourceRecord record), abortTransaction(SourceRecord record))?
> > > > > > * SourceTaskContext: Typo in "when the sink connector is deployed" -> should be "source task"
> > > > > > * SourceTaskContext: Instead of guarding against NSME, is there a way for a connector to query the KC version and thus derive its capabilities? Going forward, a generic API for querying capabilities could be nice, so a connector can query for capabilities of the runtime in a safe and compatible way.
> > > > > > * SourceConnector: exactlyOnceSupport() -> false return value doesn't match
> > > > > > * SourceConnector: Would it make sense to merge the two methods perhaps and return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }? Or, alternatively return an enum from canDefineTransactionBoundaries(), too; even if it only has two values now, that'd allow for extension in the future
> > > > > >
> > > > > > And one general question: in Debezium, we have some connectors that produce records "out-of-band" to a schema history topic via their own custom producer. Is there any way envisionable where such a producer would participate in the transaction managed by the KC runtime environment?
> > > > > >
> > > > > > Thanks a lot,
> > > > > >
> > > > > > --Gunnar
> > > > > >
> > > > > > On Mon, May 24, 2021 at 6:45 PM Chris Egerton <chr...@confluent.io.invalid> wrote:
> > > > > >
> > > > > >> Hi all,
> > > > > >>
> > > > > >> Wanted to note here that I've updated the KIP document to include the changes discussed recently. They're mostly located in the "Public Interfaces" section. I suspect discussion hasn't concluded yet and there will probably be a few more changes to come, but I wanted to take the opportunity to provide a snapshot of what the current design looks like.
> > > > > >>
> > > > > >> Cheers,
> > > > > >>
> > > > > >> Chris
> > > > > >>
> > > > > >> On Fri, May 21, 2021 at 4:32 PM Chris Egerton <chr...@confluent.io> wrote:
> > > > > >>
> > > > > >> > Hi Tom,
> > > > > >> >
> > > > > >> > Wow, I was way off base! I was thinking that the intent of the fencible producer was to employ it by default with 3.0, as opposed to only after the worker-level "exactly.once.source.enabled" property was flipped on. You are correct that in the case you were actually describing there would be no heightened ACL requirements, and that it would leave room in the future for exactly-once to be disabled on a per-connector basis (as long as all the workers in the cluster already had "exactly.once.source.enabled" set to "true") with no worries about breaking changes.
> > > > > >> >
> > > > > >> > I agree that this is something for another KIP; even if we could squeeze it in in time for this release, it might be a bit much for new users to take in all at once.
> > > > > >> > But I can add it to the doc as "future work", since it's a promising idea that could prove valuable to someone who might need per-connector granularity down the road.
> > > > > >> >
> > > > > >> > Thanks for clearing things up; in retrospect your comments make a lot more sense now, and I hope I've sufficiently addressed them by now.
> > > > > >> >
> > > > > >> > PSA for you and everyone else--I plan on updating the doc next week with the new APIs for connector-defined transaction boundaries, user-configurable transaction boundaries (i.e., poll vs. interval vs. connectors), and preflight checks for exactly-once validation (required vs. requested).
> > > > > >> >
> > > > > >> > Cheers,
> > > > > >> >
> > > > > >> > Chris
> > > > > >> >
> > > > > >> > On Fri, May 21, 2021 at 7:14 AM Tom Bentley <tbent...@redhat.com> wrote:
> > > > > >> >
> > > > > >> >> Hi Chris,
> > > > > >> >>
> > > > > >> >> Thanks for continuing to entertain some of these ideas.
> > > > > >> >>
> > > > > >> >> On Fri, May 14, 2021 at 5:06 PM Chris Egerton <chr...@confluent.io.invalid> wrote:
> > > > > >> >>
> > > > > >> >> > [...]
> > > > > >> >> >
> > > > > >> >> > That's true, but we do go from three static ACLs (write/describe on a fixed transactional ID, and idempotent write on a fixed cluster) to a dynamic collection of ACLs.
> > > > > >> >>
> > > > > >> >> I'm not quite sure I follow; maybe I've lost track. To be clear, I was suggesting the use of a 'fencing producer' only in clusters with exactly.once.source.enabled=true, where I imagined the key difference between the exactly-once and fencing cases was how the producer was configured/used (transactional vs. this new fencing semantic). I think the ACL requirements for connector producer principals would therefore be the same as currently described in the KIP. The same is true for the worker principals (which is the only breaking change you give in the KIP). So I don't think the fencing idea changes the backwards-compatibility story that's already in the KIP; it just allows a safe per-connector exactly.once=disabled option to be supported (with required vs. requested, as we already discussed).
> > > > > >> >>
> > > > > >> >> But I'm wondering whether I've overlooked something.
> > > > > >> >>
> > > > > >> >> > Ultimately I think it may behoove us to err on the side of reducing the breaking changes here for now and saving them for 4.0 (or some later major release), but would be interested in thoughts from you and others.
> > > > > >> >>
> > > > > >> >> Difficult to answer (given I think I might be missing something). If there are breaking changes then I don't disagree.
It's > > > difficult > > > > > to > > > > > >> >> reason about big changes like this without some practical > > > > experience. > > > > > >> >> If there are not, then I think we could also implement the > > whole > > > > > >> >> exactly.once=disabled thing in a later KIP without additional > > > > > breaking > > > > > >> >> changes (i.e. some time in 3.x), right? > > > > > >> >> > > > > > >> >> > > > > > >> >> > > Gouzhang also has a (possible) use case for a > fencing-only > > > > > >> producer ( > > > > > >> >> > https://issues.apache.org/jira/browse/KAFKA-12693), and as > > he > > > > > points > > > > > >> >> out > > > > > >> >> > there, you should be able to get these semantics today by > > > calling > > > > > >> >> > initTransactions() and then just using the producer as > normal > > > (no > > > > > >> >> > beginTransaction()/abortTransaction()/endTransaction()). > > > > > >> >> > > > > > > >> >> > I tested this locally and was not met with success; > > > transactional > > > > > >> >> producers > > > > > >> >> > do a check right now to ensure that any calls to > > > > > >> "KafkaProducer::send" > > > > > >> >> > occur within a transaction (see > > > > > >> >> > > > > > > >> >> > > > > > > >> >> > > > > > >> > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959 > > > > > >> >> > and > > > > > >> >> > > > > > > >> >> > > > > > > >> >> > > > > > >> > > > > > > > > > > > > > > > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451 > > > > > >> >> > ). > > > > > >> >> > Not a blocker, just noting that we'd have to do some > legwork > > to > > > > > make > > > > > >> >> this > > > > > >> >> > workable with the producer API. > > > > > >> >> > > > > > > >> >> > > > > > >> >> Ah, sorry, I should have actually tried it rather than just > > > taking > > > > a > > > > > >> quick > > > > > >> >> look at the code. > > > > > >> >> > > > > > >> >> Rather than remove those safety checks I suppose we'd need a > > way > > > of > > > > > >> >> distinguishing, in the config, the difference in semantics. > > E.g. > > > > > >> Something > > > > > >> >> like a fencing.id config, which was mutually exclusive with > > > > > >> >> transactional.id. > > > > > >> >> Likewise perhaps initFencing() alongside initTransactions() > in > > > the > > > > > API. > > > > > >> >> But > > > > > >> >> I think at this point it's something for another KIP. > > > > > >> >> > > > > > >> >> Kind regards, > > > > > >> >> > > > > > >> >> Tom > > > > > >> >> > > > > > >> > > > > > > >> > > > > > > > > > > > > > > > > > > > > >