Hi Chris,

Thanks for all the updates, yes that seems good!
Mickael On Thu, Nov 17, 2022 at 8:41 PM Chris Egerton <chr...@aiven.io.invalid> wrote: > > Hi Mickael, > > Thanks for your thoughts! IMO it's most intuitive to use a null value in > the PATCH API to signify that an offset should be reset, since it aligns > nicely with the API we provide to source connectors, where null offsets are > translated under the hood to tombstone records in the internal offsets > topic. Does that seem reasonable to you? > > Cheers, > > Chris > > On Thu, Nov 17, 2022 at 2:35 PM Chris Egerton <chr...@aiven.io> wrote: > > > Hi Yash, > > > > I've updated the KIP with the correct "kafka_topic", "kafka_partition", > > and "kafka_offset" keys in the JSON examples (settled on those instead of > > prefixing with "Kafka " for better interactions with tooling like JQ). I've > > also added a note about sink offset requests failing if there are still > > active members in the consumer group. > > > > I don't believe logging an error message is sufficient for handling > > failures to reset-after-delete. IMO it's highly likely that users will > > either shoot themselves in the foot by not reading the fine print and > > realizing that the offset request may have failed, or will ask for better > > visibility into the success or failure of the reset request than scanning > > log files. I don't doubt that there are ways to address this, but I would > > prefer to leave them to a separate KIP since the required design work is > > non-trivial and I do not feel that the added burden is worth tying to this > > KIP as a blocker. > > > > I was really hoping to avoid introducing a change to the developer-facing > > APIs with this KIP, but after giving it some thought I think this may be > > unavoidable. It's debatable whether validation of altered offsets is a good > > enough use case on its own for this kind of API, but since there are also > > connectors out there that manage offsets externally, we should probably add > > a hook to allow those external offsets to be managed, which can then serve > > double- or even-triple duty as a hook to validate custom offsets and to > > notify users whether offset resets/alterations are supported at all (which > > they may not be if, for example, offsets are coupled tightly with the data > > written by a sink connector). I've updated the KIP with the > > developer-facing API changes for this logic; let me know what you think. > > > > Cheers, > > > > Chris > > > > On Mon, Nov 14, 2022 at 10:16 AM Mickael Maison <mickael.mai...@gmail.com> > > wrote: > > > >> Hi Chris, > >> > >> Thanks for the update! > >> > >> It's relatively common to only want to reset offsets for a specific > >> resource (for example with MirrorMaker for one or a group of topics). > >> Could it be possible to add a way to do so? Either by providing a > >> payload to DELETE or by setting the offset field to an empty object in > >> the PATCH payload? > >> > >> Thanks, > >> Mickael > >> > >> On Sat, Nov 12, 2022 at 3:33 PM Yash Mayya <yash.ma...@gmail.com> wrote: > >> > > >> > Hi Chris, > >> > > >> > Thanks for pointing out that the consumer group deletion step itself > >> will > >> > fail in case of zombie sink tasks. Since we can't get any stronger > >> > guarantees from consumers (unlike with transactional producers), I > >> think it > >> > makes perfect sense to fail the offset reset attempt in such scenarios > >> with > >> > a relevant error message to the user. I was more concerned about > >> silently > >> > failing but it looks like that won't be an issue. 
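For illustration, a request body for the PATCH offsets API described at the top of this thread might look roughly like the following for a sink connector. The "kafka_topic"/"kafka_partition"/"kafka_offset" field names are the ones settled on above, a null offset signifies a reset for that partition as Chris describes, and the topic name and offset values are invented for the example; the exact shape is whatever the KIP's final wording specifies:

{
  "offsets": [
    {
      "partition": {"kafka_topic": "orders", "kafka_partition": 0},
      "offset": {"kafka_offset": 4207}
    },
    {
      "partition": {"kafka_topic": "orders", "kafka_partition": 1},
      "offset": null
    }
  ]
}

Under that reading, the first entry would overwrite the committed offset for orders-0, while the null offset in the second entry would reset orders-1, mirroring how null offsets from source connectors are translated into tombstone records in the offsets topic.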
It's probably worth > >> > calling out this difference between source / sink connectors explicitly > >> in > >> > the KIP, what do you think? > >> > > >> > > changing the field names for sink offsets > >> > > from "topic", "partition", and "offset" to "Kafka > >> > > topic", "Kafka partition", and "Kafka offset" respectively, to > >> > > reduce the stuttering effect of having a "partition" field inside > >> > > a "partition" field and the same with an "offset" field > >> > > >> > The KIP is still using the nested partition / offset fields by the way - > >> > has it not been updated because we're waiting for consensus on the field > >> > names? > >> > > >> > > The reset-after-delete feature, on the other > >> > > hand, is actually pretty tricky to design; I've updated the > >> > > rationale in the KIP for delaying it and clarified that it's not > >> > > just a matter of implementation but also design work. > >> > > >> > I like the idea of writing an offset reset request to the config topic > >> > which will be processed by the herder's config update listener - I'm not > >> > sure I fully follow the concerns with regard to handling failures? Why > >> > can't we simply log an error saying that the offset reset for the > >> deleted > >> > connector "xyz" failed due to reason "abc"? As long as it's documented > >> that > >> > connector deletion and offset resets are asynchronous and a success > >> > response only means that the request was initiated successfully (which > >> is > >> > the case even today with normal connector deletion), we should be fine > >> > right? > >> > > >> > Thanks for adding the new PATCH endpoint to the KIP, I think it's a lot > >> > more useful for this use case than a PUT endpoint would be! One thing > >> > that I was thinking about with the new PATCH endpoint is that while we > >> can > >> > easily validate the request body format for sink connectors (since it's > >> the > >> > same across all connectors), we can't do the same for source connectors > >> as > >> > things stand today since each source connector implementation can define > >> > its own source partition and offset structures. Without any validation, > >> > writing a bad offset for a source connector via the PATCH endpoint could > >> > cause it to fail with hard to discern errors. I'm wondering if we could > >> add > >> > a new method to the `SourceConnector` class (which should be overridden > >> by > >> > source connector implementations) that would validate whether or not the > >> > provided source partitions and source offsets are valid for the > >> connector > >> > (it could have a default implementation returning true unconditionally > >> for > >> > backward compatibility). > >> > > >> > > I've also added an implementation plan to the KIP, which calls > >> > > out the different parts that can be worked on independently so that > >> > > others (hi Yash 🙂) can also tackle parts of this if they'd like. > >> > > >> > I'd be more than happy to pick up one or more of the implementation > >> parts, > >> > thanks for breaking it up into granular pieces! > >> > > >> > Thanks, > >> > Yash > >> > > >> > On Fri, Nov 11, 2022 at 11:25 PM Chris Egerton <chr...@aiven.io.invalid > >> > > >> > wrote: > >> > > >> > > Hi Mickael, > >> > > > >> > > Thanks for your feedback. This has been on my TODO list as well :) > >> > > > >> > > 1. That's fair! Support for altering offsets is easy enough to > >> design, so > >> > > I've added it to the KIP. 
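As a rough sketch of the kind of `SourceConnector` hook suggested above, something like the following would let a connector veto altered offsets it cannot work with, while a default implementation that accepts everything keeps existing connectors backward compatible. The class and method names and the exact signature here are hypothetical, chosen only to illustrate the idea; the actual developer-facing API is whatever the updated KIP specifies:

import java.util.Map;

import org.apache.kafka.connect.source.SourceConnector;

// Illustrative only: a connector could override a hook like this to reject
// altered offsets that don't make sense for it, while the default keeps
// existing connectors source-compatible by accepting everything.
public abstract class OffsetValidatingSourceConnector extends SourceConnector {

    // Hypothetical hook: "offsets" maps each source partition (as the
    // connector defines it) to the proposed offset for that partition.
    public boolean validateOffsets(Map<Map<String, ?>, Map<String, ?>> offsets) {
        return true;
    }
}

A connector that tracks, say, a file position could override this to check that the position value in each offset map is a non-negative number before the framework writes anything to the offsets topic.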
The reset-after-delete feature, on the other > >> > > hand, is actually pretty tricky to design; I've updated the rationale > >> in > >> > > the KIP for delaying it and clarified that it's not just a matter of > >> > > implementation but also design work. If you or anyone else can think > >> of a > >> > > clean, simple way to implement it, I'm happy to add it to this KIP, > >> but > >> > > otherwise I'd prefer not to tie it to the approval and release of the > >> > > features already proposed in the KIP. > >> > > > >> > > 2. Yeah, it's a little awkward. In my head I've justified the > >> ugliness of > >> > > the implementation with the smooth user-facing experience; falling > >> back > >> > > seamlessly on the PAUSED state without even logging an error message > >> is a > >> > > lot better than I'd initially hoped for when I was designing this > >> feature. > >> > > > >> > > I've also added an implementation plan to the KIP, which calls out the > >> > > different parts that can be worked on independently so that others > >> (hi Yash > >> > > 🙂) can also tackle parts of this if they'd like. > >> > > > >> > > Finally, I've removed the "type" field from the response body format > >> for > >> > > offset read requests. This way, users can copy+paste the response > >> from that > >> > > endpoint into a request to alter a connector's offsets without having > >> to > >> > > remove the "type" field first. An alternative was to keep the "type" > >> field > >> > > and add it to the request body format for altering offsets, but this > >> didn't > >> > > seem to make enough sense for cases not involving the aforementioned > >> > > copy+paste process. > >> > > > >> > > Cheers, > >> > > > >> > > Chris > >> > > > >> > > On Wed, Nov 9, 2022 at 9:57 AM Mickael Maison < > >> mickael.mai...@gmail.com> > >> > > wrote: > >> > > > >> > > > Hi Chris, > >> > > > > >> > > > Thanks for the KIP, you're picking something that has been in my > >> todo > >> > > > list for a while ;) > >> > > > > >> > > > It looks good overall, I just have a couple of questions: > >> > > > 1) I consider both features listed in Future Work pretty important. > >> In > >> > > > both cases you mention the reason for not addressing them now is > >> > > > because of the implementation. If the design is simple and if we > >> have > >> > > > volunteers to implement them, I wonder if we could include them in > >> > > > this KIP. So you would not have to implement everything but we would > >> > > > have a single KIP and vote. > >> > > > > >> > > > 2) Regarding the backward compatibility for the stopped state. The > >> > > > "state.v2" field is a bit unfortunate but I can't think of a better > >> > > > solution. The other alternative would be to not do anything but I > >> > > > think the graceful degradation you propose is a bit better. > >> > > > > >> > > > Thanks, > >> > > > Mickael > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > > >> > > > On Tue, Nov 8, 2022 at 5:58 PM Chris Egerton > >> <chr...@aiven.io.invalid> > >> > > > wrote: > >> > > > > > >> > > > > Hi Yash, > >> > > > > > >> > > > > Good question! This is actually a subtle source of asymmetry in > >> the > >> > > > current > >> > > > > proposal. Requests to delete a consumer group with active members > >> will > >> > > > > fail, so if there are zombie sink tasks that are still > >> communicating > >> > > with > >> > > > > Kafka, offset reset requests for that connector will also fail. 
> >> It is > >> > > > > possible to use an admin client to remove all active members from > >> the > >> > > > group > >> > > > > and then delete the group. However, this solution isn't as > >> complete as > >> > > > the > >> > > > > zombie fencing that we can perform for exactly-once source tasks, > >> since > >> > > > > removing consumers from a group doesn't prevent them from > >> immediately > >> > > > > rejoining the group, which would either cause the group deletion > >> > > request > >> > > > to > >> > > > > fail (if they rejoin before the group is deleted), or recreate the > >> > > group > >> > > > > (if they rejoin after the group is deleted). > >> > > > > > >> > > > > For ease of implementation, I'd prefer to leave the asymmetry in > >> the > >> > > API > >> > > > > for now and fail fast and clearly if there are still consumers > >> active > >> > > in > >> > > > > the sink connector's group. We can try to detect this case and > >> provide > >> > > a > >> > > > > helpful error message to the user explaining why the offset reset > >> > > request > >> > > > > has failed and some steps they can take to try to resolve things > >> (wait > >> > > > for > >> > > > > slow task shutdown to complete, restart zombie workers and/or > >> workers > >> > > > with > >> > > > > blocked tasks on them). In the future we can possibly even revisit > >> > > > KIP-611 > >> > > > > [1] or something like it to provide better insight into zombie > >> tasks > >> > > on a > >> > > > > worker so that it's easier to find which tasks have been > >> abandoned but > >> > > > are > >> > > > > still running. > >> > > > > > >> > > > > Let me know what you think; this is an important point to call > >> out and > >> > > if > >> > > > > we can reach some consensus on how to handle sink connector offset > >> > > resets > >> > > > > w/r/t zombie tasks, I'll update the KIP with the details. > >> > > > > > >> > > > > [1] - > >> > > > > > >> > > > > >> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks > >> > > > > > >> > > > > Cheers, > >> > > > > > >> > > > > Chris > >> > > > > > >> > > > > On Tue, Nov 8, 2022 at 8:00 AM Yash Mayya <yash.ma...@gmail.com> > >> > > wrote: > >> > > > > > >> > > > > > Hi Chris, > >> > > > > > > >> > > > > > Thanks for the response and the explanations, I think you've > >> answered > >> > > > > > pretty much all the questions I had meticulously! > >> > > > > > > >> > > > > > > >> > > > > > > if something goes wrong while resetting offsets, there's no > >> > > > > > > immediate impact--the connector will still be in the STOPPED > >> > > > > > > state. The REST response for requests to reset the offsets > >> > > > > > > will clearly call out that the operation has failed, and if > >> > > > necessary, > >> > > > > > > we can probably also add a scary-looking warning message > >> > > > > > > stating that we can't guarantee which offsets have been > >> > > successfully > >> > > > > > > wiped and which haven't. Users can query the exact offsets of > >> > > > > > > the connector at this point to determine what will happen > >> if/what > >> > > > they > >> > > > > > > resume it. And they can repeat attempts to reset the offsets > >> as > >> > > many > >> > > > > > > times as they'd like until they get back a 2XX response, > >> > > indicating > >> > > > > > > that it's finally safe to resume the connector. Thoughts? 
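For reference, the check-and-fail-fast behaviour described a little further up could look roughly like the following with the Java Admin client. This is purely illustrative and not the code the worker would actually run; the group ID shown assumes the default "connect-<connector name>" naming with no consumer override, the bootstrap address is a placeholder, and error handling is reduced to a single exception:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;

public class SinkOffsetResetSketch {

    public static void main(String[] args) throws Exception {
        // Default consumer group for a sink connector named "my-sink-connector"
        String groupId = "connect-my-sink-connector";

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            ConsumerGroupDescription group = admin
                    .describeConsumerGroups(Collections.singleton(groupId))
                    .describedGroups().get(groupId).get();

            if (!group.members().isEmpty()) {
                // Mirrors the proposed behaviour: refuse to reset while any
                // member (possibly a zombie task) is still in the group, since
                // it could rejoin and recreate the group right after deletion.
                throw new IllegalStateException("Cannot reset offsets: consumer group " + groupId
                        + " still has " + group.members().size() + " active member(s)");
            }

            admin.deleteConsumerGroups(Collections.singleton(groupId)).all().get();
        }
    }
}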
> >> > > > > > > >> > > > > > Yeah, I agree, the case that I mentioned earlier where a user > >> would > >> > > > try to > >> > > > > > resume a stopped connector after a failed offset reset attempt > >> > > without > >> > > > > > knowing that the offset reset attempt didn't fail cleanly is > >> probably > >> > > > just > >> > > > > > an extreme edge case. I think as long as the response is verbose > >> > > > enough and > >> > > > > > self explanatory, we should be fine. > >> > > > > > > >> > > > > > Another question that I had was behavior w.r.t sink connector > >> offset > >> > > > resets > >> > > > > > when there are zombie tasks/workers in the Connect cluster - > >> the KIP > >> > > > > > mentions that for sink connectors offset resets will be done by > >> > > > deleting > >> > > > > > the consumer group. However, if there are zombie tasks which are > >> > > still > >> > > > able > >> > > > > > to communicate with the Kafka cluster that the sink connector is > >> > > > consuming > >> > > > > > from, I think the consumer group will automatically get > >> re-created > >> > > and > >> > > > the > >> > > > > > zombie task may be able to commit offsets for the partitions > >> that it > >> > > is > >> > > > > > consuming from? > >> > > > > > > >> > > > > > Thanks, > >> > > > > > Yash > >> > > > > > > >> > > > > > > >> > > > > > On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton > >> > > <chr...@aiven.io.invalid > >> > > > > > >> > > > > > wrote: > >> > > > > > > >> > > > > > > Hi Yash, > >> > > > > > > > >> > > > > > > Thanks again for your thoughts! Responses to ongoing > >> discussions > >> > > > inline > >> > > > > > > (easier to track context than referencing comment numbers): > >> > > > > > > > >> > > > > > > > However, this then leads me to wonder if we can make that > >> > > explicit > >> > > > by > >> > > > > > > including "connect" or "connector" in the higher level field > >> names? > >> > > > Or do > >> > > > > > > you think this isn't required given that we're talking about a > >> > > > Connect > >> > > > > > > specific REST API in the first place? > >> > > > > > > > >> > > > > > > I think "partition" and "offset" are fine as field names but > >> I'm > >> > > not > >> > > > > > hugely > >> > > > > > > opposed to adding "connector " as a prefix to them; would be > >> > > > interested > >> > > > > > in > >> > > > > > > others' thoughts. > >> > > > > > > > >> > > > > > > > I'm not sure I followed why the unresolved writes to the > >> config > >> > > > topic > >> > > > > > > would be an issue - wouldn't the delete offsets request be > >> added to > >> > > > the > >> > > > > > > herder's request queue and whenever it is processed, we'd > >> anyway > >> > > > need to > >> > > > > > > check if all the prerequisites for the request are satisfied? > >> > > > > > > > >> > > > > > > Some requests are handled in multiple steps. For example, > >> deleting > >> > > a > >> > > > > > > connector (1) adds a request to the herder queue to write a > >> > > > tombstone to > >> > > > > > > the config topic (or, if the worker isn't the leader, forward > >> the > >> > > > request > >> > > > > > > to the leader). (2) Once that tombstone is picked up, (3) a > >> > > rebalance > >> > > > > > > ensues, and then after it's finally complete, (4) the > >> connector and > >> > > > its > >> > > > > > > tasks are shut down. 
I probably could have used better > >> terminology, > >> > > > but > >> > > > > > > what I meant by "unresolved writes to the config topic" was a > >> case > >> > > in > >> > > > > > > between steps (2) and (3)--where the worker has already read > >> that > >> > > > > > tombstone > >> > > > > > > from the config topic and knows that a rebalance is pending, > >> but > >> > > > hasn't > >> > > > > > > begun participating in that rebalance yet. In the > >> DistributedHerder > >> > > > > > class, > >> > > > > > > this is done via the `checkRebalanceNeeded` method. > >> > > > > > > > >> > > > > > > > We can probably revisit this potential deprecation [of the > >> PAUSED > >> > > > > > state] > >> > > > > > > in the future based on user feedback and how the adoption of > >> the > >> > > new > >> > > > > > > proposed stop endpoint looks like, what do you think? > >> > > > > > > > >> > > > > > > Yeah, revisiting in the future seems reasonable. 👍 > >> > > > > > > > >> > > > > > > And responses to new comments here: > >> > > > > > > > >> > > > > > > 8. Yep, we'll start tracking offsets by connector. I don't > >> believe > >> > > > this > >> > > > > > > should be too difficult, and suspect that the only reason we > >> track > >> > > > raw > >> > > > > > byte > >> > > > > > > arrays instead of pre-deserializing offset topic information > >> into > >> > > > > > something > >> > > > > > > more useful is because Connect originally had pluggable > >> internal > >> > > > > > > converters. Now that we're hardcoded to use the JSON > >> converter it > >> > > > should > >> > > > > > be > >> > > > > > > fine to track offsets on a per-connector basis as they're > >> read from > >> > > > the > >> > > > > > > offsets topic. > >> > > > > > > > >> > > > > > > 9. I'm hesitant to introduce this type of feature right now > >> because > >> > > > of > >> > > > > > all > >> > > > > > > of the gotchas that would come with it. In security-conscious > >> > > > > > environments, > >> > > > > > > it's possible that a sink connector's principal may have > >> access to > >> > > > the > >> > > > > > > consumer group used by the connector, but the worker's > >> principal > >> > > may > >> > > > not. > >> > > > > > > There's also the case where source connectors have separate > >> offsets > >> > > > > > topics, > >> > > > > > > or sink connectors have overridden consumer group IDs, or > >> sink or > >> > > > source > >> > > > > > > connectors work against a different Kafka cluster than the > >> one that > >> > > > their > >> > > > > > > worker uses. Overall, I'd rather provide a single API that > >> works in > >> > > > all > >> > > > > > > cases rather than risk confusing and alienating users by > >> trying to > >> > > > make > >> > > > > > > their lives easier in a subset of cases. > >> > > > > > > > >> > > > > > > 10. Hmm... I don't think the order of the writes matters too > >> much > >> > > > here, > >> > > > > > but > >> > > > > > > we probably could start by deleting from the global topic > >> first, > >> > > > that's > >> > > > > > > true. The reason I'm not hugely concerned about this case is > >> that > >> > > if > >> > > > > > > something goes wrong while resetting offsets, there's no > >> immediate > >> > > > > > > impact--the connector will still be in the STOPPED state. 
The > >> REST > >> > > > > > response > >> > > > > > > for requests to reset the offsets will clearly call out that > >> the > >> > > > > > operation > >> > > > > > > has failed, and if necessary, we can probably also add a > >> > > > scary-looking > >> > > > > > > warning message stating that we can't guarantee which offsets > >> have > >> > > > been > >> > > > > > > successfully wiped and which haven't. Users can query the > >> exact > >> > > > offsets > >> > > > > > of > >> > > > > > > the connector at this point to determine what will happen > >> if/what > >> > > > they > >> > > > > > > resume it. And they can repeat attempts to reset the offsets > >> as > >> > > many > >> > > > > > times > >> > > > > > > as they'd like until they get back a 2XX response, indicating > >> that > >> > > > it's > >> > > > > > > finally safe to resume the connector. Thoughts? > >> > > > > > > > >> > > > > > > 11. I haven't thought too much about it. I think something > >> like the > >> > > > > > > Monitorable* connectors would probably serve our needs here; > >> we can > >> > > > > > > instantiate them on a running Connect cluster and then use > >> various > >> > > > > > handles > >> > > > > > > to know how many times they've been polled, committed > >> records, etc. > >> > > > If > >> > > > > > > necessary we can tweak those classes or even write our own. > >> But > >> > > > anyways, > >> > > > > > > once that's all done, the test will be something like "create > >> a > >> > > > > > connector, > >> > > > > > > wait for it to produce N records (each of which contains some > >> kind > >> > > of > >> > > > > > > predictable offset), and ensure that the offsets for it in > >> the REST > >> > > > API > >> > > > > > > match up with the ones we'd expect from N records". Does that > >> > > answer > >> > > > your > >> > > > > > > question? > >> > > > > > > > >> > > > > > > Cheers, > >> > > > > > > > >> > > > > > > Chris > >> > > > > > > > >> > > > > > > On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya < > >> yash.ma...@gmail.com> > >> > > > wrote: > >> > > > > > > > >> > > > > > > > Hi Chris, > >> > > > > > > > > >> > > > > > > > 1. Thanks a lot for elaborating on this, I'm now convinced > >> about > >> > > > the > >> > > > > > > > usefulness of the new offset reset endpoint. Regarding the > >> > > > follow-up > >> > > > > > KIP > >> > > > > > > > for a fine-grained offset write API, I'd be happy to take > >> that on > >> > > > once > >> > > > > > > this > >> > > > > > > > KIP is finalized and I will definitely look forward to your > >> > > > feedback on > >> > > > > > > > that one! > >> > > > > > > > > >> > > > > > > > 2. Gotcha, the motivation makes more sense to me now. So the > >> > > higher > >> > > > > > level > >> > > > > > > > partition field represents a Connect specific "logical > >> partition" > >> > > > of > >> > > > > > > sorts > >> > > > > > > > - i.e. the source partition as defined by a connector for > >> source > >> > > > > > > connectors > >> > > > > > > > and a Kafka topic + partition for sink connectors. I like > >> the > >> > > idea > >> > > > of > >> > > > > > > > adding a Kafka prefix to the lower level partition/offset > >> (and > >> > > > topic) > >> > > > > > > > fields which basically makes it more clear (although > >> implicitly) > >> > > > that > >> > > > > > the > >> > > > > > > > higher level partition/offset field is Connect specific and > >> not > >> > > the > >> > > > > > same > >> > > > > > > as > >> > > > > > > > what those terms represent in Kafka itself. 
However, this > >> then > >> > > > leads me > >> > > > > > > to > >> > > > > > > > wonder if we can make that explicit by including "connect" > >> or > >> > > > > > "connector" > >> > > > > > > > in the higher level field names? Or do you think this isn't > >> > > > required > >> > > > > > > given > >> > > > > > > > that we're talking about a Connect specific REST API in the > >> first > >> > > > > > place? > >> > > > > > > > > >> > > > > > > > 3. Thanks, I think the response structure definitely looks > >> better > >> > > > now! > >> > > > > > > > > >> > > > > > > > 4. Interesting, I'd be curious to learn why we might want to > >> > > change > >> > > > > > this > >> > > > > > > in > >> > > > > > > > the future but that's probably out of scope for this > >> discussion. > >> > > > I'm > >> > > > > > not > >> > > > > > > > sure I followed why the unresolved writes to the config > >> topic > >> > > > would be > >> > > > > > an > >> > > > > > > > issue - wouldn't the delete offsets request be added to the > >> > > > herder's > >> > > > > > > > request queue and whenever it is processed, we'd anyway > >> need to > >> > > > check > >> > > > > > if > >> > > > > > > > all the prerequisites for the request are satisfied? > >> > > > > > > > > >> > > > > > > > 5. Thanks for elaborating that just fencing out the producer > >> > > still > >> > > > > > leaves > >> > > > > > > > many cases where source tasks remain hanging around and > >> also that > >> > > > we > >> > > > > > > anyway > >> > > > > > > > can't have similar data production guarantees for sink > >> connectors > >> > > > right > >> > > > > > > > now. I agree that it might be better to go with ease of > >> > > > implementation > >> > > > > > > and > >> > > > > > > > consistency for now. > >> > > > > > > > > >> > > > > > > > 6. Right, that does make sense but I still feel like the two > >> > > states > >> > > > > > will > >> > > > > > > > end up being confusing to end users who might not be able to > >> > > > discern > >> > > > > > the > >> > > > > > > > (fairly low-level) differences between them (also the > >> nuances of > >> > > > state > >> > > > > > > > transitions like STOPPED -> PAUSED or PAUSED -> STOPPED > >> with the > >> > > > > > > > rebalancing implications as well). We can probably revisit > >> this > >> > > > > > potential > >> > > > > > > > deprecation in the future based on user feedback and how the > >> > > > adoption > >> > > > > > of > >> > > > > > > > the new proposed stop endpoint looks like, what do you > >> think? > >> > > > > > > > > >> > > > > > > > 7. Aha, that is completely my bad, I missed that the v1/v2 > >> state > >> > > is > >> > > > > > only > >> > > > > > > > applicable to the connector's target state and that we > >> don't need > >> > > > to > >> > > > > > > worry > >> > > > > > > > about the tasks since we will have an empty set of tasks. I > >> > > think I > >> > > > > > was a > >> > > > > > > > little confused by "pause the parts of the connector that > >> they > >> > > are > >> > > > > > > > assigned" from the KIP. Thanks for clarifying that! > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > Some more thoughts and questions that I had - > >> > > > > > > > > >> > > > > > > > 8. Could you elaborate on what the implementation for offset > >> > > reset > >> > > > for > >> > > > > > > > source connectors would look like? Currently, it doesn't > >> look > >> > > like > >> > > > we > >> > > > > > > track > >> > > > > > > > all the partitions for a source connector anywhere. 
Will we > >> need > >> > > to > >> > > > > > > > book-keep this somewhere in order to be able to emit a > >> tombstone > >> > > > record > >> > > > > > > for > >> > > > > > > > each source partition? > >> > > > > > > > > >> > > > > > > > 9. The KIP describes the offset reset endpoint as only being > >> > > > usable on > >> > > > > > > > existing connectors that are in a `STOPPED` state. Why > >> wouldn't > >> > > we > >> > > > want > >> > > > > > > to > >> > > > > > > > allow resetting offsets for a deleted connector which seems > >> to > >> > > be a > >> > > > > > valid > >> > > > > > > > use case? Or do we plan to handle this use case only via > >> the item > >> > > > > > > outlined > >> > > > > > > > in the future work section - "Automatically delete offsets > >> with > >> > > > > > > > connectors"? > >> > > > > > > > > >> > > > > > > > 10. The KIP mentions that source offsets will be reset > >> > > > transactionally > >> > > > > > > for > >> > > > > > > > each topic (worker global offset topic and connector > >> specific > >> > > > offset > >> > > > > > > topic > >> > > > > > > > if it exists). While it obviously isn't possible to > >> atomically do > >> > > > the > >> > > > > > > > writes to two topics which may be on different Kafka > >> clusters, > >> > > I'm > >> > > > > > > > wondering about what would happen if the first transaction > >> > > > succeeds but > >> > > > > > > the > >> > > > > > > > second one fails. I think the order of the two transactions > >> > > matters > >> > > > > > here > >> > > > > > > - > >> > > > > > > > if we successfully emit tombstones to the connector specific > >> > > offset > >> > > > > > topic > >> > > > > > > > and fail to do so for the worker global offset topic, we'll > >> > > > presumably > >> > > > > > > fail > >> > > > > > > > the offset delete request because the KIP mentions that "A > >> > > request > >> > > > to > >> > > > > > > reset > >> > > > > > > > offsets for a source connector will only be considered > >> successful > >> > > > if > >> > > > > > the > >> > > > > > > > worker is able to delete all known offsets for that > >> connector, on > >> > > > both > >> > > > > > > the > >> > > > > > > > worker's global offsets topic and (if one is used) the > >> > > connector's > >> > > > > > > > dedicated offsets topic.". However, this will lead to the > >> > > connector > >> > > > > > only > >> > > > > > > > being able to read potentially older offsets from the worker > >> > > global > >> > > > > > > offset > >> > > > > > > > topic on resumption (based on the combined offset view > >> presented > >> > > as > >> > > > > > > > described in KIP-618 [1]). So, I think we should make sure > >> that > >> > > the > >> > > > > > > worker > >> > > > > > > > global offset topic tombstoning is attempted first, right? > >> Note > >> > > > that in > >> > > > > > > the > >> > > > > > > > current implementation of > >> `ConnectorOffsetBackingStore::set`, the > >> > > > > > > primary / > >> > > > > > > > connector specific offset store is written to first. > >> > > > > > > > > >> > > > > > > > 11. 
This probably isn't necessary to elaborate on in the KIP > >> > > > itself, > >> > > > > > but > >> > > > > > > I > >> > > > > > > > was wondering what the second offset test - "verify that > >> that > >> > > those > >> > > > > > > offsets > >> > > > > > > > reflect an expected level of progress for each connector > >> (i.e., > >> > > > they > >> > > > > > are > >> > > > > > > > greater than or equal to a certain value depending on how > >> the > >> > > > > > connectors > >> > > > > > > > are configured and how long they have been running)" - > >> would look > >> > > > like? > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > Thanks, > >> > > > > > > > Yash > >> > > > > > > > > >> > > > > > > > [1] - > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > >> > > > >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration > >> > > > > > > > > >> > > > > > > > On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton > >> > > > <chr...@aiven.io.invalid > >> > > > > > > > >> > > > > > > > wrote: > >> > > > > > > > > >> > > > > > > > > Hi Yash, > >> > > > > > > > > > >> > > > > > > > > Thanks for your detailed thoughts. > >> > > > > > > > > > >> > > > > > > > > 1. In KAFKA-4107 [1], the primary request is exactly > >> what's > >> > > > proposed > >> > > > > > in > >> > > > > > > > the > >> > > > > > > > > KIP right now: a way to reset offsets for connectors. > >> Sure, > >> > > > there's > >> > > > > > an > >> > > > > > > > > extra step of stopping the connector, but renaming a > >> connector > >> > > > isn't > >> > > > > > as > >> > > > > > > > > convenient of an alternative as it may seem since in many > >> cases > >> > > > you'd > >> > > > > > > > also > >> > > > > > > > > want to delete the older one, so the complete sequence of > >> steps > >> > > > would > >> > > > > > > be > >> > > > > > > > > something like delete the old connector, rename it > >> (possibly > >> > > > > > requiring > >> > > > > > > > > modifications to its config file, depending on which API > >> is > >> > > > used), > >> > > > > > then > >> > > > > > > > > create the renamed variant. It's also just not a great > >> user > >> > > > > > > > > experience--even if the practical impacts are limited > >> (which, > >> > > > IMO, > >> > > > > > they > >> > > > > > > > are > >> > > > > > > > > not), people have been asking for years about why they > >> have to > >> > > > employ > >> > > > > > > > this > >> > > > > > > > > kind of a workaround for a fairly common use case, and we > >> don't > >> > > > > > really > >> > > > > > > > have > >> > > > > > > > > a good answer beyond "we haven't implemented something > >> better > >> > > > yet". 
> >> > > > > > On > >> > > > > > > > top > >> > > > > > > > > of that, you may have external tooling that needs to be > >> tweaked > >> > > > to > >> > > > > > > > handle a > >> > > > > > > > > new connector name, you may have strict authorization > >> policies > >> > > > around > >> > > > > > > who > >> > > > > > > > > can access what connectors, you may have other ACLs > >> attached to > >> > > > the > >> > > > > > > name > >> > > > > > > > of > >> > > > > > > > > the connector (which can be especially common in the case > >> of > >> > > sink > >> > > > > > > > > connectors, whose consumer group IDs are tied to their > >> names by > >> > > > > > > default), > >> > > > > > > > > and leaving around state in the offsets topic that can > >> never be > >> > > > > > cleaned > >> > > > > > > > up > >> > > > > > > > > presents a bit of a footgun for users. It may not be a > >> silver > >> > > > bullet, > >> > > > > > > but > >> > > > > > > > > providing some mechanism to reset that state is a step in > >> the > >> > > > right > >> > > > > > > > > direction and allows responsible users to more carefully > >> > > > administer > >> > > > > > > their > >> > > > > > > > > cluster without resorting to non-public APIs. That said, > >> I do > >> > > > agree > >> > > > > > > that > >> > > > > > > > a > >> > > > > > > > > fine-grained reset/overwrite API would be useful, and I'd > >> be > >> > > > happy to > >> > > > > > > > > review a KIP to add that feature if anyone wants to > >> tackle it! > >> > > > > > > > > > >> > > > > > > > > 2. Keeping the two formats symmetrical is motivated > >> mostly by > >> > > > > > > aesthetics > >> > > > > > > > > and quality-of-life for programmatic interaction with the > >> API; > >> > > > it's > >> > > > > > not > >> > > > > > > > > really a goal to hide the use of consumer groups from > >> users. I > >> > > do > >> > > > > > agree > >> > > > > > > > > that the format is a little strange-looking for sink > >> > > connectors, > >> > > > but > >> > > > > > it > >> > > > > > > > > seemed like it would be easier to work with for UIs, > >> casual jq > >> > > > > > queries, > >> > > > > > > > and > >> > > > > > > > > CLIs than a more Kafka-specific alternative such as > >> > > > > > > > {"<topic>-<partition>": > >> > > > > > > > > "<offset>"}, and although it is a little strange, I don't > >> think > >> > > > it's > >> > > > > > > any > >> > > > > > > > > less readable or intuitive. That said, I've made some > >> tweaks to > >> > > > the > >> > > > > > > > format > >> > > > > > > > > that should make programmatic access even easier; > >> specifically, > >> > > > I've > >> > > > > > > > > removed the "source" and "sink" wrapper fields and instead > >> > > moved > >> > > > them > >> > > > > > > > into > >> > > > > > > > > a top-level object with a "type" and "offsets" field, > >> just like > >> > > > you > >> > > > > > > > > suggested in point 3 (thanks!). We might also consider > >> changing > >> > > > the > >> > > > > > > field > >> > > > > > > > > names for sink offsets from "topic", "partition", and > >> "offset" > >> > > to > >> > > > > > > "Kafka > >> > > > > > > > > topic", "Kafka partition", and "Kafka offset" > >> respectively, to > >> > > > reduce > >> > > > > > > the > >> > > > > > > > > stuttering effect of having a "partition" field inside a > >> > > > "partition" > >> > > > > > > > field > >> > > > > > > > > and the same with an "offset" field; thoughts? 
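To make that shape concrete, a GET /connectors/{connector}/offsets response under the structure described here might look roughly like the following for a sink connector. The field names follow the "Kafka topic"/"Kafka partition"/"Kafka offset" idea floated just above (the exact names were still under discussion, and the "type" field was later dropped further up the thread), and the topic, partition, and offset values are invented for the example:

{
  "type": "sink",
  "offsets": [
    {
      "partition": {"Kafka topic": "orders", "Kafka partition": 2},
      "offset": {"Kafka offset": 4207}
    }
  ]
}

A source connector's response would have the same top-level shape, but with whatever maps the connector itself uses for its source partitions and offsets inside the "partition" and "offset" fields, for example {"partition": {"filename": "orders.csv"}, "offset": {"position": 1024}}.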
One final > >> > > > point--by > >> > > > > > > > equating > >> > > > > > > > > source and sink offsets, we probably make it easier for > >> users > >> > > to > >> > > > > > > > understand > >> > > > > > > > > exactly what a source offset is; anyone who's familiar > >> with > >> > > > consumer > >> > > > > > > > > offsets can see from the response format that we identify > >> a > >> > > > logical > >> > > > > > > > > partition as a combination of two entities (a topic and a > >> > > > partition > >> > > > > > > > > number); it should make it easier to grok what a source > >> offset > >> > > > is by > >> > > > > > > > seeing > >> > > > > > > > > what the two formats have in common. > >> > > > > > > > > > >> > > > > > > > > 3. Great idea! Done. > >> > > > > > > > > > >> > > > > > > > > 4. Yes, I'm thinking right now that a 409 will be the > >> response > >> > > > status > >> > > > > > > if > >> > > > > > > > a > >> > > > > > > > > rebalance is pending. I'd rather not add this to the KIP > >> as we > >> > > > may > >> > > > > > want > >> > > > > > > > to > >> > > > > > > > > change it at some point and it doesn't seem vital to > >> establish > >> > > > it as > >> > > > > > > part > >> > > > > > > > > of the public contract for the new endpoint right now. > >> Also, > >> > > > small > >> > > > > > > > > point--yes, a 409 is useful to avoid forwarding requests > >> to an > >> > > > > > > incorrect > >> > > > > > > > > leader, but it's also useful to ensure that there aren't > >> any > >> > > > > > unresolved > >> > > > > > > > > writes to the config topic that might cause issues with > >> the > >> > > > request > >> > > > > > > (such > >> > > > > > > > > as deleting the connector). > >> > > > > > > > > > >> > > > > > > > > 5. That's a good point--it may be misleading to call a > >> > > connector > >> > > > > > > STOPPED > >> > > > > > > > > when it has zombie tasks lying around on the cluster. I > >> don't > >> > > > think > >> > > > > > > it'd > >> > > > > > > > be > >> > > > > > > > > appropriate to do this synchronously while handling > >> requests to > >> > > > the > >> > > > > > PUT > >> > > > > > > > > /connectors/{connector}/stop since we'd want to give all > >> > > > > > > > currently-running > >> > > > > > > > > tasks a chance to gracefully shut down, though. I'm also > >> not > >> > > sure > >> > > > > > that > >> > > > > > > > this > >> > > > > > > > > is a significant problem, either. If the connector is > >> resumed, > >> > > > then > >> > > > > > all > >> > > > > > > > > zombie tasks will be automatically fenced out by their > >> > > > successors on > >> > > > > > > > > startup; if it's deleted, then we'll have wasted effort by > >> > > > performing > >> > > > > > > an > >> > > > > > > > > unnecessary round of fencing. It may be nice to guarantee > >> that > >> > > > source > >> > > > > > > > task > >> > > > > > > > > resources will be deallocated after the connector > >> transitions > >> > > to > >> > > > > > > STOPPED, > >> > > > > > > > > but realistically, it doesn't do much to just fence out > >> their > >> > > > > > > producers, > >> > > > > > > > > since tasks can be blocked on a number of other > >> operations such > >> > > > as > >> > > > > > > > > key/value/header conversion, transformation, and task > >> polling. 
> >> > > > It may > >> > > > > > > be > >> > > > > > > > a > >> > > > > > > > > little strange if data is produced to Kafka after the > >> connector > >> > > > has > >> > > > > > > > > transitioned to STOPPED, but we can't provide the same > >> > > > guarantees for > >> > > > > > > > sink > >> > > > > > > > > connectors, since their tasks may be stuck on a > >> long-running > >> > > > > > > > SinkTask::put > >> > > > > > > > > that emits data even after the Connect framework has > >> abandoned > >> > > > them > >> > > > > > > after > >> > > > > > > > > exhausting their graceful shutdown timeout. Ultimately, > >> I'd > >> > > > prefer to > >> > > > > > > err > >> > > > > > > > > on the side of consistency and ease of implementation for > >> now, > >> > > > but I > >> > > > > > > may > >> > > > > > > > be > >> > > > > > > > > missing a case where a few extra records from a task > >> that's > >> > > slow > >> > > > to > >> > > > > > > shut > >> > > > > > > > > down may cause serious issues--let me know. > >> > > > > > > > > > >> > > > > > > > > 6. I'm hesitant to propose deprecation of the PAUSED state > >> > > right > >> > > > now > >> > > > > > as > >> > > > > > > > it > >> > > > > > > > > does serve a few purposes. Leaving tasks idling-but-ready > >> makes > >> > > > > > > resuming > >> > > > > > > > > them less disruptive across the cluster, since a rebalance > >> > > isn't > >> > > > > > > > necessary. > >> > > > > > > > > It also reduces latency to resume the connector, > >> especially for > >> > > > ones > >> > > > > > > that > >> > > > > > > > > have to do a lot of state gathering on initialization to, > >> e.g., > >> > > > read > >> > > > > > > > > offsets from an external system. > >> > > > > > > > > > >> > > > > > > > > 7. There should be no risk of mixed tasks after a > >> downgrade, > >> > > > thanks > >> > > > > > to > >> > > > > > > > the > >> > > > > > > > > empty set of task configs that gets published to the > >> config > >> > > > topic. > >> > > > > > Both > >> > > > > > > > > upgraded and downgraded workers will render an empty set > >> of > >> > > > tasks for > >> > > > > > > the > >> > > > > > > > > connector, and keep that set of empty tasks until the > >> connector > >> > > > is > >> > > > > > > > resumed. > >> > > > > > > > > Does that address your concerns? > >> > > > > > > > > > >> > > > > > > > > You're also correct that the linked Jira ticket was wrong; > >> > > > thanks for > >> > > > > > > > > pointing that out! Yes, KAFKA-4107 is the intended > >> ticket, and > >> > > > I've > >> > > > > > > > updated > >> > > > > > > > > the link in the KIP accordingly. > >> > > > > > > > > > >> > > > > > > > > Cheers, > >> > > > > > > > > > >> > > > > > > > > Chris > >> > > > > > > > > > >> > > > > > > > > [1] - https://issues.apache.org/jira/browse/KAFKA-4107 > >> > > > > > > > > > >> > > > > > > > > On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya < > >> > > > yash.ma...@gmail.com> > >> > > > > > > > wrote: > >> > > > > > > > > > >> > > > > > > > > > Hi Chris, > >> > > > > > > > > > > >> > > > > > > > > > Thanks a lot for this KIP, I think something like this > >> has > >> > > been > >> > > > > > long > >> > > > > > > > > > overdue for Kafka Connect :) > >> > > > > > > > > > > >> > > > > > > > > > Some thoughts and questions that I had - > >> > > > > > > > > > > >> > > > > > > > > > 1. I'm wondering if you could elaborate a little more > >> on the > >> > > > use > >> > > > > > case > >> > > > > > > > for > >> > > > > > > > > > the `DELETE /connectors/{connector}/offsets` API. 
I > >> think we > >> > > > can > >> > > > > > all > >> > > > > > > > > agree > >> > > > > > > > > > that a fine grained reset API that allows setting > >> arbitrary > >> > > > offsets > >> > > > > > > for > >> > > > > > > > > > partitions would be quite useful (which you talk about > >> in the > >> > > > > > Future > >> > > > > > > > work > >> > > > > > > > > > section). But for the `DELETE > >> > > /connectors/{connector}/offsets` > >> > > > API > >> > > > > > in > >> > > > > > > > its > >> > > > > > > > > > described form, it looks like it would only serve a > >> seemingly > >> > > > niche > >> > > > > > > use > >> > > > > > > > > > case where users want to avoid renaming connectors - > >> because > >> > > > this > >> > > > > > new > >> > > > > > > > way > >> > > > > > > > > > of resetting offsets actually has more steps (i.e. stop > >> the > >> > > > > > > connector, > >> > > > > > > > > > reset offsets via the API, resume the connector) than > >> simply > >> > > > > > deleting > >> > > > > > > > and > >> > > > > > > > > > re-creating the connector with a different name? > >> > > > > > > > > > > >> > > > > > > > > > 2. The KIP talks about taking care that the response > >> formats > >> > > > > > > > (presumably > >> > > > > > > > > > only talking about the new GET API here) are > >> symmetrical for > >> > > > both > >> > > > > > > > source > >> > > > > > > > > > and sink connectors - is the end goal to have users of > >> Kafka > >> > > > > > Connect > >> > > > > > > > not > >> > > > > > > > > > even be aware that sink connectors use Kafka consumers > >> under > >> > > > the > >> > > > > > hood > >> > > > > > > > > (i.e. > >> > > > > > > > > > have that as purely an implementation detail abstracted > >> away > >> > > > from > >> > > > > > > > users)? > >> > > > > > > > > > While I understand the value of uniformity here, the > >> response > >> > > > > > format > >> > > > > > > > for > >> > > > > > > > > > sink connectors currently looks a little odd with the > >> > > > "partition" > >> > > > > > > field > >> > > > > > > > > > having "topic" and "partition" as sub-fields, > >> especially to > >> > > > users > >> > > > > > > > > familiar > >> > > > > > > > > > with Kafka semantics. Thoughts? > >> > > > > > > > > > > >> > > > > > > > > > 3. Another little nitpick on the response format - why > >> do we > >> > > > need > >> > > > > > > > > "source" > >> > > > > > > > > > / "sink" as a field under "offsets"? Users can query the > >> > > > connector > >> > > > > > > type > >> > > > > > > > > via > >> > > > > > > > > > the existing `GET /connectors` API. If it's deemed > >> important > >> > > > to let > >> > > > > > > > users > >> > > > > > > > > > know that the offsets they're seeing correspond to a > >> source / > >> > > > sink > >> > > > > > > > > > connector, maybe we could have a top level field "type" > >> in > >> > > the > >> > > > > > > response > >> > > > > > > > > for > >> > > > > > > > > > the `GET /connectors/{connector}/offsets` API similar > >> to the > >> > > > `GET > >> > > > > > > > > > /connectors` API? > >> > > > > > > > > > > >> > > > > > > > > > 4. For the `DELETE /connectors/{connector}/offsets` > >> API, the > >> > > > KIP > >> > > > > > > > mentions > >> > > > > > > > > > that requests will be rejected if a rebalance is > >> pending - > >> > > > > > presumably > >> > > > > > > > > this > >> > > > > > > > > > is to avoid forwarding requests to a leader which may no > >> > > > longer be > >> > > > > > > the > >> > > > > > > > > > leader after the pending rebalance? 
In this case, the > >> API > >> > > will > >> > > > > > > return a > >> > > > > > > > > > `409 Conflict` response similar to some of the existing > >> APIs, > >> > > > > > right? > >> > > > > > > > > > > >> > > > > > > > > > 5. Regarding fencing out previously running tasks for a > >> > > > connector, > >> > > > > > do > >> > > > > > > > you > >> > > > > > > > > > think it would make more sense semantically to have this > >> > > > > > implemented > >> > > > > > > in > >> > > > > > > > > the > >> > > > > > > > > > stop endpoint where an empty set of tasks is generated, > >> > > rather > >> > > > than > >> > > > > > > the > >> > > > > > > > > > delete offsets endpoint? This would also give the new > >> > > `STOPPED` > >> > > > > > > state a > >> > > > > > > > > > higher confidence of sorts, with any zombie tasks being > >> > > fenced > >> > > > off > >> > > > > > > from > >> > > > > > > > > > continuing to produce data. > >> > > > > > > > > > > >> > > > > > > > > > 6. Thanks for outlining the issues with the current > >> state of > >> > > > the > >> > > > > > > > `PAUSED` > >> > > > > > > > > > state - I think a lot of users expect it to behave like > >> the > >> > > > > > `STOPPED` > >> > > > > > > > > state > >> > > > > > > > > > you outline in the KIP and are (unpleasantly) surprised > >> when > >> > > it > >> > > > > > > > doesn't. > >> > > > > > > > > > However, this does beg the question of what the > >> usefulness of > >> > > > > > having > >> > > > > > > > two > >> > > > > > > > > > separate `PAUSED` and `STOPPED` states is? Do we want to > >> > > > continue > >> > > > > > > > > > supporting both these states in the future, or do you > >> see the > >> > > > > > > `STOPPED` > >> > > > > > > > > > state eventually causing the existing `PAUSED` state to > >> be > >> > > > > > > deprecated? > >> > > > > > > > > > > >> > > > > > > > > > 7. I think the idea outlined in the KIP for handling a > >> new > >> > > > state > >> > > > > > > during > >> > > > > > > > > > cluster downgrades / rolling upgrades is quite clever, > >> but do > >> > > > you > >> > > > > > > think > >> > > > > > > > > > there could be any issues with having a mix of "paused" > >> and > >> > > > > > "stopped" > >> > > > > > > > > tasks > >> > > > > > > > > > for the same connector across workers in a cluster? At > >> the > >> > > very > >> > > > > > > least, > >> > > > > > > > I > >> > > > > > > > > > think it would be fairly confusing to most users. I'm > >> > > > wondering if > >> > > > > > > this > >> > > > > > > > > can > >> > > > > > > > > > be avoided by stating clearly in the KIP that the new > >> `PUT > >> > > > > > > > > > /connectors/{connector}/stop` > >> > > > > > > > > > can only be used on a cluster that is fully upgraded to > >> an AK > >> > > > > > version > >> > > > > > > > > newer > >> > > > > > > > > > than the one which ends up containing changes from this > >> KIP > >> > > and > >> > > > > > that > >> > > > > > > > if a > >> > > > > > > > > > cluster needs to be downgraded to an older version, the > >> user > >> > > > should > >> > > > > > > > > ensure > >> > > > > > > > > > that none of the connectors on the cluster are in a > >> stopped > >> > > > state? 
> >> > > > > > > With > >> > > > > > > > > the > >> > > > > > > > > > existing implementation, it looks like an > >> unknown/invalid > >> > > > target > >> > > > > > > state > >> > > > > > > > > > record is basically just discarded (with an error > >> message > >> > > > logged), > >> > > > > > so > >> > > > > > > > it > >> > > > > > > > > > doesn't seem to be a disastrous failure scenario that > >> can > >> > > bring > >> > > > > > down > >> > > > > > > a > >> > > > > > > > > > worker. > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > Thanks, > >> > > > > > > > > > Yash > >> > > > > > > > > > > >> > > > > > > > > > On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton > >> > > > > > > <chr...@aiven.io.invalid > >> > > > > > > > > > >> > > > > > > > > > wrote: > >> > > > > > > > > > > >> > > > > > > > > > > Hi Ashwin, > >> > > > > > > > > > > > >> > > > > > > > > > > Thanks for your thoughts. Regarding your questions: > >> > > > > > > > > > > > >> > > > > > > > > > > 1. The response would show the offsets that are > >> visible to > >> > > > the > >> > > > > > > source > >> > > > > > > > > > > connector, so it would combine the contents of the two > >> > > > topics, > >> > > > > > > giving > >> > > > > > > > > > > priority to offsets present in the connector-specific > >> > > topic. > >> > > > I'm > >> > > > > > > > > > imagining > >> > > > > > > > > > > a follow-up question that some people may have in > >> response > >> > > to > >> > > > > > that > >> > > > > > > is > >> > > > > > > > > > > whether we'd want to provide insight into the > >> contents of a > >> > > > > > single > >> > > > > > > > > topic > >> > > > > > > > > > at > >> > > > > > > > > > > a time. It may be useful to be able to see this > >> information > >> > > > in > >> > > > > > > order > >> > > > > > > > to > >> > > > > > > > > > > debug connector issues or verify that it's safe to > >> stop > >> > > > using a > >> > > > > > > > > > > connector-specific offsets topic (either explicitly, > >> or > >> > > > > > implicitly > >> > > > > > > > via > >> > > > > > > > > > > cluster downgrade). What do you think about adding a > >> URL > >> > > > query > >> > > > > > > > > parameter > >> > > > > > > > > > > that allows users to dictate which view of the > >> connector's > >> > > > > > offsets > >> > > > > > > > they > >> > > > > > > > > > are > >> > > > > > > > > > > given in the REST response, with options for the > >> worker's > >> > > > global > >> > > > > > > > topic, > >> > > > > > > > > > the > >> > > > > > > > > > > connector-specific topic, and the combined view of > >> them > >> > > that > >> > > > the > >> > > > > > > > > > connector > >> > > > > > > > > > > and its tasks see (which would be the default)? This > >> may be > >> > > > too > >> > > > > > > much > >> > > > > > > > > for > >> > > > > > > > > > V1 > >> > > > > > > > > > > but it feels like it's at least worth exploring a bit. > >> > > > > > > > > > > > >> > > > > > > > > > > 2. There is no option for this at the moment. Reset > >> > > > semantics are > >> > > > > > > > > > extremely > >> > > > > > > > > > > coarse-grained; for source connectors, we delete all > >> source > >> > > > > > > offsets, > >> > > > > > > > > and > >> > > > > > > > > > > for sink connectors, we delete the entire consumer > >> group. 
> >> > > I'm > >> > > > > > > hoping > >> > > > > > > > > this > >> > > > > > > > > > > will be enough for V1 and that, if there's sufficient > >> > > demand > >> > > > for > >> > > > > > > it, > >> > > > > > > > we > >> > > > > > > > > > can > >> > > > > > > > > > > introduce a richer API for resetting or even modifying > >> > > > connector > >> > > > > > > > > offsets > >> > > > > > > > > > in > >> > > > > > > > > > > a follow-up KIP. > >> > > > > > > > > > > > >> > > > > > > > > > > 3. Good eye :) I think it's fine to keep the existing > >> > > > behavior > >> > > > > > for > >> > > > > > > > the > >> > > > > > > > > > > PAUSED state with the Connector instance, since the > >> primary > >> > > > > > purpose > >> > > > > > > > of > >> > > > > > > > > > the > >> > > > > > > > > > > Connector is to generate task configs and monitor the > >> > > > external > >> > > > > > > system > >> > > > > > > > > for > >> > > > > > > > > > > changes. If there's no chance for tasks to be running > >> > > > anyways, I > >> > > > > > > > don't > >> > > > > > > > > > see > >> > > > > > > > > > > much value in allowing paused connectors to generate > >> new > >> > > task > >> > > > > > > > configs, > >> > > > > > > > > > > especially since each time that happens a rebalance is > >> > > > triggered > >> > > > > > > and > >> > > > > > > > > > > there's a non-zero cost to that. What do you think? > >> > > > > > > > > > > > >> > > > > > > > > > > Cheers, > >> > > > > > > > > > > > >> > > > > > > > > > > Chris > >> > > > > > > > > > > > >> > > > > > > > > > > On Fri, Oct 14, 2022 at 12:59 AM Ashwin > >> > > > > > > <apan...@confluent.io.invalid > >> > > > > > > > > > >> > > > > > > > > > > wrote: > >> > > > > > > > > > > > >> > > > > > > > > > > > Thanks for KIP Chris - I think this is a useful > >> feature. > >> > > > > > > > > > > > > >> > > > > > > > > > > > Can you please elaborate on the following in the > >> KIP - > >> > > > > > > > > > > > > >> > > > > > > > > > > > 1. How would the response of GET > >> > > > > > /connectors/{connector}/offsets > >> > > > > > > > look > >> > > > > > > > > > > like > >> > > > > > > > > > > > if the worker has both global and connector specific > >> > > > offsets > >> > > > > > > topic > >> > > > > > > > ? > >> > > > > > > > > > > > > >> > > > > > > > > > > > 2. How can we pass the reset options like shift-by , > >> > > > > > to-date-time > >> > > > > > > > > etc. > >> > > > > > > > > > > > using a REST API like DELETE > >> > > > /connectors/{connector}/offsets ? > >> > > > > > > > > > > > > >> > > > > > > > > > > > 3. Today PAUSE operation on a connector invokes its > >> stop > >> > > > > > method - > >> > > > > > > > > will > >> > > > > > > > > > > > there be a change here to reduce confusion with the > >> new > >> > > > > > proposed > >> > > > > > > > > > STOPPED > >> > > > > > > > > > > > state ? 
> >> > > > > > > > > > > > > >> > > > > > > > > > > > Thanks, > >> > > > > > > > > > > > Ashwin > >> > > > > > > > > > > > > >> > > > > > > > > > > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton > >> > > > > > > > > <chr...@aiven.io.invalid > >> > > > > > > > > > > > >> > > > > > > > > > > > wrote: > >> > > > > > > > > > > > > >> > > > > > > > > > > > > Hi all, > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > I noticed a fairly large gap in the first version > >> of > >> > > > this KIP > >> > > > > > > > that > >> > > > > > > > > I > >> > > > > > > > > > > > > published last Friday, which has to do with > >> > > accommodating > >> > > > > > > > > connectors > >> > > > > > > > > > > > > that target different Kafka clusters than the one > >> that > >> > > > the > >> > > > > > > Kafka > >> > > > > > > > > > > Connect > >> > > > > > > > > > > > > cluster uses for its internal topics and source > >> > > > connectors > >> > > > > > with > >> > > > > > > > > > > dedicated > >> > > > > > > > > > > > > offsets topics. I've since updated the KIP to > >> address > >> > > > this > >> > > > > > gap, > >> > > > > > > > > which > >> > > > > > > > > > > has > >> > > > > > > > > > > > > substantially altered the design. Wanted to give a > >> > > > heads-up > >> > > > > > to > >> > > > > > > > > anyone > >> > > > > > > > > > > > > that's already started reviewing. > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > Cheers, > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > Chris > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton < > >> > > > > > chr...@aiven.io> > >> > > > > > > > > > wrote: > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > Hi all, > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > I'd like to begin discussion on a KIP to add > >> offsets > >> > > > > > support > >> > > > > > > to > >> > > > > > > > > the > >> > > > > > > > > > > > Kafka > >> > > > > > > > > > > > > > Connect REST API: > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > >> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Cheers, > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Chris > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton > >> > > <chr...@aiven.io.invalid > >> > > > > > >> > > > > > wrote: > >> > > > > > > >> > > > > > > Hi Yash, > >> > > > > > > > >> > > > > > > Thanks again for your thoughts! Responses to ongoing > >> discussions > >> > > > inline > >> > > > > > > (easier to track context than referencing comment numbers): > >> > > > > > > > >> > > > > > > > However, this then leads me to wonder if we can make that > >> > > explicit > >> > > > by > >> > > > > > > including "connect" or "connector" in the higher level field > >> names? > >> > > > Or do > >> > > > > > > you think this isn't required given that we're talking about a > >> > > > Connect > >> > > > > > > specific REST API in the first place? 
> >> > > > > > > > >> > > > > > > I think "partition" and "offset" are fine as field names but > >> I'm > >> > > not > >> > > > > > hugely > >> > > > > > > opposed to adding "connector " as a prefix to them; would be > >> > > > interested > >> > > > > > in > >> > > > > > > others' thoughts. > >> > > > > > > > >> > > > > > > > I'm not sure I followed why the unresolved writes to the > >> config > >> > > > topic > >> > > > > > > would be an issue - wouldn't the delete offsets request be > >> added to > >> > > > the > >> > > > > > > herder's request queue and whenever it is processed, we'd > >> anyway > >> > > > need to > >> > > > > > > check if all the prerequisites for the request are satisfied? > >> > > > > > > > >> > > > > > > Some requests are handled in multiple steps. For example, > >> deleting > >> > > a > >> > > > > > > connector (1) adds a request to the herder queue to write a > >> > > > tombstone to > >> > > > > > > the config topic (or, if the worker isn't the leader, forward > >> the > >> > > > request > >> > > > > > > to the leader). (2) Once that tombstone is picked up, (3) a > >> > > rebalance > >> > > > > > > ensues, and then after it's finally complete, (4) the > >> connector and > >> > > > its > >> > > > > > > tasks are shut down. I probably could have used better > >> terminology, > >> > > > but > >> > > > > > > what I meant by "unresolved writes to the config topic" was a > >> case > >> > > in > >> > > > > > > between steps (2) and (3)--where the worker has already read > >> that > >> > > > > > tombstone > >> > > > > > > from the config topic and knows that a rebalance is pending, > >> but > >> > > > hasn't > >> > > > > > > begun participating in that rebalance yet. In the > >> DistributedHerder > >> > > > > > class, > >> > > > > > > this is done via the `checkRebalanceNeeded` method. > >> > > > > > > > >> > > > > > > > We can probably revisit this potential deprecation [of the > >> PAUSED > >> > > > > > state] > >> > > > > > > in the future based on user feedback and how the adoption of > >> the > >> > > new > >> > > > > > > proposed stop endpoint looks like, what do you think? > >> > > > > > > > >> > > > > > > Yeah, revisiting in the future seems reasonable. 👍 > >> > > > > > > > >> > > > > > > And responses to new comments here: > >> > > > > > > > >> > > > > > > 8. Yep, we'll start tracking offsets by connector. I don't > >> believe > >> > > > this > >> > > > > > > should be too difficult, and suspect that the only reason we > >> track > >> > > > raw > >> > > > > > byte > >> > > > > > > arrays instead of pre-deserializing offset topic information > >> into > >> > > > > > something > >> > > > > > > more useful is because Connect originally had pluggable > >> internal > >> > > > > > > converters. Now that we're hardcoded to use the JSON > >> converter it > >> > > > should > >> > > > > > be > >> > > > > > > fine to track offsets on a per-connector basis as they're > >> read from > >> > > > the > >> > > > > > > offsets topic. > >> > > > > > > > >> > > > > > > 9. I'm hesitant to introduce this type of feature right now > >> because > >> > > > of > >> > > > > > all > >> > > > > > > of the gotchas that would come with it. In security-conscious > >> > > > > > environments, > >> > > > > > > it's possible that a sink connector's principal may have > >> access to > >> > > > the > >> > > > > > > consumer group used by the connector, but the worker's > >> principal > >> > > may > >> > > > not. 
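For reference, records in Connect's internal offsets topic are keyed by a JSON array holding the connector name and the connector-defined source partition, with the offset map as the value, so tracking offsets per connector (point 8 above) amounts to grouping records by the connector name in the key, and a reset writes a tombstone (null value) for each known key. A rough sketch, with the "filename"/"position" fields purely illustrative since they depend entirely on the connector:

    key:   ["my-source-connector", {"filename": "/data/orders.csv"}]
    value: {"position": 1042}

    key:   ["my-source-connector", {"filename": "/data/orders.csv"}]
    value: null    (tombstone written during a reset)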
> >> > > > > > > There's also the case where source connectors have separate > >> offsets > >> > > > > > topics, > >> > > > > > > or sink connectors have overridden consumer group IDs, or > >> sink or > >> > > > source > >> > > > > > > connectors work against a different Kafka cluster than the > >> one that > >> > > > their > >> > > > > > > worker uses. Overall, I'd rather provide a single API that > >> works in > >> > > > all > >> > > > > > > cases rather than risk confusing and alienating users by > >> trying to > >> > > > make > >> > > > > > > their lives easier in a subset of cases. > >> > > > > > > > >> > > > > > > 10. Hmm... I don't think the order of the writes matters too > >> much > >> > > > here, > >> > > > > > but > >> > > > > > > we probably could start by deleting from the global topic > >> first, > >> > > > that's > >> > > > > > > true. The reason I'm not hugely concerned about this case is > >> that > >> > > if > >> > > > > > > something goes wrong while resetting offsets, there's no > >> immediate > >> > > > > > > impact--the connector will still be in the STOPPED state. The > >> REST > >> > > > > > response > >> > > > > > > for requests to reset the offsets will clearly call out that > >> the > >> > > > > > operation > >> > > > > > > has failed, and if necessary, we can probably also add a > >> > > > scary-looking > >> > > > > > > warning message stating that we can't guarantee which offsets > >> have > >> > > > been > >> > > > > > > successfully wiped and which haven't. Users can query the > >> exact > >> > > > offsets > >> > > > > > of > >> > > > > > > the connector at this point to determine what will happen > >> if/what > >> > > > they > >> > > > > > > resume it. And they can repeat attempts to reset the offsets > >> as > >> > > many > >> > > > > > times > >> > > > > > > as they'd like until they get back a 2XX response, indicating > >> that > >> > > > it's > >> > > > > > > finally safe to resume the connector. Thoughts? > >> > > > > > > > >> > > > > > > 11. I haven't thought too much about it. I think something > >> like the > >> > > > > > > Monitorable* connectors would probably serve our needs here; > >> we can > >> > > > > > > instantiate them on a running Connect cluster and then use > >> various > >> > > > > > handles > >> > > > > > > to know how many times they've been polled, committed > >> records, etc. > >> > > > If > >> > > > > > > necessary we can tweak those classes or even write our own. > >> But > >> > > > anyways, > >> > > > > > > once that's all done, the test will be something like "create > >> a > >> > > > > > connector, > >> > > > > > > wait for it to produce N records (each of which contains some > >> kind > >> > > of > >> > > > > > > predictable offset), and ensure that the offsets for it in > >> the REST > >> > > > API > >> > > > > > > match up with the ones we'd expect from N records". Does that > >> > > answer > >> > > > your > >> > > > > > > question? > >> > > > > > > > >> > > > > > > Cheers, > >> > > > > > > > >> > > > > > > Chris > >> > > > > > > > >> > > > > > > On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya < > >> yash.ma...@gmail.com> > >> > > > wrote: > >> > > > > > > > >> > > > > > > > Hi Chris, > >> > > > > > > > > >> > > > > > > > 1. Thanks a lot for elaborating on this, I'm now convinced > >> about > >> > > > the > >> > > > > > > > usefulness of the new offset reset endpoint. 
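Putting point 10 above in REST terms, the end-to-end reset flow sketched in this thread would look roughly like the sequence below (connector name illustrative); the DELETE can simply be retried until a 2XX comes back:

    PUT    /connectors/my-connector/stop
    DELETE /connectors/my-connector/offsets     (retry until a 2XX response is returned)
    GET    /connectors/my-connector/offsets     (optional: confirm what, if anything, remains)
    PUT    /connectors/my-connector/resume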
Regarding the > >> > > > follow-up > >> > > > > > KIP > >> > > > > > > > for a fine-grained offset write API, I'd be happy to take > >> that on > >> > > > once > >> > > > > > > this > >> > > > > > > > KIP is finalized and I will definitely look forward to your > >> > > > feedback on > >> > > > > > > > that one! > >> > > > > > > > > >> > > > > > > > 2. Gotcha, the motivation makes more sense to me now. So the > >> > > higher > >> > > > > > level > >> > > > > > > > partition field represents a Connect specific "logical > >> partition" > >> > > > of > >> > > > > > > sorts > >> > > > > > > > - i.e. the source partition as defined by a connector for > >> source > >> > > > > > > connectors > >> > > > > > > > and a Kafka topic + partition for sink connectors. I like > >> the > >> > > idea > >> > > > of > >> > > > > > > > adding a Kafka prefix to the lower level partition/offset > >> (and > >> > > > topic) > >> > > > > > > > fields which basically makes it more clear (although > >> implicitly) > >> > > > that > >> > > > > > the > >> > > > > > > > higher level partition/offset field is Connect specific and > >> not > >> > > the > >> > > > > > same > >> > > > > > > as > >> > > > > > > > what those terms represent in Kafka itself. However, this > >> then > >> > > > leads me > >> > > > > > > to > >> > > > > > > > wonder if we can make that explicit by including "connect" > >> or > >> > > > > > "connector" > >> > > > > > > > in the higher level field names? Or do you think this isn't > >> > > > required > >> > > > > > > given > >> > > > > > > > that we're talking about a Connect specific REST API in the > >> first > >> > > > > > place? > >> > > > > > > > > >> > > > > > > > 3. Thanks, I think the response structure definitely looks > >> better > >> > > > now! > >> > > > > > > > > >> > > > > > > > 4. Interesting, I'd be curious to learn why we might want to > >> > > change > >> > > > > > this > >> > > > > > > in > >> > > > > > > > the future but that's probably out of scope for this > >> discussion. > >> > > > I'm > >> > > > > > not > >> > > > > > > > sure I followed why the unresolved writes to the config > >> topic > >> > > > would be > >> > > > > > an > >> > > > > > > > issue - wouldn't the delete offsets request be added to the > >> > > > herder's > >> > > > > > > > request queue and whenever it is processed, we'd anyway > >> need to > >> > > > check > >> > > > > > if > >> > > > > > > > all the prerequisites for the request are satisfied? > >> > > > > > > > > >> > > > > > > > 5. Thanks for elaborating that just fencing out the producer > >> > > still > >> > > > > > leaves > >> > > > > > > > many cases where source tasks remain hanging around and > >> also that > >> > > > we > >> > > > > > > anyway > >> > > > > > > > can't have similar data production guarantees for sink > >> connectors > >> > > > right > >> > > > > > > > now. I agree that it might be better to go with ease of > >> > > > implementation > >> > > > > > > and > >> > > > > > > > consistency for now. > >> > > > > > > > > >> > > > > > > > 6. Right, that does make sense but I still feel like the two > >> > > states > >> > > > > > will > >> > > > > > > > end up being confusing to end users who might not be able to > >> > > > discern > >> > > > > > the > >> > > > > > > > (fairly low-level) differences between them (also the > >> nuances of > >> > > > state > >> > > > > > > > transitions like STOPPED -> PAUSED or PAUSED -> STOPPED > >> with the > >> > > > > > > > rebalancing implications as well). 
We can probably revisit > >> this > >> > > > > > potential > >> > > > > > > > deprecation in the future based on user feedback and how the > >> > > > adoption > >> > > > > > of > >> > > > > > > > the new proposed stop endpoint looks like, what do you > >> think? > >> > > > > > > > > >> > > > > > > > 7. Aha, that is completely my bad, I missed that the v1/v2 > >> state > >> > > is > >> > > > > > only > >> > > > > > > > applicable to the connector's target state and that we > >> don't need > >> > > > to > >> > > > > > > worry > >> > > > > > > > about the tasks since we will have an empty set of tasks. I > >> > > think I > >> > > > > > was a > >> > > > > > > > little confused by "pause the parts of the connector that > >> they > >> > > are > >> > > > > > > > assigned" from the KIP. Thanks for clarifying that! > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > Some more thoughts and questions that I had - > >> > > > > > > > > >> > > > > > > > 8. Could you elaborate on what the implementation for offset > >> > > reset > >> > > > for > >> > > > > > > > source connectors would look like? Currently, it doesn't > >> look > >> > > like > >> > > > we > >> > > > > > > track > >> > > > > > > > all the partitions for a source connector anywhere. Will we > >> need > >> > > to > >> > > > > > > > book-keep this somewhere in order to be able to emit a > >> tombstone > >> > > > record > >> > > > > > > for > >> > > > > > > > each source partition? > >> > > > > > > > > >> > > > > > > > 9. The KIP describes the offset reset endpoint as only being > >> > > > usable on > >> > > > > > > > existing connectors that are in a `STOPPED` state. Why > >> wouldn't > >> > > we > >> > > > want > >> > > > > > > to > >> > > > > > > > allow resetting offsets for a deleted connector which seems > >> to > >> > > be a > >> > > > > > valid > >> > > > > > > > use case? Or do we plan to handle this use case only via > >> the item > >> > > > > > > outlined > >> > > > > > > > in the future work section - "Automatically delete offsets > >> with > >> > > > > > > > connectors"? > >> > > > > > > > > >> > > > > > > > 10. The KIP mentions that source offsets will be reset > >> > > > transactionally > >> > > > > > > for > >> > > > > > > > each topic (worker global offset topic and connector > >> specific > >> > > > offset > >> > > > > > > topic > >> > > > > > > > if it exists). While it obviously isn't possible to > >> atomically do > >> > > > the > >> > > > > > > > writes to two topics which may be on different Kafka > >> clusters, > >> > > I'm > >> > > > > > > > wondering about what would happen if the first transaction > >> > > > succeeds but > >> > > > > > > the > >> > > > > > > > second one fails. 
I think the order of the two transactions > >> > > matters > >> > > > > > here > >> > > > > > > - > >> > > > > > > > if we successfully emit tombstones to the connector specific > >> > > offset > >> > > > > > topic > >> > > > > > > > and fail to do so for the worker global offset topic, we'll > >> > > > presumably > >> > > > > > > fail > >> > > > > > > > the offset delete request because the KIP mentions that "A > >> > > request > >> > > > to > >> > > > > > > reset > >> > > > > > > > offsets for a source connector will only be considered > >> successful > >> > > > if > >> > > > > > the > >> > > > > > > > worker is able to delete all known offsets for that > >> connector, on > >> > > > both > >> > > > > > > the > >> > > > > > > > worker's global offsets topic and (if one is used) the > >> > > connector's > >> > > > > > > > dedicated offsets topic.". However, this will lead to the > >> > > connector > >> > > > > > only > >> > > > > > > > being able to read potentially older offsets from the worker > >> > > global > >> > > > > > > offset > >> > > > > > > > topic on resumption (based on the combined offset view > >> presented > >> > > as > >> > > > > > > > described in KIP-618 [1]). So, I think we should make sure > >> that > >> > > the > >> > > > > > > worker > >> > > > > > > > global offset topic tombstoning is attempted first, right? > >> Note > >> > > > that in > >> > > > > > > the > >> > > > > > > > current implementation of > >> `ConnectorOffsetBackingStore::set`, the > >> > > > > > > primary / > >> > > > > > > > connector specific offset store is written to first. > >> > > > > > > > > >> > > > > > > > 11. This probably isn't necessary to elaborate on in the KIP > >> > > > itself, > >> > > > > > but > >> > > > > > > I > >> > > > > > > > was wondering what the second offset test - "verify that > >> that > >> > > those > >> > > > > > > offsets > >> > > > > > > > reflect an expected level of progress for each connector > >> (i.e., > >> > > > they > >> > > > > > are > >> > > > > > > > greater than or equal to a certain value depending on how > >> the > >> > > > > > connectors > >> > > > > > > > are configured and how long they have been running)" - > >> would look > >> > > > like? > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > Thanks, > >> > > > > > > > Yash > >> > > > > > > > > >> > > > > > > > [1] - > >> > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > >> > > > >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration > >> > > > > > > > > >> > > > > > > > On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton > >> > > > <chr...@aiven.io.invalid > >> > > > > > > > >> > > > > > > > wrote: > >> > > > > > > > > >> > > > > > > > > Hi Yash, > >> > > > > > > > > > >> > > > > > > > > Thanks for your detailed thoughts. > >> > > > > > > > > > >> > > > > > > > > 1. In KAFKA-4107 [1], the primary request is exactly > >> what's > >> > > > proposed > >> > > > > > in > >> > > > > > > > the > >> > > > > > > > > KIP right now: a way to reset offsets for connectors. 
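To put some made-up numbers behind the concern in point 10 above: the combined view gives priority to entries in the connector-specific topic, so if tombstones only land there, a stale entry in the worker's global topic becomes visible again on resumption (partition/offset fields are illustrative):

    worker global topic:        {"filename": "orders.csv"} -> {"position": 400}
    connector-specific topic:   {"filename": "orders.csv"} -> {"position": 500}
    combined view on resume:    position 500

    after tombstoning only the connector-specific topic:
    combined view on resume:    position 400    (stale)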
> >> Sure, > >> > > > there's > >> > > > > > an > >> > > > > > > > > extra step of stopping the connector, but renaming a > >> connector > >> > > > isn't > >> > > > > > as > >> > > > > > > > > convenient of an alternative as it may seem since in many > >> cases > >> > > > you'd > >> > > > > > > > also > >> > > > > > > > > want to delete the older one, so the complete sequence of > >> steps > >> > > > would > >> > > > > > > be > >> > > > > > > > > something like delete the old connector, rename it > >> (possibly > >> > > > > > requiring > >> > > > > > > > > modifications to its config file, depending on which API > >> is > >> > > > used), > >> > > > > > then > >> > > > > > > > > create the renamed variant. It's also just not a great > >> user > >> > > > > > > > > experience--even if the practical impacts are limited > >> (which, > >> > > > IMO, > >> > > > > > they > >> > > > > > > > are > >> > > > > > > > > not), people have been asking for years about why they > >> have to > >> > > > employ > >> > > > > > > > this > >> > > > > > > > > kind of a workaround for a fairly common use case, and we > >> don't > >> > > > > > really > >> > > > > > > > have > >> > > > > > > > > a good answer beyond "we haven't implemented something > >> better > >> > > > yet". > >> > > > > > On > >> > > > > > > > top > >> > > > > > > > > of that, you may have external tooling that needs to be > >> tweaked > >> > > > to > >> > > > > > > > handle a > >> > > > > > > > > new connector name, you may have strict authorization > >> policies > >> > > > around > >> > > > > > > who > >> > > > > > > > > can access what connectors, you may have other ACLs > >> attached to > >> > > > the > >> > > > > > > name > >> > > > > > > > of > >> > > > > > > > > the connector (which can be especially common in the case > >> of > >> > > sink > >> > > > > > > > > connectors, whose consumer group IDs are tied to their > >> names by > >> > > > > > > default), > >> > > > > > > > > and leaving around state in the offsets topic that can > >> never be > >> > > > > > cleaned > >> > > > > > > > up > >> > > > > > > > > presents a bit of a footgun for users. It may not be a > >> silver > >> > > > bullet, > >> > > > > > > but > >> > > > > > > > > providing some mechanism to reset that state is a step in > >> the > >> > > > right > >> > > > > > > > > direction and allows responsible users to more carefully > >> > > > administer > >> > > > > > > their > >> > > > > > > > > cluster without resorting to non-public APIs. That said, > >> I do > >> > > > agree > >> > > > > > > that > >> > > > > > > > a > >> > > > > > > > > fine-grained reset/overwrite API would be useful, and I'd > >> be > >> > > > happy to > >> > > > > > > > > review a KIP to add that feature if anyone wants to > >> tackle it! > >> > > > > > > > > > >> > > > > > > > > 2. Keeping the two formats symmetrical is motivated > >> mostly by > >> > > > > > > aesthetics > >> > > > > > > > > and quality-of-life for programmatic interaction with the > >> API; > >> > > > it's > >> > > > > > not > >> > > > > > > > > really a goal to hide the use of consumer groups from > >> users. 
I > >> > > do > >> > > > > > agree > >> > > > > > > > > that the format is a little strange-looking for sink > >> > > connectors, > >> > > > but > >> > > > > > it > >> > > > > > > > > seemed like it would be easier to work with for UIs, > >> casual jq > >> > > > > > queries, > >> > > > > > > > and > >> > > > > > > > > CLIs than a more Kafka-specific alternative such as > >> > > > > > > > {"<topic>-<partition>": > >> > > > > > > > > "<offset>"}, and although it is a little strange, I don't > >> think > >> > > > it's > >> > > > > > > any > >> > > > > > > > > less readable or intuitive. That said, I've made some > >> tweaks to > >> > > > the > >> > > > > > > > format > >> > > > > > > > > that should make programmatic access even easier; > >> specifically, > >> > > > I've > >> > > > > > > > > removed the "source" and "sink" wrapper fields and instead > >> > > moved > >> > > > them > >> > > > > > > > into > >> > > > > > > > > a top-level object with a "type" and "offsets" field, > >> just like > >> > > > you > >> > > > > > > > > suggested in point 3 (thanks!). We might also consider > >> changing > >> > > > the > >> > > > > > > field > >> > > > > > > > > names for sink offsets from "topic", "partition", and > >> "offset" > >> > > to > >> > > > > > > "Kafka > >> > > > > > > > > topic", "Kafka partition", and "Kafka offset" > >> respectively, to > >> > > > reduce > >> > > > > > > the > >> > > > > > > > > stuttering effect of having a "partition" field inside a > >> > > > "partition" > >> > > > > > > > field > >> > > > > > > > > and the same with an "offset" field; thoughts? One final > >> > > > point--by > >> > > > > > > > equating > >> > > > > > > > > source and sink offsets, we probably make it easier for > >> users > >> > > to > >> > > > > > > > understand > >> > > > > > > > > exactly what a source offset is; anyone who's familiar > >> with > >> > > > consumer > >> > > > > > > > > offsets can see from the response format that we identify > >> a > >> > > > logical > >> > > > > > > > > partition as a combination of two entities (a topic and a > >> > > > partition > >> > > > > > > > > number); it should make it easier to grok what a source > >> offset > >> > > > is by > >> > > > > > > > seeing > >> > > > > > > > > what the two formats have in common. > >> > > > > > > > > > >> > > > > > > > > 3. Great idea! Done. > >> > > > > > > > > > >> > > > > > > > > 4. Yes, I'm thinking right now that a 409 will be the > >> response > >> > > > status > >> > > > > > > if > >> > > > > > > > a > >> > > > > > > > > rebalance is pending. I'd rather not add this to the KIP > >> as we > >> > > > may > >> > > > > > want > >> > > > > > > > to > >> > > > > > > > > change it at some point and it doesn't seem vital to > >> establish > >> > > > it as > >> > > > > > > part > >> > > > > > > > > of the public contract for the new endpoint right now. > >> Also, > >> > > > small > >> > > > > > > > > point--yes, a 409 is useful to avoid forwarding requests > >> to an > >> > > > > > > incorrect > >> > > > > > > > > leader, but it's also useful to ensure that there aren't > >> any > >> > > > > > unresolved > >> > > > > > > > > writes to the config topic that might cause issues with > >> the > >> > > > request > >> > > > > > > (such > >> > > > > > > > > as deleting the connector). > >> > > > > > > > > > >> > > > > > > > > 5. That's a good point--it may be misleading to call a > >> > > connector > >> > > > > > > STOPPED > >> > > > > > > > > when it has zombie tasks lying around on the cluster. 
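To make the format discussed in point 2 above concrete, a GET /connectors/{connector}/offsets response would presumably look something like the sketches below; the exact spelling of the Kafka-level field names was still being settled at this point, and the source partition/offset fields are purely illustrative since they are defined by the connector:

    {
      "type": "sink",
      "offsets": [
        {
          "partition": {"Kafka topic": "orders", "Kafka partition": 2},
          "offset":    {"Kafka offset": 4096}
        }
      ]
    }

    {
      "type": "source",
      "offsets": [
        {
          "partition": {"filename": "/data/orders.csv"},
          "offset":    {"position": 1042}
        }
      ]
    }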
I > >> don't > >> > > > think > >> > > > > > > it'd > >> > > > > > > > be > >> > > > > > > > > appropriate to do this synchronously while handling > >> requests to > >> > > > the > >> > > > > > PUT > >> > > > > > > > > /connectors/{connector}/stop since we'd want to give all > >> > > > > > > > currently-running > >> > > > > > > > > tasks a chance to gracefully shut down, though. I'm also > >> not > >> > > sure > >> > > > > > that > >> > > > > > > > this > >> > > > > > > > > is a significant problem, either. If the connector is > >> resumed, > >> > > > then > >> > > > > > all > >> > > > > > > > > zombie tasks will be automatically fenced out by their > >> > > > successors on > >> > > > > > > > > startup; if it's deleted, then we'll have wasted effort by > >> > > > performing > >> > > > > > > an > >> > > > > > > > > unnecessary round of fencing. It may be nice to guarantee > >> that > >> > > > source > >> > > > > > > > task > >> > > > > > > > > resources will be deallocated after the connector > >> transitions > >> > > to > >> > > > > > > STOPPED, > >> > > > > > > > > but realistically, it doesn't do much to just fence out > >> their > >> > > > > > > producers, > >> > > > > > > > > since tasks can be blocked on a number of other > >> operations such > >> > > > as > >> > > > > > > > > key/value/header conversion, transformation, and task > >> polling. > >> > > > It may > >> > > > > > > be > >> > > > > > > > a > >> > > > > > > > > little strange if data is produced to Kafka after the > >> connector > >> > > > has > >> > > > > > > > > transitioned to STOPPED, but we can't provide the same > >> > > > guarantees for > >> > > > > > > > sink > >> > > > > > > > > connectors, since their tasks may be stuck on a > >> long-running > >> > > > > > > > SinkTask::put > >> > > > > > > > > that emits data even after the Connect framework has > >> abandoned > >> > > > them > >> > > > > > > after > >> > > > > > > > > exhausting their graceful shutdown timeout. Ultimately, > >> I'd > >> > > > prefer to > >> > > > > > > err > >> > > > > > > > > on the side of consistency and ease of implementation for > >> now, > >> > > > but I > >> > > > > > > may > >> > > > > > > > be > >> > > > > > > > > missing a case where a few extra records from a task > >> that's > >> > > slow > >> > > > to > >> > > > > > > shut > >> > > > > > > > > down may cause serious issues--let me know. > >> > > > > > > > > > >> > > > > > > > > 6. I'm hesitant to propose deprecation of the PAUSED state > >> > > right > >> > > > now > >> > > > > > as > >> > > > > > > > it > >> > > > > > > > > does serve a few purposes. Leaving tasks idling-but-ready > >> makes > >> > > > > > > resuming > >> > > > > > > > > them less disruptive across the cluster, since a rebalance > >> > > isn't > >> > > > > > > > necessary. > >> > > > > > > > > It also reduces latency to resume the connector, > >> especially for > >> > > > ones > >> > > > > > > that > >> > > > > > > > > have to do a lot of state gathering on initialization to, > >> e.g., > >> > > > read > >> > > > > > > > > offsets from an external system. > >> > > > > > > > > > >> > > > > > > > > 7. There should be no risk of mixed tasks after a > >> downgrade, > >> > > > thanks > >> > > > > > to > >> > > > > > > > the > >> > > > > > > > > empty set of task configs that gets published to the > >> config > >> > > > topic. 
> >> > > > > > Both > >> > > > > > > > > upgraded and downgraded workers will render an empty set > >> of > >> > > > tasks for > >> > > > > > > the > >> > > > > > > > > connector, and keep that set of empty tasks until the > >> connector > >> > > > is > >> > > > > > > > resumed. > >> > > > > > > > > Does that address your concerns? > >> > > > > > > > > > >> > > > > > > > > You're also correct that the linked Jira ticket was wrong; > >> > > > thanks for > >> > > > > > > > > pointing that out! Yes, KAFKA-4107 is the intended > >> ticket, and > >> > > > I've > >> > > > > > > > updated > >> > > > > > > > > the link in the KIP accordingly. > >> > > > > > > > > > >> > > > > > > > > Cheers, > >> > > > > > > > > > >> > > > > > > > > Chris > >> > > > > > > > > > >> > > > > > > > > [1] - https://issues.apache.org/jira/browse/KAFKA-4107 > >> > > > > > > > > > >> > > > > > > > > On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya < > >> > > > yash.ma...@gmail.com> > >> > > > > > > > wrote: > >> > > > > > > > > > >> > > > > > > > > > Hi Chris, > >> > > > > > > > > > > >> > > > > > > > > > Thanks a lot for this KIP, I think something like this > >> has > >> > > been > >> > > > > > long > >> > > > > > > > > > overdue for Kafka Connect :) > >> > > > > > > > > > > >> > > > > > > > > > Some thoughts and questions that I had - > >> > > > > > > > > > > >> > > > > > > > > > 1. I'm wondering if you could elaborate a little more > >> on the > >> > > > use > >> > > > > > case > >> > > > > > > > for > >> > > > > > > > > > the `DELETE /connectors/{connector}/offsets` API. I > >> think we > >> > > > can > >> > > > > > all > >> > > > > > > > > agree > >> > > > > > > > > > that a fine grained reset API that allows setting > >> arbitrary > >> > > > offsets > >> > > > > > > for > >> > > > > > > > > > partitions would be quite useful (which you talk about > >> in the > >> > > > > > Future > >> > > > > > > > work > >> > > > > > > > > > section). But for the `DELETE > >> > > /connectors/{connector}/offsets` > >> > > > API > >> > > > > > in > >> > > > > > > > its > >> > > > > > > > > > described form, it looks like it would only serve a > >> seemingly > >> > > > niche > >> > > > > > > use > >> > > > > > > > > > case where users want to avoid renaming connectors - > >> because > >> > > > this > >> > > > > > new > >> > > > > > > > way > >> > > > > > > > > > of resetting offsets actually has more steps (i.e. stop > >> the > >> > > > > > > connector, > >> > > > > > > > > > reset offsets via the API, resume the connector) than > >> simply > >> > > > > > deleting > >> > > > > > > > and > >> > > > > > > > > > re-creating the connector with a different name? > >> > > > > > > > > > > >> > > > > > > > > > 2. The KIP talks about taking care that the response > >> formats > >> > > > > > > > (presumably > >> > > > > > > > > > only talking about the new GET API here) are > >> symmetrical for > >> > > > both > >> > > > > > > > source > >> > > > > > > > > > and sink connectors - is the end goal to have users of > >> Kafka > >> > > > > > Connect > >> > > > > > > > not > >> > > > > > > > > > even be aware that sink connectors use Kafka consumers > >> under > >> > > > the > >> > > > > > hood > >> > > > > > > > > (i.e. > >> > > > > > > > > > have that as purely an implementation detail abstracted > >> away > >> > > > from > >> > > > > > > > users)? 
> >> > > > > > > > > > While I understand the value of uniformity here, the > >> response > >> > > > > > format > >> > > > > > > > for > >> > > > > > > > > > sink connectors currently looks a little odd with the > >> > > > "partition" > >> > > > > > > field > >> > > > > > > > > > having "topic" and "partition" as sub-fields, > >> especially to > >> > > > users > >> > > > > > > > > familiar > >> > > > > > > > > > with Kafka semantics. Thoughts? > >> > > > > > > > > > > >> > > > > > > > > > 3. Another little nitpick on the response format - why > >> do we > >> > > > need > >> > > > > > > > > "source" > >> > > > > > > > > > / "sink" as a field under "offsets"? Users can query the > >> > > > connector > >> > > > > > > type > >> > > > > > > > > via > >> > > > > > > > > > the existing `GET /connectors` API. If it's deemed > >> important > >> > > > to let > >> > > > > > > > users > >> > > > > > > > > > know that the offsets they're seeing correspond to a > >> source / > >> > > > sink > >> > > > > > > > > > connector, maybe we could have a top level field "type" > >> in > >> > > the > >> > > > > > > response > >> > > > > > > > > for > >> > > > > > > > > > the `GET /connectors/{connector}/offsets` API similar > >> to the > >> > > > `GET > >> > > > > > > > > > /connectors` API? > >> > > > > > > > > > > >> > > > > > > > > > 4. For the `DELETE /connectors/{connector}/offsets` > >> API, the > >> > > > KIP > >> > > > > > > > mentions > >> > > > > > > > > > that requests will be rejected if a rebalance is > >> pending - > >> > > > > > presumably > >> > > > > > > > > this > >> > > > > > > > > > is to avoid forwarding requests to a leader which may no > >> > > > longer be > >> > > > > > > the > >> > > > > > > > > > leader after the pending rebalance? In this case, the > >> API > >> > > will > >> > > > > > > return a > >> > > > > > > > > > `409 Conflict` response similar to some of the existing > >> APIs, > >> > > > > > right? > >> > > > > > > > > > > >> > > > > > > > > > 5. Regarding fencing out previously running tasks for a > >> > > > connector, > >> > > > > > do > >> > > > > > > > you > >> > > > > > > > > > think it would make more sense semantically to have this > >> > > > > > implemented > >> > > > > > > in > >> > > > > > > > > the > >> > > > > > > > > > stop endpoint where an empty set of tasks is generated, > >> > > rather > >> > > > than > >> > > > > > > the > >> > > > > > > > > > delete offsets endpoint? This would also give the new > >> > > `STOPPED` > >> > > > > > > state a > >> > > > > > > > > > higher confidence of sorts, with any zombie tasks being > >> > > fenced > >> > > > off > >> > > > > > > from > >> > > > > > > > > > continuing to produce data. > >> > > > > > > > > > > >> > > > > > > > > > 6. Thanks for outlining the issues with the current > >> state of > >> > > > the > >> > > > > > > > `PAUSED` > >> > > > > > > > > > state - I think a lot of users expect it to behave like > >> the > >> > > > > > `STOPPED` > >> > > > > > > > > state > >> > > > > > > > > > you outline in the KIP and are (unpleasantly) surprised > >> when > >> > > it > >> > > > > > > > doesn't. > >> > > > > > > > > > However, this does beg the question of what the > >> usefulness of > >> > > > > > having > >> > > > > > > > two > >> > > > > > > > > > separate `PAUSED` and `STOPPED` states is? 
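As a side note on point 4 above: Connect's REST API reports errors with an "error_code"/"message" JSON body, so a request rejected because of a pending rebalance would presumably come back along these lines (message text illustrative):

    HTTP/1.1 409 Conflict
    {"error_code": 409, "message": "Cannot complete request while a rebalance is pending"}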
Do we want to > >> > > > continue > >> > > > > > > > > > supporting both these states in the future, or do you > >> see the > >> > > > > > > `STOPPED` > >> > > > > > > > > > state eventually causing the existing `PAUSED` state to > >> be > >> > > > > > > deprecated? > >> > > > > > > > > > > >> > > > > > > > > > 7. I think the idea outlined in the KIP for handling a > >> new > >> > > > state > >> > > > > > > during > >> > > > > > > > > > cluster downgrades / rolling upgrades is quite clever, > >> but do > >> > > > you > >> > > > > > > think > >> > > > > > > > > > there could be any issues with having a mix of "paused" > >> and > >> > > > > > "stopped" > >> > > > > > > > > tasks > >> > > > > > > > > > for the same connector across workers in a cluster? At > >> the > >> > > very > >> > > > > > > least, > >> > > > > > > > I > >> > > > > > > > > > think it would be fairly confusing to most users. I'm > >> > > > wondering if > >> > > > > > > this > >> > > > > > > > > can > >> > > > > > > > > > be avoided by stating clearly in the KIP that the new > >> `PUT > >> > > > > > > > > > /connectors/{connector}/stop` > >> > > > > > > > > > can only be used on a cluster that is fully upgraded to > >> an AK > >> > > > > > version > >> > > > > > > > > newer > >> > > > > > > > > > than the one which ends up containing changes from this > >> KIP > >> > > and > >> > > > > > that > >> > > > > > > > if a > >> > > > > > > > > > cluster needs to be downgraded to an older version, the > >> user > >> > > > should > >> > > > > > > > > ensure > >> > > > > > > > > > that none of the connectors on the cluster are in a > >> stopped > >> > > > state? > >> > > > > > > With > >> > > > > > > > > the > >> > > > > > > > > > existing implementation, it looks like an > >> unknown/invalid > >> > > > target > >> > > > > > > state > >> > > > > > > > > > record is basically just discarded (with an error > >> message > >> > > > logged), > >> > > > > > so > >> > > > > > > > it > >> > > > > > > > > > doesn't seem to be a disastrous failure scenario that > >> can > >> > > bring > >> > > > > > down > >> > > > > > > a > >> > > > > > > > > > worker. > >> > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > Thanks, > >> > > > > > > > > > Yash > >> > > > > > > > > > > >> > > > > > > > > > On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton > >> > > > > > > <chr...@aiven.io.invalid > >> > > > > > > > > > >> > > > > > > > > > wrote: > >> > > > > > > > > > > >> > > > > > > > > > > Hi Ashwin, > >> > > > > > > > > > > > >> > > > > > > > > > > Thanks for your thoughts. Regarding your questions: > >> > > > > > > > > > > > >> > > > > > > > > > > 1. The response would show the offsets that are > >> visible to > >> > > > the > >> > > > > > > source > >> > > > > > > > > > > connector, so it would combine the contents of the two > >> > > > topics, > >> > > > > > > giving > >> > > > > > > > > > > priority to offsets present in the connector-specific > >> > > topic. > >> > > > I'm > >> > > > > > > > > > imagining > >> > > > > > > > > > > a follow-up question that some people may have in > >> response > >> > > to > >> > > > > > that > >> > > > > > > is > >> > > > > > > > > > > whether we'd want to provide insight into the > >> contents of a > >> > > > > > single > >> > > > > > > > > topic > >> > > > > > > > > > at > >> > > > > > > > > > > a time. 
It may be useful to be able to see this > >> information > >> > > > in > >> > > > > > > order > >> > > > > > > > to > >> > > > > > > > > > > debug connector issues or verify that it's safe to > >> stop > >> > > > using a > >> > > > > > > > > > > connector-specific offsets topic (either explicitly, > >> or > >> > > > > > implicitly > >> > > > > > > > via > >> > > > > > > > > > > cluster downgrade). What do you think about adding a > >> URL > >> > > > query > >> > > > > > > > > parameter > >> > > > > > > > > > > that allows users to dictate which view of the > >> connector's > >> > > > > > offsets > >> > > > > > > > they > >> > > > > > > > > > are > >> > > > > > > > > > > given in the REST response, with options for the > >> worker's > >> > > > global > >> > > > > > > > topic, > >> > > > > > > > > > the > >> > > > > > > > > > > connector-specific topic, and the combined view of > >> them > >> > > that > >> > > > the > >> > > > > > > > > > connector > >> > > > > > > > > > > and its tasks see (which would be the default)? This > >> may be > >> > > > too > >> > > > > > > much > >> > > > > > > > > for > >> > > > > > > > > > V1 > >> > > > > > > > > > > but it feels like it's at least worth exploring a bit. > >> > > > > > > > > > > > >> > > > > > > > > > > 2. There is no option for this at the moment. Reset > >> > > > semantics are > >> > > > > > > > > > extremely > >> > > > > > > > > > > coarse-grained; for source connectors, we delete all > >> source > >> > > > > > > offsets, > >> > > > > > > > > and > >> > > > > > > > > > > for sink connectors, we delete the entire consumer > >> group. > >> > > I'm > >> > > > > > > hoping > >> > > > > > > > > this > >> > > > > > > > > > > will be enough for V1 and that, if there's sufficient > >> > > demand > >> > > > for > >> > > > > > > it, > >> > > > > > > > we > >> > > > > > > > > > can > >> > > > > > > > > > > introduce a richer API for resetting or even modifying > >> > > > connector > >> > > > > > > > > offsets > >> > > > > > > > > > in > >> > > > > > > > > > > a follow-up KIP. > >> > > > > > > > > > > > >> > > > > > > > > > > 3. Good eye :) I think it's fine to keep the existing > >> > > > behavior > >> > > > > > for > >> > > > > > > > the > >> > > > > > > > > > > PAUSED state with the Connector instance, since the > >> primary > >> > > > > > purpose > >> > > > > > > > of > >> > > > > > > > > > the > >> > > > > > > > > > > Connector is to generate task configs and monitor the > >> > > > external > >> > > > > > > system > >> > > > > > > > > for > >> > > > > > > > > > > changes. If there's no chance for tasks to be running > >> > > > anyways, I > >> > > > > > > > don't > >> > > > > > > > > > see > >> > > > > > > > > > > much value in allowing paused connectors to generate > >> new > >> > > task > >> > > > > > > > configs, > >> > > > > > > > > > > especially since each time that happens a rebalance is > >> > > > triggered > >> > > > > > > and > >> > > > > > > > > > > there's a non-zero cost to that. What do you think? > >> > > > > > > > > > > > >> > > > > > > > > > > Cheers, > >> > > > > > > > > > > > >> > > > > > > > > > > Chris > >> > > > > > > > > > > > >> > > > > > > > > > > On Fri, Oct 14, 2022 at 12:59 AM Ashwin > >> > > > > > > <apan...@confluent.io.invalid > >> > > > > > > > > > >> > > > > > > > > > > wrote: > >> > > > > > > > > > > > >> > > > > > > > > > > > Thanks for KIP Chris - I think this is a useful > >> feature. 
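The single-topic view floated in point 1 of the reply above could be exposed through a URL query parameter; the parameter and value names below are made up purely for illustration, since no concrete naming was settled on:

    GET /connectors/my-connector/offsets                    (combined view; the proposed default)
    GET /connectors/my-connector/offsets?store=worker       (worker's global offsets topic only)
    GET /connectors/my-connector/offsets?store=connector    (connector-specific offsets topic only)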
> >> > > > > > > > > > > > > >> > > > > > > > > > > > Can you please elaborate on the following in the > >> KIP - > >> > > > > > > > > > > > > >> > > > > > > > > > > > 1. How would the response of GET > >> > > > > > /connectors/{connector}/offsets > >> > > > > > > > look > >> > > > > > > > > > > like > >> > > > > > > > > > > > if the worker has both global and connector specific > >> > > > offsets > >> > > > > > > topic > >> > > > > > > > ? > >> > > > > > > > > > > > > >> > > > > > > > > > > > 2. How can we pass the reset options like shift-by , > >> > > > > > to-date-time > >> > > > > > > > > etc. > >> > > > > > > > > > > > using a REST API like DELETE > >> > > > /connectors/{connector}/offsets ? > >> > > > > > > > > > > > > >> > > > > > > > > > > > 3. Today PAUSE operation on a connector invokes its > >> stop > >> > > > > > method - > >> > > > > > > > > will > >> > > > > > > > > > > > there be a change here to reduce confusion with the > >> new > >> > > > > > proposed > >> > > > > > > > > > STOPPED > >> > > > > > > > > > > > state ? > >> > > > > > > > > > > > > >> > > > > > > > > > > > Thanks, > >> > > > > > > > > > > > Ashwin > >> > > > > > > > > > > > > >> > > > > > > > > > > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton > >> > > > > > > > > <chr...@aiven.io.invalid > >> > > > > > > > > > > > >> > > > > > > > > > > > wrote: > >> > > > > > > > > > > > > >> > > > > > > > > > > > > Hi all, > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > I noticed a fairly large gap in the first version > >> of > >> > > > this KIP > >> > > > > > > > that > >> > > > > > > > > I > >> > > > > > > > > > > > > published last Friday, which has to do with > >> > > accommodating > >> > > > > > > > > connectors > >> > > > > > > > > > > > > that target different Kafka clusters than the one > >> that > >> > > > the > >> > > > > > > Kafka > >> > > > > > > > > > > Connect > >> > > > > > > > > > > > > cluster uses for its internal topics and source > >> > > > connectors > >> > > > > > with > >> > > > > > > > > > > dedicated > >> > > > > > > > > > > > > offsets topics. I've since updated the KIP to > >> address > >> > > > this > >> > > > > > gap, > >> > > > > > > > > which > >> > > > > > > > > > > has > >> > > > > > > > > > > > > substantially altered the design. Wanted to give a > >> > > > heads-up > >> > > > > > to > >> > > > > > > > > anyone > >> > > > > > > > > > > > > that's already started reviewing. 
> >> > > > > > > > > > > > > > >> > > > > > > > > > > > > Cheers, > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > Chris > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton < > >> > > > > > chr...@aiven.io> > >> > > > > > > > > > wrote: > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > > Hi all, > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > I'd like to begin discussion on a KIP to add > >> offsets > >> > > > > > support > >> > > > > > > to > >> > > > > > > > > the > >> > > > > > > > > > > > Kafka > >> > > > > > > > > > > > > > Connect REST API: > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > >> > > > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Cheers, > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > Chris > >> > > > > > > > > > > > > > > >> > > > > > > > > > > > > > >> > > > > > > > > > > > > >> > > > > > > > > > > > >> > > > > > > > > > > >> > > > > > > > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > >> > > > >> > >