Hey Chris,

Thank you for the proposal. A few questions / comments:

0. It may be worth pointing out in the motivation section that exactly-once for source connectors is more important than exactly-once for sink connectors, since many target systems have unique-key constraints that prevent duplicates. Kafka does not have any such constraints, so without KIP-618, exactly-once won't be possible.

1. It's not clear to me why we need the worker to asynchronously copy offsets from the connector-specific offsets topic to a global offsets topic.

2. While your reasoning for an offsets topic per connector appears sound, it doesn't square with how Kafka Streams uses transactions. My understanding is that Kafka Streams shares its offsets topic with all the other consumers, and (apparently) data corruption and delays caused by other tenants are a non-issue. Perhaps you can comment on how Connect is different? In general, much of the complexity in the KIP is related to the separate offsets topic, and I'm wondering if this can be avoided. The migration use case is interesting, but it is not related to exactly-once and can be handled separately.

3. The KIP allows users to override the isolation level for the offset reader, even when exactly-once is enabled, which disables exactly-once in a non-obvious way. I get that Connect usually allows users to shoot themselves in the foot, but are there any actual benefits to allowing it in this case? Maybe it is better if we don't? I don't find the argument that we always did this to be particularly compelling.

4. It isn't stated explicitly, but it sounds like Connect or source connectors already have some batching mechanism, and that transaction boundaries will match the batches (i.e. each batch will be a transaction?). If so, it's worth being explicit.

5. "When a rebalance is triggered, before (re-)joining the cluster group, all workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record" - How will this play with incremental rebalances? Isn't this exactly the stop-the-world rebalance we want to avoid?

6. "the worker will instantiate a transactional producer whose transactional ID is, by default, the group ID of the cluster (but may be overwritten by users using the transactional.id worker property)" - If users change the transactional.id property, zombie leaders won't get fenced (since they will have an older and different transactional ID).

Thanks,

Gwen

On Thu, May 21, 2020 at 11:21 PM Chris Egerton <chr...@confluent.io> wrote:
>
> Hi all,
>
> I know it's a busy time with the upcoming 2.6 release and I don't expect
> this to get a lot of traction until that's done, but I've published a KIP
> for allowing atomic commit of offsets and records for source connectors and
> would appreciate your feedback:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
>
> This feature should make it possible to implement source connectors with
> exactly-once delivery guarantees, and even allow a wide range of existing
> source connectors to provide exactly-once delivery guarantees with no
> changes required.
>
> Cheers,
>
> Chris

--
Gwen Shapira
Engineering Manager | Confluent
650.450.2760 | @gwenshap