Chris,

Sorry for the late question/comment, but the "Breaking Changes" section concerns me. IIUC, when a user upgrades their 1.x or 2.x Connect cluster to 3.0 and restarts the workers, the workers will fail due to this producer requirement even if the user makes no changes to their worker configs or connector configs. Is this correct?

If so, I'm concerned about this. Even though the additional producer ACLs are seemingly minor and easy to change, it is likely that users will not read the docs before they upgrade, causing their simple upgrade to fail. And even though in 3.0 we could allow ourselves to cause breaking changes with a major release, I personally would prefer we not have any such breaking changes. Given that, what would be required for us to eliminate that breaking change, or change it from a breaking change to a prerequisite for enabling EOS support in their cluster?

Thanks,

Randall

On Wed, Jun 2, 2021 at 8:42 AM Chris Egerton <chr...@confluent.io.invalid> wrote:

> Hi Tom,
>
> I do agree that it'd be safer to default to "required", but since at the time of the 3.0 release no existing connectors will have implemented the "SourceConnector::exactlyOnceSupport" method, it'd require all Connect users to downgrade to "requested" anyways in order to enable exactly-once support on their workers. The friction there seems a little excessive; we might consider changing the default from "requested" to "required" later on down the line after connector developers have had enough time to put out new connector versions that implement the new API. Thoughts?
>
> Cheers,
>
> Chris
>
> On Wed, Jun 2, 2021 at 8:49 AM Tom Bentley <tbent...@redhat.com> wrote:
>
> > Hi Chris,
> >
> > Just a minor question: I can see why the default for exactly.once.support is requested (you want a good first-run experience, I assume), but it's a little like engineering a safety catch and then not enabling it. Wouldn't it be safer to default to required, so that there's no way someone can mistakenly not get EoS without explicitly having configured it?
> >
> > Thanks,
> >
> > Tom
> >
> > On Tue, Jun 1, 2021 at 4:48 PM Chris Egerton <chr...@confluent.io.invalid> wrote:
> >
> > > Hi Gunnar,
> > >
> > > Thanks for taking a look!
> > > I've addressed the low-hanging fruit in the KIP; responses to other comments inline here:
> > >
> > > > * TransactionContext: What's the use case for the methods accepting a source record (commitTransaction(SourceRecord record), abortTransaction(SourceRecord record))?
> > >
> > > This allows developers to decouple transaction boundaries from record batches. If a connector has a configuration that dictates how often it returns from "SourceTask::poll", for example, it may be easier to define multiple transactions within a single batch or a single transaction across several batches than to retrofit the connector's poll logic to work with transaction boundaries.
> > >
> > > > * SourceTaskContext: Instead of guarding against NSME, is there a way for a connector to query the KC version and thus derive its capabilities? Going forward, a generic API for querying capabilities could be nice, so a connector can query for capabilities of the runtime in a safe and compatible way.
> > >
> > > This would be a great quality-of-life improvement for connector and framework developers alike, but I think it may be best left for a separate KIP. The current approach, clunky though it may be, seems like a nuisance at worst. It's definitely worth addressing but I'm not sure we have the time to think through all the details thoroughly enough in time for the upcoming KIP freeze.
> > >
> > > > * SourceConnector: Would it make sense to merge the two methods perhaps and return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }?
> > >
> > > Hmm... at first glance I like the idea of merging the two methods a lot. The one thing that gives me pause is that there may be connectors that would like to define their own transaction boundaries without providing exactly-once guarantees.
> > > We could add UNSUPPORTED_WITH_BOUNDARIES to accommodate that, but then, it might actually be simpler to keep the two methods separate in case we add some third variable to the mix that would also have to be reflected in the possible ExactlyOnceSupport enum values.
> > >
> > > > Or, alternatively return an enum from canDefineTransactionBoundaries(), too; even if it only has two values now, that'd allow for extension in the future
> > >
> > > This is fine by me; we just have to figure out exactly which enum values would be suitable. It's a little clunky but right now I'm toying with something like "ConnectorDefinedTransactionBoundaries" with values of "SUPPORTED" and "NOT_SUPPORTED" and a default of "NOT_SUPPORTED". If we need more granularity in the future then we can deprecate one or both of them and add new values. Thoughts?
> > >
> > > > And one general question: in Debezium, we have some connectors that produce records "out-of-bands" to a schema history topic via their own custom producer. Is there any way envisionable where such a producer would participate in the transaction managed by the KC runtime environment?
> > >
> > > To answer the question exactly as asked: no; transactions cannot be shared across producers and until/unless that is changed (which seems unlikely) this won't be possible. However, I'm curious why a source connector would spin up its own producer instead of using "SourceTask::poll" to provide records to Connect. Is it easier to consume from that topic when the connector can define its own (de)serialization format? I'm optimistic that if we understand the use case for the separate producer we may still be able to help bridge the gap here, one way or another.
> > >
> > > > One follow-up question after thinking some more about this; is there any limit in terms of duration or size of in-flight, connector-controlled transactions? In case of Debezium for instance, there may be cases where we tail the TX log from an upstream source database, not knowing whether the events we receive belong to a committed or aborted transaction. Would it be valid to emit all these events via a transactional task, and in case we receive a ROLLBACK event eventually, to abort the pending Kafka transaction? Such source transactions could be running for a long time potentially, e.g. hours or days (at least in theory). Or would this sort of usage not be considered a reasonable one?
> > >
> > > I think the distinction between reasonable and unreasonable usage here is likely dependent on use cases that people are trying to satisfy with their connector, but if I had to guess, I'd say that a different approach is probably warranted in most cases if the transaction spans across entire days at a time. If there's no concern about data not being visible to downstream consumers until its transaction is committed, and the number of records in the transaction isn't so large that the amount of memory required to buffer them all locally on a consumer before delivering them to the downstream application is unreasonable, it would technically be possible though.
> > > Connect users would have to be mindful of the following:
> > >
> > > - A separate offsets topic for the connector would be highly recommended in order to avoid crippling other connectors with hanging transactions
> > > - The producer-level transaction.timeout.ms property (https://kafka.apache.org/28/documentation.html#producerconfigs_transaction.timeout.ms), which can be configured in connectors either via the worker-level producer.transaction.timeout.ms or connector-level producer.override.transaction.timeout.ms property, would have to be high enough to allow for transactions that stay open for long periods of time (the default is 1 minute, so this would almost certainly have to be adjusted)
> > > - The broker-level transaction.max.timeout.ms property (https://kafka.apache.org/28/documentation.html#brokerconfigs_transaction.max.timeout.ms) would have to be at least as high as the transaction timeout necessary for the task (default is 15 minutes, so this would probably need to be adjusted)
> > > - The broker-level transactional.id.expiration.ms property (https://kafka.apache.org/28/documentation.html#brokerconfigs_transactional.id.expiration.ms) would have to be high enough to not automatically expire the task's producer if there was a long enough period without new records; default is 7 days, so this would probably be fine in most scenarios
> > >
> > > Thanks again for taking a look; insight from connector developers is tremendously valuable here!
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Thu, May 27, 2021 at 6:35 PM Gunnar Morling <gunnar.morl...@googlemail.com.invalid> wrote:
> > >
> > > > Chris,
> > > >
> > > > One follow-up question after thinking some more about this; is there any limit in terms of duration or size of in-flight, connector-controlled transactions? In case of Debezium for instance, there may be cases where we tail the TX log from an upstream source database, not knowing whether the events we receive belong to a committed or aborted transaction. Would it be valid to emit all these events via a transactional task, and in case we receive a ROLLBACK event eventually, to abort the pending Kafka transaction? Such source transactions could be running for a long time potentially, e.g. hours or days (at least in theory). Or would this sort of usage not be considered a reasonable one?
> > > >
> > > > Thanks,
> > > >
> > > > --Gunnar
> > > >
> > > >
> > > > On Thu, May 27, 2021 at 11:15 PM Gunnar Morling <gunnar.morl...@googlemail.com> wrote:
> > > >
> > > > > Chris, all,
> > > > >
> > > > > I've just read KIP-618, and let me congratulate you first of all for this impressive piece of work! Here's a few small suggestions and questions I had while reading:
> > > > >
> > > > > * TransactionContext: What's the use case for the methods accepting a source record (commitTransaction(SourceRecord record), abortTransaction(SourceRecord record))?
> > > > > * SourceTaskContext: Typo in "when the sink connector is deployed" -> source task
> > > > > * SourceTaskContext: Instead of guarding against NSME, is there a way for a connector to query the KC version and thus derive its capabilities?
> > > > >   Going forward, a generic API for querying capabilities could be nice, so a connector can query for capabilities of the runtime in a safe and compatible way.
> > > > > * SourceConnector: exactlyOnceSupport() -> false return value doesn't match
> > > > > * SourceConnector: Would it make sense to merge the two methods perhaps and return one enum of { SUPPORTED, NOT_SUPPORTED, SUPPORTED_WITH_BOUNDARIES }? Or, alternatively return an enum from canDefineTransactionBoundaries(), too; even if it only has two values now, that'd allow for extension in the future
> > > > >
> > > > > And one general question: in Debezium, we have some connectors that produce records "out-of-bands" to a schema history topic via their own custom producer. Is there any way envisionable where such a producer would participate in the transaction managed by the KC runtime environment?
> > > > >
> > > > > Thanks a lot,
> > > > >
> > > > > --Gunnar
> > > > >
> > > > >
> > > > > On Mon, May 24, 2021 at 6:45 PM Chris Egerton <chr...@confluent.io.invalid> wrote:
> > > > >
> > > > >> Hi all,
> > > > >>
> > > > >> Wanted to note here that I've updated the KIP document to include the changes discussed recently. They're mostly located in the "Public Interfaces" section. I suspect discussion hasn't concluded yet and there will probably be a few more changes to come, but wanted to take the opportunity to provide a snapshot of what the current design looks like.
> > > > >>
> > > > >> Cheers,
> > > > >>
> > > > >> Chris
> > > > >>
> > > > >> On Fri, May 21, 2021 at 4:32 PM Chris Egerton <chr...@confluent.io> wrote:
> > > > >>
> > > > >> > Hi Tom,
> > > > >> >
> > > > >> > Wow, I was way off base!
> > > > >> > I was thinking that the intent of the fencible producer was to employ it by default with 3.0, as opposed to only after the worker-level "exactly.once.source.enabled" property was flipped on. You are correct that with the case you were actually describing, there would be no heightened ACL requirements, and that it would leave room in the future for exactly-once to be disabled on a per-connector basis (as long as all the workers in the cluster already had "exactly.once.source.enabled" set to "true") with no worries about breaking changes.
> > > > >> >
> > > > >> > I agree that this is something for another KIP; even if we could squeeze it in in time for this release, it might be a bit much for new users to take in all at once. But I can add it to the doc as "future work" since it's a promising idea that could prove valuable to someone who might need per-connector granularity in the future.
> > > > >> >
> > > > >> > Thanks for clearing things up; in retrospect your comments make a lot more sense now, and I hope I've sufficiently addressed them by now.
> > > > >> >
> > > > >> > PSA for you and everyone else--I plan on updating the doc next week with the new APIs for connector-defined transaction boundaries, user-configurable transaction boundaries (i.e., poll vs. interval vs. connectors), and preflight checks for exactly-once validation (required vs. requested).
> > > > >> >
> > > > >> > Cheers,
> > > > >> >
> > > > >> > Chris
> > > > >> >
> > > > >> > On Fri, May 21, 2021 at 7:14 AM Tom Bentley <tbent...@redhat.com> wrote:
> > > > >> >
> > > > >> >> Hi Chris,
> > > > >> >>
> > > > >> >> Thanks for continuing to entertain some of these ideas.
> > > > >> >>
> > > > >> >> On Fri, May 14, 2021 at 5:06 PM Chris Egerton <chr...@confluent.io.invalid> wrote:
> > > > >> >>
> > > > >> >> > [...]
> > > > >> >> >
> > > > >> >> > That's true, but we do go from three static ACLs (write/describe on a fixed transactional ID, and idempotent write on a fixed cluster) to a dynamic collection of ACLs.
> > > > >> >> >
> > > > >> >>
> > > > >> >> I'm not quite sure I follow, maybe I've lost track. To be clear, I was suggesting the use of a 'fencing producer' only in clusters with exactly.once.source.enabled=true where I imagined the key difference between the exactly once and fencing cases was how the producer was configured/used (transactional vs this new fencing semantic). I think the ACL requirements for connector producer principals would therefore be the same as currently described in the KIP. The same is true for the worker principals (which is the only breaking change you give in the KIP). So I don't think the fencing idea changes the backwards compatibility story that's already in the KIP, just allows a safe per-connector exactly.once=disabled option to be supported (with required as requested as we already discussed).
> > > > >> >>
> > > > >> >> But I'm wondering whether I've overlooked something.
> > > > >> >>
> > > > >> >> > Ultimately I think it may behoove us to err on the side of reducing the breaking changes here for now and saving them for 4.0 (or some later major release), but would be interested in thoughts from you and others.
> > > > >> >> >
> > > > >> >>
> > > > >> >> Difficult to answer (given I think I might be missing something).
> > > > >> >> If there are breaking changes then I don't disagree. It's difficult to reason about big changes like this without some practical experience.
> > > > >> >> If there are not, then I think we could also implement the whole exactly.once=disabled thing in a later KIP without additional breaking changes (i.e. some time in 3.x), right?
> > > > >> >>
> > > > >> >>
> > > > >> >> > > Guozhang also has a (possible) use case for a fencing-only producer (https://issues.apache.org/jira/browse/KAFKA-12693), and as he points out there, you should be able to get these semantics today by calling initTransactions() and then just using the producer as normal (no beginTransaction()/abortTransaction()/endTransaction()).
> > > > >> >> >
> > > > >> >> > I tested this locally and was not met with success; transactional producers do a check right now to ensure that any calls to "KafkaProducer::send" occur within a transaction (see
> > > > >> >> > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L957-L959
> > > > >> >> > and
> > > > >> >> > https://github.com/apache/kafka/blob/29c55fdbbc331bbede17908ccc878953a1b15d87/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L450-L451
> > > > >> >> > ).
> > > > >> >> > Not a blocker, just noting that we'd have to do some legwork to make this workable with the producer API.
> > > > >> >> >
> > > > >> >>
> > > > >> >> Ah, sorry, I should have actually tried it rather than just taking a quick look at the code.
> > > > >> >>
> > > > >> >> Rather than remove those safety checks I suppose we'd need a way of distinguishing, in the config, the difference in semantics. E.g. Something like a fencing.id config, which was mutually exclusive with transactional.id.
> > > > >> >> Likewise perhaps initFencing() alongside initTransactions() in the API. But I think at this point it's something for another KIP.
> > > > >> >>
> > > > >> >> Kind regards,
> > > > >> >>
> > > > >> >> Tom
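
For reference, the timeout settings listed in the quoted Jun 1 reply to Gunnar (for long-running, connector-defined transactions) could be sketched roughly as below. This is only an illustration, not a recommendation: the one-hour value is an arbitrary assumption chosen for the example, and the right numbers depend entirely on how long a given connector's transactions actually stay open. The property names and defaults are the ones given in that reply.

```properties
# --- Connect worker config (applies to all connectors on the worker) ---
# Producer-level transaction.timeout.ms, set via the worker-level "producer." prefix.
# Default is 1 minute (60000); 1 hour here is a hypothetical value.
producer.transaction.timeout.ms=3600000

# --- Connector config (alternative: applies to a single connector) ---
# Same producer property, set via the connector-level "producer.override." prefix.
producer.override.transaction.timeout.ms=3600000

# --- Broker config ---
# Must be at least as high as the transaction timeout the task needs.
# Default is 15 minutes (900000).
transaction.max.timeout.ms=3600000

# Must be long enough that the task's producer is not expired during quiet periods.
# Default is 7 days (604800000), which is probably fine in most scenarios.
transactional.id.expiration.ms=604800000
```

Only one of the worker-level and connector-level producer settings would normally be needed; both are shown here because the reply mentions both options.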