Hi Guozhang,

Your understanding should be correct in most cases, but there are two finer
points that may interest you:

1. The mapping technically depends on the implementation of the connector and
how it chooses to allocate source partitions across its tasks; even if the
number of tasks and source partitions remains completely unchanged, a
connector may still choose to shuffle around the partition->task
allocation. I can't think of any cases where this might happen off the top
of my head, but it seemed worth sharing given the educational nature of the
question.
2. It's also possible that the number of source partitions remains
unchanged, but the set of source partitions changes. One case where this
might happen is with a database connector monitoring for tables that match
a given regex every five minutes; if a table that matched that regex during
the last scan got assigned to a task and then dropped, and then another
table that matched the regex got added before the next scan, the connector
would see the same number of tables, but the actual set would be different.
At this point, whether the already-assigned tables stayed assigned to the
same tasks would again be connector-dependent. Is anyone else reminded of
the various consumer partition assignment strategies at this point?
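
To make the second point concrete, here is a rough sketch of a hypothetical
connector that sorts its tables and deals them out to tasks round-robin. It
is written in Python only for brevity; the function names and table names
are made up for illustration and none of this is Connect framework API. With
this particular (assumed) strategy, swapping one table for another moves
tables that never changed:

```python
def assign_round_robin(partitions, max_tasks):
    """Sort the source partitions and deal them out to tasks in turn."""
    num_tasks = min(max_tasks, len(partitions))
    tasks = [[] for _ in range(num_tasks)]
    for i, partition in enumerate(sorted(partitions)):
        tasks[i % num_tasks].append(partition)
    return tasks


def task_index_by_partition(assignment):
    """Invert a list-of-lists assignment into {partition: task index}."""
    return {p: t for t, parts in enumerate(assignment) for p in parts}


# First scan: four tables match the regex (hypothetical table names).
before_map = task_index_by_partition(
    assign_round_robin({"orders", "payments", "users", "visits"}, 2))

# Next scan: "users" was dropped and "accounts" was added, so the table
# count and the task count are both unchanged.
after_map = task_index_by_partition(
    assign_round_robin({"accounts", "orders", "payments", "visits"}, 2))

# Tables present in both scans that nevertheless ended up on another task.
moved = sorted(p for p in before_map
               if p in after_map and before_map[p] != after_map[p])
print(moved)
```

A connector that instead tried to keep existing tables where they were
(closer in spirit to a sticky assignor) would behave differently, which is
exactly why the framework can't assume either behavior today.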

A general comment I should make here (not necessarily for your benefit but
for anyone following along) is that it's important to keep in mind that
"source partitions" in Kafka Connect aren't Kafka topic partitions (well,
unless your connector is designed to replicate data across Kafka clusters
like MirrorMaker 2). As a result, we have to rely on developer-written code
to a greater extent to define what the source partitions for a source
connector are, and how to divvy up that work amongst tasks.
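
As a rough illustration of that last point: the framework treats a source
partition and its offset as opaque key/value maps whose contents are chosen
by the connector author. The sketch below uses Python dicts as stand-ins;
the helper names and every key shown ("table", "last_id", "cluster" and so
on) are hypothetical and not taken from any particular connector:

```python
# Stand-in for framework-side offset storage: it only needs to look up the
# last offset committed for a partition, never to interpret either map.
committed_offsets = {}


def partition_key(source_partition):
    """Turn a partition map into a hashable, order-independent key."""
    return tuple(sorted(source_partition.items()))


def commit(source_partition, source_offset):
    committed_offsets[partition_key(source_partition)] = source_offset


def last_offset(source_partition):
    return committed_offsets.get(partition_key(source_partition))


# Hypothetical database connector: one source partition per table.
commit({"table": "orders"}, {"last_id": 1041})

# Hypothetical replication connector: here a source partition happens to
# correspond to a Kafka topic partition on the upstream cluster.
commit({"cluster": "primary", "topic": "clicks", "partition": 3},
       {"offset": 98765})
```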

Hope this helps! If you have any further questions please hit me with them;
I doubt you'll be the only one wondering about these things.

Cheers,

Chris

On Mon, Feb 22, 2021 at 5:30 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Thanks Chris, yeah I think I agree with you that this does not necessarily
> have to be in the scope of this KIP.
>
> My understanding was that the source partitions -> tasks are not static but
> dynamic, but they are only changed when either the number of partitions
> changed or "tasks.max" config changed (please correct me if I'm wrong), so
> what I'm thinking that we can try to detect if either of these things
> happens, and if they do not happen we can assume the mapping from
> partitions -> tasks does not change --- of course this requires some
> extension on the API, aligned with what you said. I would like to make sure
> that my understanding here is correct :)
>
> Guozhang
>
>
> On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton <chr...@confluent.io>
> wrote:
>
> > Hi Guozhang,
> >
> > Thanks for taking a look, and for your suggestion!
> >
> > I think there is room for more intelligent fencing strategies, but I think
> > that it'd have to be more nuanced than one based on task->worker
> > assignments. Connectors can arbitrarily reassign source partitions (such as
> > database tables, or Kafka topic partitions) across tasks, so even if the
> > assignment of tasks across workers remains unchanged, the assignment of
> > source partitions across those workers might. Connect doesn't do any
> > inspection of task configurations at the moment, and without expansions to
> > the Connector/Task API, it'd likely be impossible to get information from
> > tasks about their source partition assignments. With that in mind, I think
> > we may want to leave the door open for more intelligent task fencing but
> > not include that level of optimization at this stage. Does that sound fair
> > to you?
> >
> > There is one case that I've identified where we can cheaply optimize right
> > now: single-task connectors, such as the Debezium CDC source connectors. If
> > a connector is configured at some point with a single task, then some other
> > part of its configuration is altered but the single-task aspect remains,
> > the leader doesn't have to worry about fencing out the older task as the
> > new task's producer will do that automatically. In this case, the leader
> > can skip the producer fencing round and just write the new task count
> > record straight to the config topic. I've added this case to the KIP; if it
> > overcomplicates things I'm happy to remove it, but seeing as it serves a
> > real use case and comes fairly cheap, I figured it'd be best to include
> > now.
> >
> > Thanks again for your feedback; if you have other thoughts I'd love to hear
> > them!
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton <chr...@confluent.io> wrote:
> >
> > > Hi Gwen,
> > >
> > > Thanks for the feedback!
> > >
> > > 0.
> > > That's a great point; I've updated the motivation section with that
> > > rationale.
> > >
> > > 1.
> > > This enables safe "hard downgrades" of clusters where, instead of just
> > > disabling exactly-once support on each worker, each worker is rolled back
> > > to an earlier version of the Connect framework that doesn't support
> > > per-connector offsets topics altogether. Those workers would go back to all
> > > using a global offsets topic, and any offsets stored in per-connector
> > > topics would be lost to those workers. This would cause a large number of
> > > duplicates to flood the downstream system. While technically permissible
> > > given that the user in this case will have knowingly switched to a version
> > > of the Connect framework that doesn't support exactly-once source
> > > connectors (and is therefore susceptible to duplicate delivery of records),
> > > the user experience in this case could be pretty bad. A similar situation
> > > is if users switch back from per-connector offsets topics to the global
> > > offsets topic.
> > > I've tried to make this more clear in the KIP by linking to the "Hard
> > > downgrade" section from the proposed design, and by expanding on the
> > > rationale provided for redundant global offset writes in the "Hard
> > > downgrade" section. Let me know if you think this could be improved or
> > > think a different approach is warranted.
> > >
> > > 2.
> > > I think the biggest difference between Connect and Streams comes from the
> > > fact that Connect allows users to create connectors that target different
> > > Kafka clusters on the same worker. This hasn't been a problem in the past
> > > because workers use two separate producers to write offset data and source
> > > connector records, but in order to write offsets and records in the same
> > > transaction, it becomes necessary to use a single producer, which also
> > > requires that the internal offsets topic be hosted on the same Kafka
> > > cluster that the connector is targeting.
> > > This may sound like a niche use case but it was actually one of the
> > > driving factors behind KIP-458 (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy
> > > ), and it's a feature that we rely on heavily today.
> > > If there's too much going on in this KIP and we'd prefer to drop support
> > > for running that type of setup with exactly-once source connectors for now,
> > > I can propose this in a separate KIP. I figured it'd be best to get this
> > > out of the way with the initial introduction of exactly-once source support
> > > in order to make adoption by existing Connect users as seamless as
> > > possible, and since we'd likely have to address this issue before being
> > > able to utilize the feature ourselves.
> > > I switched around the ordering of the "motivation" section for
> > > per-connector offsets topics to put the biggest factor first, and called it
> > > out as the major difference between Connect and Streams in this case.
> > >
> > > 3.
> > > Fair enough, after giving it a little more thought I agree that allowing
> > > users to shoot themselves in the foot is a bad idea here. There's also some
> > > precedent for handling this with the "enable.idempotence" and
> > > "transactional.id" producer properties; if you specify a transactional ID
> > > but don't specify a value for idempotence, the producer just does the right
> > > thing for you by enabling idempotence and emitting a log message letting
> > > you know that it's done so. I've adjusted the proposed behavior to try to
> > > use a similar approach; let me know what you think.
> > > There is the potential gap here where, sometime in the future, a third
> > > accepted value for the "isolation.level" property is added to the consumer
> > > API and users will be unable to use that new value for their worker. But
> > > the likelihood of footgunning seems much greater than this scenario, and we
> > > can always address expansions to the consumer API with changes to the
> > > Connect framework as well if/when that becomes necessary.
> > > I've also added a similar note to the source task's transactional ID
> > > property; user overrides of it will also be disabled.
> > >
> > > 4.
> > > Yeah, that's mostly correct. I tried to touch on this in the "Motivation"
> > > section with this bit:
> > >     > The Connect framework periodically writes source task offsets to an
> > >     > internal Kafka topic at a configurable interval, once the source
> > >     > records that they correspond to have been successfully sent to Kafka.
> > > I've expanded on this in the "Offset (and record) writes" section, and
> > > I've tweaked the "Motivation" section a little bit to add a link to the
> > > relevant config property and to make the language a little more accurate.
> > >
> > > 5.
> > > This isn't quite as bad as stop-the-world; more like stop-the-connector.
> > > If a worker is running a dozen connectors and one of those (that happens to
> > > be a source) is reconfigured, only the tasks for that connector will be
> > > preemptively halted, and all other tasks and connectors will continue
> > > running. This is pretty close to the current behavior with incremental
> > > rebalancing; the only difference is that, instead of waiting for the
> > > rebalance to complete before halting the tasks for that connector, the
> > > worker will halt them in preparation for the rebalance. This increases the
> > > time that the tasks will be down for, but is necessary if we want to give
> > > tasks time to gracefully shut down before being fenced out by the leader,
> > > and shouldn't impact availability too much as it's only triggered on
> > > connector reconfiguration and rebalances generally don't take that long
> > > with the new incremental protocol.
> > >
> > > 6.
> > > Ah, thanks for the catch! That's a good point. I've adjusted the proposal
> > > to treat the worker's transactional ID like the consumer's isolation level
> > > and the source task's transactional ID properties; can't be modified by
> > > users and, if an attempt is made to do so, will be discarded after logging
> > > a message to that effect. More footguns removed, hooray!
> > >
> > > Thanks again for taking a look; if you have any other questions or
> > > suggestions I'm all ears.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > p.s. - Guozhang--I saw your email; response to you coming next :)
> > >
> > > On Sun, Feb 21, 2021 at 4:12 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > >> Hello Chris,
> > >>
> > >> Thanks for the great write-up! I was mainly reviewing the admin
> > >> fenceProducers API of the KIP. I think it makes sense overall. I'm just
> > >> wondering if we can go one step further, that instead of forcing to fence
> > >> out all producers of the previous generation, could we try to achieve
> > >> "partial rebalance" still by first generate the new assignment, and then
> > >> based on the new assignment only fence out producers involved in tasks that
> > >> are indeed migrated? Just a wild thought to bring up for debate.
> > >>
> > >> Guozhang
> > >>
> > >>
> > >>
> > >> On Sat, Feb 20, 2021 at 10:20 PM Gwen Shapira <g...@confluent.io> wrote:
> > >>
> > >> > Hey Chris,
> > >> >
> > >> > Thank you for the proposal. Few questions / comments:
> > >> >
> > >> > 0. It may be worth pointing out in the motivation section that
> > >> > source-connector exactly once is more important than sink connector
> > >> > exactly once, since many target systems will have unique key
> > >> > constraint mechanisms that will prevent duplicates. Kafka does not
> > >> > have any such constraints, so without this KIP-618, exactly once won't
> > >> > be possible.
> > >> > 1. I am not clear why we need the worker to async copy offsets from
> > >> > the connector-specific offset topic to a global offsets topic
> > >> > 2. While the reasoning you have for offset topic per connector appears
> > >> > sound, it doesn't add up with the use of transactions in KafkaStreams.
> > >> > My understanding is that KafkaStreams uses shared offsets topic with
> > >> > all the other consumers, and (apparently) corrupting data and delays
> > >> > by other tenants is a non-issue. Perhaps you can comment on how
> > >> > Connect is different? In general much of the complexity in the KIP is
> > >> > related to the separate offset topic, and I'm wondering if this can be
> > >> > avoided. The migration use-case is interesting, but not related to
> > >> > exactly-once and can be handled separately.
> > >> > 3. Allowing users to override the isolation level for the offset
> > >> > reader, even when exactly-once is enabled, thereby disabling
> > >> > exactly-once in a non-obvious way. I get that connect usually allows
> > >> > users to shoot themselves in the foot, but are there any actual
> > >> > benefits for allowing it in this case? Maybe it is better if we don't?
> > >> > I don't find the argument that we always did this to be particularly
> > >> > compelling.
> > >> > 4. It isn't stated explicitly, but it sounds like connect or source
> > >> > connectors already have some batching mechanism, and that transaction
> > >> > boundaries will match the batches (i.e. each batch will be a
> > >> > transaction?). If so, worth being explicit.
> > >> > 5. "When a rebalance is triggered, before (re-)joining the cluster
> > >> > group, all workers will preemptively stop all tasks of all source
> > >> > connectors for which task configurations are present in the config
> > >> > topic after the latest task count record" - how will this play with
> > >> > the incremental rebalances? isn't this exactly the stop-the-world
> > >> > rebalance we want to avoid?
> > >> > 6. "the worker will instantiate a transactional producer whose
> > >> > transactional ID is, by default, the group ID of the cluster (but may
> > >> > be overwritten by users using the transactional.id worker property)" -
> > >> > If users change transactional.id property, zombie leaders won't get
> > >> > fenced (since they will have an older and different transactional id)
> > >> >
> > >> > Thanks,
> > >> >
> > >> > Gwen
> > >> >
> > >> > On Thu, May 21, 2020 at 11:21 PM Chris Egerton <chr...@confluent.io> wrote:
> > >> > >
> > >> > > Hi all,
> > >> > >
> > >> > > I know it's a busy time with the upcoming 2.6 release and I don't expect
> > >> > > this to get a lot of traction until that's done, but I've published a KIP
> > >> > > for allowing atomic commit of offsets and records for source connectors and
> > >> > > would appreciate your feedback:
> > >> > >
> > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> > >> > >
> > >> > > This feature should make it possible to implement source connectors with
> > >> > > exactly-once delivery guarantees, and even allow a wide range of existing
> > >> > > source connectors to provide exactly-once delivery guarantees with no
> > >> > > changes required.
> > >> > >
> > >> > > Cheers,
> > >> > >
> > >> > > Chris
> > >> >
> > >> >
> > >> >
> > >> > --
> > >> > Gwen Shapira
> > >> > Engineering Manager | Confluent
> > >> > 650.450.2760 | @gwenshap
> > >> > Follow us: Twitter | blog
> > >> >
> > >>
> > >>
> > >> --
> > >> -- Guozhang
> > >>
> > >
> >
>
>
> --
> -- Guozhang
>
