Hi Chris,

1. Thanks a lot for elaborating on this, I'm now convinced about the
usefulness of the new offset reset endpoint. Regarding the follow-up KIP
for a fine-grained offset write API, I'd be happy to take that on once this
KIP is finalized and I will definitely look forward to your feedback on
that one!

2. Gotcha, the motivation makes more sense to me now. So the higher level
partition field represents a Connect specific "logical partition" of sorts
- i.e. the source partition as defined by a connector for source connectors
and a Kafka topic + partition for sink connectors. I like the idea of
adding a Kafka prefix to the lower level partition/offset (and topic)
fields which basically makes it more clear (although implicitly) that the
higher level partition/offset field is Connect specific and not the same as
what those terms represent in Kafka itself. However, this then leads me to
wonder if we can make that explicit by including "connect" or "connector"
in the higher level field names? Or do you think this isn't required given
that we're talking about a Connect specific REST API in the first place?

3. Thanks, I think the response structure definitely looks better now!

4. Interesting, I'd be curious to learn why we might want to change this in
the future but that's probably out of scope for this discussion. I'm not
sure I followed why the unresolved writes to the config topic would be an
issue - wouldn't the delete offsets request be added to the herder's
request queue and whenever it is processed, we'd anyway need to check if
all the prerequisites for the request are satisfied?

5. Thanks for elaborating that just fencing out the producer still leaves
many cases where source tasks remain hanging around and also that we anyway
can't have similar data production guarantees for sink connectors right
now. I agree that it might be better to go with ease of implementation and
consistency for now.

6. Right, that does make sense but I still feel like the two states will
end up being confusing to end users who might not be able to discern the
(fairly low-level) differences between them (also the nuances of state
transitions like STOPPED -> PAUSED or PAUSED -> STOPPED with the
rebalancing implications as well). We can probably revisit this potential
deprecation in the future based on user feedback and how the adoption of
the new proposed stop endpoint looks like, what do you think?

7. Aha, that is completely my bad, I missed that the v1/v2 state is only
applicable to the connector's target state and that we don't need to worry
about the tasks since we will have an empty set of tasks. I think I was a
little confused by "pause the parts of the connector that they are
assigned" from the KIP. Thanks for clarifying that!


Some more thoughts and questions that I had -

8. Could you elaborate on what the implementation for offset reset for
source connectors would look like? Currently, it doesn't look like we track
all the partitions for a source connector anywhere. Will we need to
book-keep this somewhere in order to be able to emit a tombstone record for
each source partition?

9. The KIP describes the offset reset endpoint as only being usable on
existing connectors that are in a `STOPPED` state. Why wouldn't we want to
allow resetting offsets for a deleted connector which seems to be a valid
use case? Or do we plan to handle this use case only via the item outlined
in the future work section - "Automatically delete offsets with connectors"?

10. The KIP mentions that source offsets will be reset transactionally for
each topic (worker global offset topic and connector specific offset topic
if it exists). While it obviously isn't possible to atomically do the
writes to two topics which may be on different Kafka clusters, I'm
wondering about what would happen if the first transaction succeeds but the
second one fails. I think the order of the two transactions matters here -
if we successfully emit tombstones to the connector specific offset topic
and fail to do so for the worker global offset topic, we'll presumably fail
the offset delete request because the KIP mentions that "A request to reset
offsets for a source connector will only be considered successful if the
worker is able to delete all known offsets for that connector, on both the
worker's global offsets topic and (if one is used) the connector's
dedicated offsets topic.". However, this will lead to the connector only
being able to read potentially older offsets from the worker global offset
topic on resumption (based on the combined offset view presented as
described in KIP-618 [1]). So, I think we should make sure that the worker
global offset topic tombstoning is attempted first, right? Note that in the
current implementation of `ConnectorOffsetBackingStore::set`, the primary /
connector specific offset store is written to first.

11. This probably isn't necessary to elaborate on in the KIP itself, but I
was wondering what the second offset test - "verify that that those offsets
reflect an expected level of progress for each connector (i.e., they are
greater than or equal to a certain value depending on how the connectors
are configured and how long they have been running)" - would look like?


Thanks,
Yash

[1] -
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration

On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton <chr...@aiven.io.invalid>
wrote:

> Hi Yash,
>
> Thanks for your detailed thoughts.
>
> 1. In KAFKA-4107 [1], the primary request is exactly what's proposed in the
> KIP right now: a way to reset offsets for connectors. Sure, there's an
> extra step of stopping the connector, but renaming a connector isn't as
> convenient of an alternative as it may seem since in many cases you'd also
> want to delete the older one, so the complete sequence of steps would be
> something like delete the old connector, rename it (possibly requiring
> modifications to its config file, depending on which API is used), then
> create the renamed variant. It's also just not a great user
> experience--even if the practical impacts are limited (which, IMO, they are
> not), people have been asking for years about why they have to employ this
> kind of a workaround for a fairly common use case, and we don't really have
> a good answer beyond "we haven't implemented something better yet". On top
> of that, you may have external tooling that needs to be tweaked to handle a
> new connector name, you may have strict authorization policies around who
> can access what connectors, you may have other ACLs attached to the name of
> the connector (which can be especially common in the case of sink
> connectors, whose consumer group IDs are tied to their names by default),
> and leaving around state in the offsets topic that can never be cleaned up
> presents a bit of a footgun for users. It may not be a silver bullet, but
> providing some mechanism to reset that state is a step in the right
> direction and allows responsible users to more carefully administer their
> cluster without resorting to non-public APIs. That said, I do agree that a
> fine-grained reset/overwrite API would be useful, and I'd be happy to
> review a KIP to add that feature if anyone wants to tackle it!
>
> 2. Keeping the two formats symmetrical is motivated mostly by aesthetics
> and quality-of-life for programmatic interaction with the API; it's not
> really a goal to hide the use of consumer groups from users. I do agree
> that the format is a little strange-looking for sink connectors, but it
> seemed like it would be easier to work with for UIs, casual jq queries, and
> CLIs than a more Kafka-specific alternative such as {"<topic>-<partition>":
> "<offset>"}, and although it is a little strange, I don't think it's any
> less readable or intuitive. That said, I've made some tweaks to the format
> that should make programmatic access even easier; specifically, I've
> removed the "source" and "sink" wrapper fields and instead moved them into
> a top-level object with a "type" and "offsets" field, just like you
> suggested in point 3 (thanks!). We might also consider changing the field
> names for sink offsets from "topic", "partition", and "offset" to "Kafka
> topic", "Kafka partition", and "Kafka offset" respectively, to reduce the
> stuttering effect of having a "partition" field inside a "partition" field
> and the same with an "offset" field; thoughts? One final point--by equating
> source and sink offsets, we probably make it easier for users to understand
> exactly what a source offset is; anyone who's familiar with consumer
> offsets can see from the response format that we identify a logical
> partition as a combination of two entities (a topic and a partition
> number); it should make it easier to grok what a source offset is by seeing
> what the two formats have in common.
>
> 3. Great idea! Done.
>
> 4. Yes, I'm thinking right now that a 409 will be the response status if a
> rebalance is pending. I'd rather not add this to the KIP as we may want to
> change it at some point and it doesn't seem vital to establish it as part
> of the public contract for the new endpoint right now. Also, small
> point--yes, a 409 is useful to avoid forwarding requests to an incorrect
> leader, but it's also useful to ensure that there aren't any unresolved
> writes to the config topic that might cause issues with the request (such
> as deleting the connector).
>
> 5. That's a good point--it may be misleading to call a connector STOPPED
> when it has zombie tasks lying around on the cluster. I don't think it'd be
> appropriate to do this synchronously while handling requests to the PUT
> /connectors/{connector}/stop since we'd want to give all currently-running
> tasks a chance to gracefully shut down, though. I'm also not sure that this
> is a significant problem, either. If the connector is resumed, then all
> zombie tasks will be automatically fenced out by their successors on
> startup; if it's deleted, then we'll have wasted effort by performing an
> unnecessary round of fencing. It may be nice to guarantee that source task
> resources will be deallocated after the connector transitions to STOPPED,
> but realistically, it doesn't do much to just fence out their producers,
> since tasks can be blocked on a number of other operations such as
> key/value/header conversion, transformation, and task polling. It may be a
> little strange if data is produced to Kafka after the connector has
> transitioned to STOPPED, but we can't provide the same guarantees for sink
> connectors, since their tasks may be stuck on a long-running SinkTask::put
> that emits data even after the Connect framework has abandoned them after
> exhausting their graceful shutdown timeout. Ultimately, I'd prefer to err
> on the side of consistency and ease of implementation for now, but I may be
> missing a case where a few extra records from a task that's slow to shut
> down may cause serious issues--let me know.
>
> 6. I'm hesitant to propose deprecation of the PAUSED state right now as it
> does serve a few purposes. Leaving tasks idling-but-ready makes resuming
> them less disruptive across the cluster, since a rebalance isn't necessary.
> It also reduces latency to resume the connector, especially for ones that
> have to do a lot of state gathering on initialization to, e.g., read
> offsets from an external system.
>
> 7. There should be no risk of mixed tasks after a downgrade, thanks to the
> empty set of task configs that gets published to the config topic. Both
> upgraded and downgraded workers will render an empty set of tasks for the
> connector, and keep that set of empty tasks until the connector is resumed.
> Does that address your concerns?
>
> You're also correct that the linked Jira ticket was wrong; thanks for
> pointing that out! Yes, KAFKA-4107 is the intended ticket, and I've updated
> the link in the KIP accordingly.
>
> Cheers,
>
> Chris
>
> [1] - https://issues.apache.org/jira/browse/KAFKA-4107
>
> On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > Thanks a lot for this KIP, I think something like this has been long
> > overdue for Kafka Connect :)
> >
> > Some thoughts and questions that I had -
> >
> > 1. I'm wondering if you could elaborate a little more on the use case for
> > the `DELETE /connectors/{connector}/offsets` API. I think we can all
> agree
> > that a fine grained reset API that allows setting arbitrary offsets for
> > partitions would be quite useful (which you talk about in the Future work
> > section). But for the `DELETE /connectors/{connector}/offsets` API in its
> > described form, it looks like it would only serve a seemingly niche use
> > case where users want to avoid renaming connectors - because this new way
> > of resetting offsets actually has more steps (i.e. stop the connector,
> > reset offsets via the API, resume the connector) than simply deleting and
> > re-creating the connector with a different name?
> >
> > 2. The KIP talks about taking care that the response formats (presumably
> > only talking about the new GET API here) are symmetrical for both source
> > and sink connectors - is the end goal to have users of Kafka Connect not
> > even be aware that sink connectors use Kafka consumers under the hood
> (i.e.
> > have that as purely an implementation detail abstracted away from users)?
> > While I understand the value of uniformity here, the response format for
> > sink connectors currently looks a little odd with the "partition" field
> > having "topic" and "partition" as sub-fields, especially to users
> familiar
> > with Kafka semantics. Thoughts?
> >
> > 3. Another little nitpick on the response format - why do we need
> "source"
> > / "sink" as a field under "offsets"? Users can query the connector type
> via
> > the existing `GET /connectors` API. If it's deemed important to let users
> > know that the offsets they're seeing correspond to a source / sink
> > connector, maybe we could have a top level field "type" in the response
> for
> > the `GET /connectors/{connector}/offsets` API similar to the `GET
> > /connectors` API?
> >
> > 4. For the `DELETE /connectors/{connector}/offsets` API, the KIP mentions
> > that requests will be rejected if a rebalance is pending - presumably
> this
> > is to avoid forwarding requests to a leader which may no longer be the
> > leader after the pending rebalance? In this case, the API will return a
> > `409 Conflict` response similar to some of the existing APIs, right?
> >
> > 5. Regarding fencing out previously running tasks for a connector, do you
> > think it would make more sense semantically to have this implemented in
> the
> > stop endpoint where an empty set of tasks is generated, rather than the
> > delete offsets endpoint? This would also give the new `STOPPED` state a
> > higher confidence of sorts, with any zombie tasks being fenced off from
> > continuing to produce data.
> >
> > 6. Thanks for outlining the issues with the current state of the `PAUSED`
> > state - I think a lot of users expect it to behave like the `STOPPED`
> state
> > you outline in the KIP and are (unpleasantly) surprised when it doesn't.
> > However, this does beg the question of what the usefulness of having two
> > separate `PAUSED` and `STOPPED` states is? Do we want to continue
> > supporting both these states in the future, or do you see the `STOPPED`
> > state eventually causing the existing `PAUSED` state to be deprecated?
> >
> > 7. I think the idea outlined in the KIP for handling a new state during
> > cluster downgrades / rolling upgrades is quite clever, but do you think
> > there could be any issues with having a mix of "paused" and "stopped"
> tasks
> > for the same connector across workers in a cluster? At the very least, I
> > think it would be fairly confusing to most users. I'm wondering if this
> can
> > be avoided by stating clearly in the KIP that the new `PUT
> > /connectors/{connector}/stop`
> > can only be used on a cluster that is fully upgraded to an AK version
> newer
> > than the one which ends up containing changes from this KIP and that if a
> > cluster needs to be downgraded to an older version, the user should
> ensure
> > that none of the connectors on the cluster are in a stopped state? With
> the
> > existing implementation, it looks like an unknown/invalid target state
> > record is basically just discarded (with an error message logged), so it
> > doesn't seem to be a disastrous failure scenario that can bring down a
> > worker.
> >
> >
> > Thanks,
> > Yash
> >
> > On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton <chr...@aiven.io.invalid>
> > wrote:
> >
> > > Hi Ashwin,
> > >
> > > Thanks for your thoughts. Regarding your questions:
> > >
> > > 1. The response would show the offsets that are visible to the source
> > > connector, so it would combine the contents of the two topics, giving
> > > priority to offsets present in the connector-specific topic. I'm
> > imagining
> > > a follow-up question that some people may have in response to that is
> > > whether we'd want to provide insight into the contents of a single
> topic
> > at
> > > a time. It may be useful to be able to see this information in order to
> > > debug connector issues or verify that it's safe to stop using a
> > > connector-specific offsets topic (either explicitly, or implicitly via
> > > cluster downgrade). What do you think about adding a URL query
> parameter
> > > that allows users to dictate which view of the connector's offsets they
> > are
> > > given in the REST response, with options for the worker's global topic,
> > the
> > > connector-specific topic, and the combined view of them that the
> > connector
> > > and its tasks see (which would be the default)? This may be too much
> for
> > V1
> > > but it feels like it's at least worth exploring a bit.
> > >
> > > 2. There is no option for this at the moment. Reset semantics are
> > extremely
> > > coarse-grained; for source connectors, we delete all source offsets,
> and
> > > for sink connectors, we delete the entire consumer group. I'm hoping
> this
> > > will be enough for V1 and that, if there's sufficient demand for it, we
> > can
> > > introduce a richer API for resetting or even modifying connector
> offsets
> > in
> > > a follow-up KIP.
> > >
> > > 3. Good eye :) I think it's fine to keep the existing behavior for the
> > > PAUSED state with the Connector instance, since the primary purpose of
> > the
> > > Connector is to generate task configs and monitor the external system
> for
> > > changes. If there's no chance for tasks to be running anyways, I don't
> > see
> > > much value in allowing paused connectors to generate new task configs,
> > > especially since each time that happens a rebalance is triggered and
> > > there's a non-zero cost to that. What do you think?
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Fri, Oct 14, 2022 at 12:59 AM Ashwin <apan...@confluent.io.invalid>
> > > wrote:
> > >
> > > > Thanks for KIP Chris - I think this is a useful feature.
> > > >
> > > > Can you please elaborate on the following in the KIP -
> > > >
> > > > 1. How would the response of GET /connectors/{connector}/offsets look
> > > like
> > > > if the worker has both global and connector specific offsets topic ?
> > > >
> > > > 2. How can we pass the reset options like shift-by , to-date-time
> etc.
> > > > using a REST API like DELETE /connectors/{connector}/offsets ?
> > > >
> > > > 3. Today PAUSE operation on a connector invokes its stop method -
> will
> > > > there be a change here to reduce confusion with the new proposed
> > STOPPED
> > > > state ?
> > > >
> > > > Thanks,
> > > > Ashwin
> > > >
> > > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton
> <chr...@aiven.io.invalid
> > >
> > > > wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I noticed a fairly large gap in the first version of this KIP that
> I
> > > > > published last Friday, which has to do with accommodating
> connectors
> > > > > that target different Kafka clusters than the one that the Kafka
> > > Connect
> > > > > cluster uses for its internal topics and source connectors with
> > > dedicated
> > > > > offsets topics. I've since updated the KIP to address this gap,
> which
> > > has
> > > > > substantially altered the design. Wanted to give a heads-up to
> anyone
> > > > > that's already started reviewing.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton <chr...@aiven.io>
> > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I'd like to begin discussion on a KIP to add offsets support to
> the
> > > > Kafka
> > > > > > Connect REST API:
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to