Hey All,

Please let me know how the KIP looks now. Is it at a stage where I can start the voting phase? I am of course still open to feedback/suggestions, but I am planning to start the vote for it.
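For quick reference while voting, the proposal boils down to a single optional method on SourceTask. Below is a rough sketch of the shape discussed in this thread (the KIP carries the authoritative signature and javadoc):

    // Sketch only -- see KIP-910 for the authoritative definition.
    // The framework invokes this with the offsets that are about to be
    // committed; a task may return just the entries it wants to add or
    // change, or null to leave the to-be-committed offsets untouched.
    public Map<Map<String, Object>, Map<String, Object>> updateOffsets(
            Map<Map<String, Object>, Map<String, Object>> offsets) {
        return null; // default: no updates
    }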
Thanks!
Sagar.

On Tue, Jul 11, 2023 at 10:00 PM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi Yash/Chris,
>
> Thanks for the feedback! I have updated the KIP with the suggestions
> provided. I will also update the PR with the suggestions.
>
> Also, I was hoping that this could make it to the 3.6 release, given that
> it would benefit source connectors which have some of the problems listed
> in the Motivation section.
>
> Responses inline:
>
> Yash:
>
>> 1) In the proposed changes section where you talk about modifying the
>> offsets, could you please clarify that tasks shouldn't modify the offsets
>> map that is passed as an argument? Currently, the distinction between the
>> offsets map passed as an argument and the offsets map that is returned is
>> not very clear in numerous places.
>
> Added
>
>> 2) The default return value of Optional.empty() seems to be fairly
>> non-intuitive considering that the return value is supposed to be the
>> offsets that are to be committed. Can we consider simply returning the
>> offsets argument itself by default instead?
>
> Chris is suggesting returning null for the default case, and I am
> planning to make null the default return value. If the returned map is
> null, there won't be any further processing; otherwise, we will continue
> with the existing logic.
>
>> 3) The KIP states that "It is also possible that a task might choose to
>> send a tombstone record as an offset. This is not recommended and to
>> prevent connectors shooting themselves in the foot due to this" - could you
>> please clarify why this is not recommended / supported?
>
> I have added that a better way of doing that would be via KIP-875. Also,
> I didn't want to include any mechanisms for users to meddle with the
> offsets topic. Allowing tombstone records via this method would be akin to
> publishing tombstone records directly to the offsets topic, which is
> generally not recommended.
>
>> 4) The KIP states that "If a task returns an Optional of a null object or
>> an Optional of an empty map, even for such cases the behaviour would would
>> be disabled." - since this is an optional API that source task
>> implementations don't necessarily need to implement, I don't think I fully
>> follow why the return type of the proposed "updateOffsets" method is an
>> Optional? Can we not simply use the Map as the return type instead?
>
> Yeah, I updated the return type to be a Map.
>
>> 5) The KIP states that "The offsets passed to the updateOffsets method
>> would be the offset from the latest source record amongst all source
>> records per partition. This way, if the source offset for a given source
>> partition is updated, that offset is the one that gets committed for the
>> source partition." - we should clarify that the "latest" offset refers to
>> the offsets that are about to be committed, and not the latest offsets
>> returned from SourceTask::poll so far (see related discussion in
>> https://issues.apache.org/jira/browse/KAFKA-15091 and
>> https://issues.apache.org/jira/browse/KAFKA-5716).
>
> Done
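> To make the intended contract from points 1), 2), and 4) concrete, here
> is a rough sketch of how I picture a task using the method (hypothetical
> connector code, not taken from the KIP):
>
>     @Override
>     public Map<Map<String, Object>, Map<String, Object>> updateOffsets(
>             Map<Map<String, Object>, Map<String, Object>> offsets) {
>         // The offsets argument must not be mutated; build a fresh map
>         // containing only the entries to be added or changed.
>         if (!offsetNeedsUpdate()) // hypothetical helper
>             return null;          // null means: commit the offsets as-is
>         return Collections.singletonMap(sourcePartition(), newOffset());
>     }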
>> 6) We haven't used the terminology of "Atleast Once Semantics" elsewhere in
>> Connect since the framework itself does not (and cannot) make any
>> guarantees on the delivery semantics. Depending on the source connector and
>> the source system, both at-least once and at-most once semantics (for
>> example - a source system where reads are destructive) are possible. We
>> should avoid introducing this terminology in the KIP and instead refer to
>> this scenario as exactly-once support being disabled.
>
> Done
>
>> 7) Similar to the above point, we should remove the use of the term
>> "Exactly Once Semantics" and instead refer to exactly-once support being
>> enabled since the framework can't guarantee exactly-once semantics for all
>> possible source connectors (for example - a message queue source connector
>> where offsets are essentially managed in the source system via an ack
>> mechanism).
>
> Done
>
>> 8) In a previous attempt to fix this gap in functionality, a significant
>> concern was raised on offsets ordering guarantees when we retry sending a
>> batch of records (ref -
>> https://github.com/apache/kafka/pull/5553/files#r213329307). It doesn't
>> look like this KIP addresses that concern either? In the case where
>> exactly-once support is disabled - if we update the committableOffsets with
>> the offsets provided by the task through the new updateOffsets method,
>> these offsets could be committed before older "regular" offsets are
>> committed due to producer retries which could then lead to an inconsistency
>> if the send operation eventually succeeds.
>
> Thanks for bringing this up. I went through the comment shared above. If
> you look at the implementation in the PR, in the EOS-disabled case,
> updateOffsets is invoked only when toSend is null. Refer here:
> https://github.com/apache/kafka/pull/13899/files#diff-a3107b56382b6ec950dc9d19d21f188c21d4bf41853e0505d60d3bf87adab6a9R324-R330
>
> This means that we invoke updateOffsets only when:
> 1) the last poll invocation didn't return any records, or
> 2) all the records returned by the previous poll invocation were
> processed successfully, or
> 3) it is the first iteration of the task, since toSend is null initially.
>
> IIUC the concern expressed in the link you shared, the solution proposed
> there is essentially what this amounts to:
>
>> What if your new block of code were only performed if sendRecords()
>> succeeded
>
> Even for that suggestion there were concerns expressed, but those don't
> seem to be related to offsets ordering guarantees. WDYT?
>
>> 9) The KIP states that when exactly-once support is enabled, the new
>> SourceTask::updateOffsets method will be invoked only when an offset flush
>> is attempted. If the connector is configured to use a connector specified
>> transaction boundary rather than a poll or interval based boundary, isn't
>> it possible that we don't call SourceTask::updateOffsets until there are
>> actual records that are also being returned through poll (which would
>> defeat the primary motivation of the KIP)? Or are we making the assumption
>> that the connector defined transaction boundary should handle this case
>> appropriately if needed (i.e. source tasks should occasionally request for
>> a transaction commit via their transaction context if they want offsets to
>> be committed without producing records)? If so, I think we should
>> explicitly call that out in the KIP.
>
> That's a great point. I didn't consider this case. I have updated the KIP.
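> Also, to illustrate why the ordering concern from 8) doesn't arise, here
> is a heavily simplified sketch of the relevant loop from the PR (the
> helper names below are condensed/hypothetical; the linked PR has the real
> code):
>
>     while (!stopping) {
>         if (toSend == null) {
>             // Safe point: every record from the previous poll has been
>             // dispatched, so offsets returned by updateOffsets cannot
>             // overtake the offsets of in-flight records.
>             Map<Map<String, Object>, Map<String, Object>> updated =
>                     task.updateOffsets(committableOffsets());
>             if (updated != null)
>                 mergeIntoCommittableOffsets(updated);
>             toSend = task.poll();
>         }
>         if (toSend != null && sendRecords())
>             toSend = null;
>     }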
>> 10) The Javadoc for SourceTask::updateOffsets in the section on public
>> interfaces also has the same issue with the definition of latest offsets
>> that I've mentioned above (latest offsets from poll versus latest offsets
>> that are about to be committed).
>
> Done
>
>> 11) The Javadoc for SourceTask::updateOffsets also introduces the same
>> confusion w.r.t updating offsets that I've mentioned above (modifying the
>> offsets map argument versus returning a modified copy of the offsets map).
>
> I have modified the verbiage, and even the meaning of the return type, as
> suggested by Chris.
>
>> 12) In the section on compatibility, we should explicitly mention that
>> connectors which implement the new method will still be compatible with
>> older Connect runtimes where the method will simply not be invoked.
>
> Done
>
> Chris:
>
>> 1. (Nit) Can we move the "Public Interfaces" section before the "Proposed
>> Changes" section? It's nice to have a summary of the user/developer-facing
>> changes first since that answers many of the questions that I had while
>> reading the "Proposed Changes" section. I'd bet that this is also why we
>> use that ordering in the KIP template.
>
> Done
>
>> 2. Why are we invoking SourceTask::updateOffsets so frequently when
>> exactly-once support is disabled? Wouldn't it be simpler both for our
>> implementation and for connector developers if we only invoked it directly
>> before committing offsets, instead of potentially several times between
>> offset commits, especially since that would also mirror the behavior with
>> exactly-once support enabled?
>
> Hmm, the idea was to keep the changes bounded within the SourceTask loop.
> Since the EOS-disabled case uses a separate thread to commit offsets, I
> thought it was easier to have updateOffsets invoked in the same loop and
> have it update the committableOffsets. The committer thread will keep
> doing what it does today. I felt this was easier to reason about. WDYT?
>
>> 3. Building off of point 2, we wouldn't need to specify any more detail
>> than that "SourceTask::updateOffsets will be invoked directly before
>> committing offsets, with the to-be-committed offsets". There would be no
>> need to distinguish between when exactly-once support is enabled or
>> disabled.
>
> Yeah, I have added that updateOffsets will be invoked before committing
> offsets, with the to-be-committed offsets. I have still kept the EOS
> enabled/disabled distinction intact because there are differences I wanted
> to highlight, like honouring transaction boundaries, and the edge case
> with the connector transaction boundary mode that Yash brought up.
>
>> 4. Some general stylistic feedback: we shouldn't mention the names of
>> internal classes or methods in KIPs. KIPS are for discussing high-level
>> design proposals. Internal names and APIS may change over time, and are not
>> very helpful to readers who are not already familiar with the code base.
>> Instead, we should describe changes in behavior, not code.
>
> Yeah, I generally avoid delving into such details, but in this case I
> felt I needed to explain a bit more about why I am proposing what I am
> proposing. I have made the edits.
>
>> 5. Why return a complete map of to-be-committed offsets instead of a map of
>> just the offsets that the connector wants to change? This seems especially
>> intuitive since we automatically re-insert source partitions that have been
>> removed by the connector.
>
> Makes sense. I updated the KIP accordingly.
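> As an illustration of 5), a task would now only return the delta. A
> hypothetical file-based connector might look like this (all names below
> are made up for the example):
>
>     @Override
>     public Map<Map<String, Object>, Map<String, Object>> updateOffsets(
>             Map<Map<String, Object>, Map<String, Object>> offsets) {
>         Map<String, Object> partition =
>                 Collections.singletonMap("filename", currentFile);
>         if (offsets.containsKey(partition))
>             return null; // already being committed; nothing to change
>         // Return only the new entry; the framework merges it into the
>         // full to-be-committed map and re-inserts everything else.
>         return Collections.singletonMap(partition,
>                 Collections.singletonMap("position", eofPosition));
>     }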
>> 6. I don't think we don't need to return an Optional from
>> SourceTask::updateOffsets. Developers can return null instead of
>> Optional.empty(), and since the framework will have to handle null return
>> values either way, this would reduce the number of cases for us to handle
>> from three (Optional.of(...), Optional.empty(), null) to two (null,
>> non-null).
>
> I see. I wanted to avoid explicit null checks, but then I realised
> Connect does have explicit null checks elsewhere. Edited.
>
>> 7. Why disallow tombstone records? If an upstream resource disappears, then
>> wouldn't a task want to emit a tombstone record without having to also emit
>> an accompanying source record? This could help prevent an
>> infinitely-growing offsets topic, although with KIP-875 coming out in the
>> next release, perhaps we can leave this out for now and let Connect users
>> and cluster administrators do this work manually instead of letting
>> connector developers automate it.
>
> Even before I considered KIP-875's effects, my thought was to not meddle
> too much with the inner workings of the offsets topic. I think even today
> users can produce an offset record to the offsets topic to drop an
> unwanted partition, but that should be used as a last resort. I didn't
> want to introduce any such mechanisms via this proposal. And with KIP-875
> coming in, it makes all the more sense to not do it here and instead have
> the offsets deleted in a more standardised way. The last part about
> KIP-875 is what I have mentioned in the KIP.
>
>> 8. Is the information on multiple offsets topics for exactly-once
>> connectors relevant to this KIP? If not, we should remove it.
>
> Removed.
>
>> 9. It seems like most of the use cases that motivate this KIP only require
>> being able to add a new source partition/source offset pair to the
>> to-be-committed offsets. Do we need to allow connector developers to modify
>> source offsets for already-present source partitions at all? If we reduce
>> the surface of the API, then the worst case is still just that the offsets
>> we commit are at most one commit out-of-date.
>
> It could be useful in a scenario where the offset of a partition doesn't
> update for some period of time. In such cases, the connector can do some
> kind of state tracking and update the offsets after the time period
> elapses. I had mentioned an example of this scenario in an earlier e-mail
> (see the sketch at the end of this mail):
>
>> There's also a case at times with CDC source connectors which are REST Api
>> / Web Service based (Zendesk Source Connector for example). These
>> connectors typically use timestamps from the responses as offsets. If
>> there's a long period of inactivity wherein the API invocations don't
>> return any data, then the offsets won't move and the connector would keep
>> using the same timestamp that it received from the last non-empty response.
>> If this period of inactivity keeps growing, and the API imposes any limits
>> on how far back we can go in terms of window start, then this could
>> potentially be a problem. In this case even though the connector was caught
>> up with all the responses, it may need to snapshot again. In this case
>> updating offsets can easily help since all the connector needs to do is to
>> move the timestamp which would move the offset inherently.
>
>> 10. (Nit) The "Motivation" section states that "offsets are written
>> periodically by the connect framework to an offsets topic". This is only
>> true in distributed mode; in standalone mode, we write offsets to a local
>> file.
>
> Ack.
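> To close, here is a sketch of the web-service CDC case from point 9
> using the proposed API (hypothetical task code; all field names are
> illustrative):
>
>     @Override
>     public Map<Map<String, Object>, Map<String, Object>> updateOffsets(
>             Map<Map<String, Object>, Map<String, Object>> offsets) {
>         // Advance the timestamp offset during long idle periods so the
>         // next query window never falls outside the API's lookback limit.
>         if (caughtUp && clock.millis() - lastResponseTimestamp > maxIdleMs)
>             return Collections.singletonMap(sourcePartition,
>                     Collections.singletonMap("timestamp", clock.millis()));
>         return null; // offsets are moving on their own; nothing to do
>     }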
> > On Wed, Jul 5, 2023 at 8:47 PM Chris Egerton <chr...@aiven.io.invalid> > wrote: > >> Hi Sagar, >> >> Thanks for updating the KIP! The latest draft seems simpler and more >> focused, which I think is a win for users and developers alike. Here are >> my >> thoughts on the current draft: >> >> 1. (Nit) Can we move the "Public Interfaces" section before the "Proposed >> Changes" section? It's nice to have a summary of the user/developer-facing >> changes first since that answers many of the questions that I had while >> reading the "Proposed Changes" section. I'd bet that this is also why we >> use that ordering in the KIP template. >> >> 2. Why are we invoking SourceTask::updateOffsets so frequently when >> exactly-once support is disabled? Wouldn't it be simpler both for our >> implementation and for connector developers if we only invoked it directly >> before committing offsets, instead of potentially several times between >> offset commits, especially since that would also mirror the behavior with >> exactly-once support enabled? >> >> 3. Building off of point 2, we wouldn't need to specify any more detail >> than that "SourceTask::updateOffsets will be invoked directly before >> committing offsets, with the to-be-committed offsets". There would be no >> need to distinguish between when exactly-once support is enabled or >> disabled. >> >> 4. Some general stylistic feedback: we shouldn't mention the names of >> internal classes or methods in KIPs. KIPS are for discussing high-level >> design proposals. Internal names and APIS may change over time, and are >> not >> very helpful to readers who are not already familiar with the code base. >> Instead, we should describe changes in behavior, not code. >> >> 5. Why return a complete map of to-be-committed offsets instead of a map >> of >> just the offsets that the connector wants to change? This seems especially >> intuitive since we automatically re-insert source partitions that have >> been >> removed by the connector. >> >> 6. I don't think we don't need to return an Optional from >> SourceTask::updateOffsets. Developers can return null instead of >> Optional.empty(), and since the framework will have to handle null return >> values either way, this would reduce the number of cases for us to handle >> from three (Optional.of(...), Optional.empty(), null) to two (null, >> non-null). >> >> 7. Why disallow tombstone records? If an upstream resource disappears, >> then >> wouldn't a task want to emit a tombstone record without having to also >> emit >> an accompanying source record? This could help prevent an >> infinitely-growing offsets topic, although with KIP-875 coming out in the >> next release, perhaps we can leave this out for now and let Connect users >> and cluster administrators do this work manually instead of letting >> connector developers automate it. >> >> 8. Is the information on multiple offsets topics for exactly-once >> connectors relevant to this KIP? If not, we should remove it. >> >> 9. It seems like most of the use cases that motivate this KIP only require >> being able to add a new source partition/source offset pair to the >> to-be-committed offsets. Do we need to allow connector developers to >> modify >> source offsets for already-present source partitions at all? If we reduce >> the surface of the API, then the worst case is still just that the offsets >> we commit are at most one commit out-of-date. >> >> 10. 
(Nit) The "Motivation" section states that "offsets are written >> periodically by the connect framework to an offsets topic". This is only >> true in distributed mode; in standalone mode, we write offsets to a local >> file. >> >> Cheers, >> >> Chris >> >> On Tue, Jul 4, 2023 at 8:42 AM Yash Mayya <yash.ma...@gmail.com> wrote: >> >> > Hi Sagar, >> > >> > Thanks for your continued work on this KIP! Here are my thoughts on your >> > updated proposal: >> > >> > 1) In the proposed changes section where you talk about modifying the >> > offsets, could you please clarify that tasks shouldn't modify the >> offsets >> > map that is passed as an argument? Currently, the distinction between >> the >> > offsets map passed as an argument and the offsets map that is returned >> is >> > not very clear in numerous places. >> > >> > 2) The default return value of Optional.empty() seems to be fairly >> > non-intuitive considering that the return value is supposed to be the >> > offsets that are to be committed. Can we consider simply returning the >> > offsets argument itself by default instead? >> > >> > 3) The KIP states that "It is also possible that a task might choose to >> > send a tombstone record as an offset. This is not recommended and to >> > prevent connectors shooting themselves in the foot due to this" - could >> you >> > please clarify why this is not recommended / supported? >> > >> > 4) The KIP states that "If a task returns an Optional of a null object >> or >> > an Optional of an empty map, even for such cases the behaviour would >> would >> > be disabled." - since this is an optional API that source task >> > implementations don't necessarily need to implement, I don't think I >> fully >> > follow why the return type of the proposed "updateOffsets" method is an >> > Optional? Can we not simply use the Map as the return type instead? >> > >> > 5) The KIP states that "The offsets passed to the updateOffsets method >> > would be the offset from the latest source record amongst all source >> > records per partition. This way, if the source offset for a given source >> > partition is updated, that offset is the one that gets committed for the >> > source partition." - we should clarify that the "latest" offset refers >> to >> > the offsets that are about to be committed, and not the latest offsets >> > returned from SourceTask::poll so far (see related discussion in >> > https://issues.apache.org/jira/browse/KAFKA-15091 and >> > https://issues.apache.org/jira/browse/KAFKA-5716). >> > >> > 6) We haven't used the terminology of "Atleast Once Semantics" >> elsewhere in >> > Connect since the framework itself does not (and cannot) make any >> > guarantees on the delivery semantics. Depending on the source connector >> and >> > the source system, both at-least once and at-most once semantics (for >> > example - a source system where reads are destructive) are possible. We >> > should avoid introducing this terminology in the KIP and instead refer >> to >> > this scenario as exactly-once support being disabled. >> > >> > 7) Similar to the above point, we should remove the use of the term >> > "Exactly Once Semantics" and instead refer to exactly-once support being >> > enabled since the framework can't guarantee exactly-once semantics for >> all >> > possible source connectors (for example - a message queue source >> connector >> > where offsets are essentially managed in the source system via an ack >> > mechanism). 
>> > >> > 8) In a previous attempt to fix this gap in functionality, a significant >> > concern was raised on offsets ordering guarantees when we retry sending >> a >> > batch of records (ref - >> > https://github.com/apache/kafka/pull/5553/files#r213329307). It doesn't >> > look like this KIP addresses that concern either? In the case where >> > exactly-once support is disabled - if we update the committableOffsets >> with >> > the offsets provided by the task through the new updateOffsets method, >> > these offsets could be committed before older "regular" offsets are >> > committed due to producer retries which could then lead to an >> inconsistency >> > if the send operation eventually succeeds. >> > >> > 9) The KIP states that when exactly-once support is enabled, the new >> > SourceTask::updateOffsets method will be invoked only when an offset >> flush >> > is attempted. If the connector is configured to use a connector >> specified >> > transaction boundary rather than a poll or interval based boundary, >> isn't >> > it possible that we don't call SourceTask::updateOffsets until there are >> > actual records that are also being returned through poll (which would >> > defeat the primary motivation of the KIP)? Or are we making the >> assumption >> > that the connector defined transaction boundary should handle this case >> > appropriately if needed (i.e. source tasks should occasionally request >> for >> > a transaction commit via their transaction context if they want offsets >> to >> > be committed without producing records)? If so, I think we should >> > explicitly call that out in the KIP. >> > >> > 10) The Javadoc for SourceTask::updateOffsets in the section on public >> > interfaces also has the same issue with the definition of latest offsets >> > that I've mentioned above (latest offsets from poll versus latest >> offsets >> > that are about to be committed). >> > >> > 11) The Javadoc for SourceTask::updateOffsets also introduces the same >> > confusion w.r.t updating offsets that I've mentioned above (modifying >> the >> > offsets map argument versus returning a modified copy of the offsets >> map). >> > >> > 12) In the section on compatibility, we should explicitly mention that >> > connectors which implement the new method will still be compatible with >> > older Connect runtimes where the method will simply not be invoked. >> > >> > >> > Thanks, >> > Yash >> > >> > On Wed, Jun 21, 2023 at 10:25 PM Sagar <sagarmeansoc...@gmail.com> >> wrote: >> > >> > > Hi All, >> > > >> > > I have created this PR: https://github.com/apache/kafka/pull/13899 >> which >> > > implements the approach outlined in the latest version of the KIP. I >> > > thought I could use this to validate the approach based on my >> > understanding >> > > while the KIP itself gets reviewed. I can always change the >> > implementation >> > > once we move to a final decision on the KIP. >> > > >> > > Thanks! >> > > Sagar. >> > > >> > > >> > > On Wed, Jun 14, 2023 at 4:59 PM Sagar <sagarmeansoc...@gmail.com> >> wrote: >> > > >> > > > Hey All, >> > > > >> > > > Bumping this discussion thread again to see how the modified KIP >> looks >> > > > like. >> > > > >> > > > Thanks! >> > > > Sagar. >> > > > >> > > > On Mon, May 29, 2023 at 8:12 PM Sagar <sagarmeansoc...@gmail.com> >> > wrote: >> > > > >> > > >> Hi, >> > > >> >> > > >> Bumping this thread again for further reviews. >> > > >> >> > > >> Thanks! >> > > >> Sagar. 
>> > > >> >> > > >> On Fri, May 12, 2023 at 3:38 PM Sagar <sagarmeansoc...@gmail.com> >> > > wrote: >> > > >> >> > > >>> Hi All, >> > > >>> >> > > >>> Thanks for the comments/reviews. I have updated the KIP >> > > >>> >> > > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records >> > > >>> with a newer approach which shelves the need for an explicit >> topic. >> > > >>> >> > > >>> Please review again and let me know what you think. >> > > >>> >> > > >>> Thanks! >> > > >>> Sagar. >> > > >>> >> > > >>> >> > > >>> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya <yash.ma...@gmail.com> >> > > wrote: >> > > >>> >> > > >>>> Hi Sagar, >> > > >>>> >> > > >>>> Thanks for the KIP! I have a few questions and comments: >> > > >>>> >> > > >>>> 1) I agree with Chris' point about the separation of a connector >> > > >>>> heartbeat >> > > >>>> mechanism and allowing source connectors to generate offsets >> without >> > > >>>> producing data. What is the purpose of the heartbeat topic here >> and >> > > are >> > > >>>> there any concrete use cases for downstream consumers on this >> topic? >> > > Why >> > > >>>> can't we instead simply introduce a mechanism to retrieve a list >> of >> > > >>>> source >> > > >>>> partition / source offset pairs from the source tasks? >> > > >>>> >> > > >>>> 2) With the currently described mechanism, the new >> > > >>>> "SourceTask::produceHeartbeatRecords" method returns a >> > > >>>> "List<SourceRecord>" >> > > >>>> - what happens with the topic in each of these source records? >> Chris >> > > >>>> pointed this out above, but it doesn't seem to have been >> addressed? >> > > The >> > > >>>> "SourceRecord" class also has a bunch of other fields which will >> be >> > > >>>> irrelevant here (partition, key / value schema, key / value data, >> > > >>>> timestamp, headers). In fact, it seems like only the source >> > partition >> > > >>>> and >> > > >>>> source offset are relevant here, so we should either introduce a >> new >> > > >>>> abstraction or simply use a data structure like a mapping from >> > source >> > > >>>> partitions to source offsets (adds to the above point)? >> > > >>>> >> > > >>>> 3) I'm not sure I fully follow why the heartbeat timer / >> interval is >> > > >>>> needed? What are the downsides of >> > > >>>> calling "SourceTask::produceHeartbeatRecords" in every execution >> > loop >> > > >>>> (similar to the existing "SourceTask::poll" method)? Is this >> only to >> > > >>>> prevent the generation of a lot of offset records? Since >> Connect's >> > > >>>> offsets >> > > >>>> topics are log compacted (and source partitions are used as keys >> for >> > > >>>> each >> > > >>>> source offset), I'm not sure if such concerns are valid and such >> a >> > > >>>> heartbeat timer / interval mechanism is required? >> > > >>>> >> > > >>>> 4) The first couple of rejected alternatives state that the use >> of a >> > > >>>> null >> > > >>>> topic / key / value are preferably avoided - but the current >> > proposal >> > > >>>> would >> > > >>>> also likely require connectors to use such workarounds (null >> topic >> > > when >> > > >>>> the >> > > >>>> heartbeat topic is configured at a worker level and always for >> the >> > > key / >> > > >>>> value)? 
>> > > >>>> >> > > >>>> 5) The third rejected alternative talks about subclassing the >> > > >>>> "SourceRecord" class - this presumably means allowing connectors >> to >> > > pass >> > > >>>> special offset only records via the existing poll mechanism? Why >> was >> > > >>>> this >> > > >>>> considered a more invasive option? Was it because of the backward >> > > >>>> compatibility issues that would be introduced for plugins using >> the >> > > new >> > > >>>> public API class that still need to be deployed onto older >> Connect >> > > >>>> workers? >> > > >>>> >> > > >>>> Thanks, >> > > >>>> Yash >> > > >>>> >> > > >>>> On Fri, Apr 14, 2023 at 6:45 PM Sagar <sagarmeansoc...@gmail.com >> > >> > > >>>> wrote: >> > > >>>> >> > > >>>> > One thing I forgot to mention in my previous email was that the >> > > >>>> reason I >> > > >>>> > chose to include the opt-in behaviour via configs was that the >> > users >> > > >>>> of the >> > > >>>> > connector know their workload patterns. If the workload is such >> > that >> > > >>>> the >> > > >>>> > connector would receive regular valid updates then there’s >> > ideally >> > > >>>> no need >> > > >>>> > for moving offsets since it would update automatically. >> > > >>>> > >> > > >>>> > This way they aren’t forced to use this feature and can use it >> > only >> > > >>>> when >> > > >>>> > the workload is expected to be batchy or not frequent. >> > > >>>> > >> > > >>>> > Thanks! >> > > >>>> > Sagar. >> > > >>>> > >> > > >>>> > >> > > >>>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar < >> sagarmeansoc...@gmail.com> >> > > >>>> wrote: >> > > >>>> > >> > > >>>> > > Hi Chris, >> > > >>>> > > >> > > >>>> > > Thanks for following up on the response. Sharing my thoughts >> > > >>>> further: >> > > >>>> > > >> > > >>>> > > If we want to add support for connectors to emit offsets >> without >> > > >>>> > >> accompanying source records, we could (and IMO should) do >> that >> > > >>>> without >> > > >>>> > >> requiring users to manually enable that feature by adjusting >> > > >>>> worker or >> > > >>>> > >> connector configurations. >> > > >>>> > > >> > > >>>> > > >> > > >>>> > > With the current KIP design, I have tried to implement this >> in >> > an >> > > >>>> opt-in >> > > >>>> > > manner via configs. I guess what you are trying to say is >> that >> > > this >> > > >>>> > doesn't >> > > >>>> > > need a config of it's own and instead could be part of the >> poll >> > -> >> > > >>>> > > transform etc -> produce -> commit cycle. That way, the users >> > > don't >> > > >>>> need >> > > >>>> > to >> > > >>>> > > set any config and if the connector supports moving offsets >> w/o >> > > >>>> producing >> > > >>>> > > SourceRecords, it should happen automatically. Is that >> correct? >> > If >> > > >>>> that >> > > >>>> > > is the concern, then I can think of not exposing a config and >> > try >> > > >>>> to make >> > > >>>> > > this process automatically. That should ease the load on >> > connector >> > > >>>> users, >> > > >>>> > > but your point about cognitive load on Connector developers, >> I >> > am >> > > >>>> still >> > > >>>> > not >> > > >>>> > > sure how to address that. The offsets are privy to a >> connector >> > and >> > > >>>> the >> > > >>>> > > framework at best can provide hooks to the tasks to update >> their >> > > >>>> offsets. >> > > >>>> > > Connector developers would still have to consider all cases >> > before >> > > >>>> > updating >> > > >>>> > > offsets. 
And if I ignore the heartbeat topic and heartbeat >> > > >>>> interval ms >> > > >>>> > > configs, then what the KIP proposes currently isn't much >> > different >> > > >>>> in >> > > >>>> > that >> > > >>>> > > regard. Just that it produces a List of SourceRecord which >> can >> > be >> > > >>>> changed >> > > >>>> > > to a Map of SourcePartition and their offsets if you think >> that >> > > >>>> would >> > > >>>> > > simplify things. Are there other cases in your mind which >> need >> > > >>>> > addressing? >> > > >>>> > > >> > > >>>> > > Here's my take on the usecases: >> > > >>>> > > >> > > >>>> > > 1. Regarding the example about SMTs with Object Storage >> based >> > > >>>> > > connectors, it was one of the scenarios identified. We >> have >> > > some >> > > >>>> > connectors >> > > >>>> > > that rely on the offsets topic to check if the next batch >> of >> > > >>>> files >> > > >>>> > should >> > > >>>> > > be processed and because of filtering of the last record >> from >> > > the >> > > >>>> > files, >> > > >>>> > > the eof supposedly is never reached and the connector >> can't >> > > >>>> commit >> > > >>>> > offsets >> > > >>>> > > for that source partition(file). If there was a mechanism >> to >> > > >>>> update >> > > >>>> > offsets >> > > >>>> > > for such a source file, then with some moderately complex >> > state >> > > >>>> > tracking, >> > > >>>> > > the connector can mark that file as processed and proceed. >> > > >>>> > > 2. There's another use case with the same class of >> connectors >> > > >>>> where if >> > > >>>> > > a file is malformed, then the connector couldn't produce >> any >> > > >>>> offsets >> > > >>>> > > because the file couldn't get processed completely. To >> handle >> > > >>>> such >> > > >>>> > cases, >> > > >>>> > > the connector developers have introduced a dev/null sort >> of >> > > topic >> > > >>>> > where >> > > >>>> > > they produce a record to this corrupted file topic and >> move >> > the >> > > >>>> offset >> > > >>>> > > somehow. This topic ideally isn't needed and with a >> mechanism >> > > to >> > > >>>> > update >> > > >>>> > > offsets would have helped in this case as well. >> > > >>>> > > 3. Coming to CDC based connectors, >> > > >>>> > > 1. We had a similar issue with Oracle CDC source >> connector >> > > and >> > > >>>> > > needed to employ the same heartbeat mechanism to get >> > around >> > > >>>> it. >> > > >>>> > > 2. MongoDB CDC source Connector has employed the same >> > > >>>> heartbeat >> > > >>>> > > mechanism Check `heartbeat.interval.ms` here ( >> > > >>>> > > >> > > >>>> > >> > > >>>> >> > > >> > >> https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/error-handling/ >> > > >>>> > > ). >> > > >>>> > > 3. Another CDC connector for ScyllaDB employs a similar >> > > >>>> mechanism. >> > > >>>> > > >> > > >>>> > >> > > >>>> >> > > >> > >> https://github.com/scylladb/scylla-cdc-source-connector/search?q=heartbeat >> > > >>>> > > 4. For CDC based connectors, you could argue that these >> > > >>>> connectors >> > > >>>> > > have been able to solve this error then why do we need >> > > >>>> framework >> > > >>>> > level >> > > >>>> > > support. But the point I am trying to make is that this >> > > >>>> limitation >> > > >>>> > from the >> > > >>>> > > framework is forcing CDC connector developers to >> implement >> > > >>>> > per-connector >> > > >>>> > > solutions/hacks(at times). 
And there could always be >> more >> > > CDC >> > > >>>> > connectors in >> > > >>>> > > the pipeline forcing them to take a similar route as >> well. >> > > >>>> > > 4. There's also a case at times with CDC source connectors >> > > which >> > > >>>> are >> > > >>>> > > REST Api / Web Service based(Zendesk Source Connector for >> > > >>>> example) . >> > > >>>> > These >> > > >>>> > > connectors typically use timestamps from the responses as >> > > >>>> offsets. If >> > > >>>> > > there's a long period of inactivity wherein the API >> > invocations >> > > >>>> don't >> > > >>>> > > return any data, then the offsets won't move and the >> > connector >> > > >>>> would >> > > >>>> > keep >> > > >>>> > > using the same timestamp that it received from the last >> > > non-empty >> > > >>>> > response. >> > > >>>> > > If this period of inactivity keeps growing, and the API >> > imposes >> > > >>>> any >> > > >>>> > limits >> > > >>>> > > on how far back we can go in terms of window start, then >> this >> > > >>>> could >> > > >>>> > > potentially be a problem. In this case even though the >> > > connector >> > > >>>> was >> > > >>>> > caught >> > > >>>> > > up with all the responses, it may need to snapshot again. >> In >> > > >>>> this case >> > > >>>> > > updating offsets can easily help since all the connector >> > needs >> > > >>>> to do >> > > >>>> > is to >> > > >>>> > > move the timestamp which would move the offset inherently. >> > > >>>> > > >> > > >>>> > > I still believe that this is something the framework should >> > > support >> > > >>>> OOB >> > > >>>> > > irrespective of whether the connectors have been able to get >> > > around >> > > >>>> this >> > > >>>> > > restriction or not. >> > > >>>> > > >> > > >>>> > > Lastly, about your comments here: >> > > >>>> > > >> > > >>>> > > I'm also not sure that it's worth preserving the current >> > behavior >> > > >>>> that >> > > >>>> > >> offsets for records that have been filtered out via SMT are >> not >> > > >>>> > committed. >> > > >>>> > > >> > > >>>> > > >> > > >>>> > > Let me know if we need a separate JIRA to track this? This >> > somehow >> > > >>>> didn't >> > > >>>> > > look related to this discussion. >> > > >>>> > > >> > > >>>> > > Thanks! >> > > >>>> > > Sagar. >> > > >>>> > > >> > > >>>> > > >> > > >>>> > > On Wed, Apr 12, 2023 at 9:34 PM Chris Egerton >> > > >>>> <chr...@aiven.io.invalid> >> > > >>>> > > wrote: >> > > >>>> > > >> > > >>>> > >> Hi Sagar, >> > > >>>> > >> >> > > >>>> > >> I'm sorry, I'm still not convinced that this design solves >> the >> > > >>>> > problem(s) >> > > >>>> > >> it sets out to solve in the best way possible. I tried to >> > > >>>> highlight this >> > > >>>> > >> in >> > > >>>> > >> my last email: >> > > >>>> > >> >> > > >>>> > >> > In general, it seems like we're trying to solve two >> > completely >> > > >>>> > different >> > > >>>> > >> problems with this single KIP: adding framework-level >> support >> > for >> > > >>>> > emitting >> > > >>>> > >> heartbeat records for source connectors, and allowing source >> > > >>>> connectors >> > > >>>> > to >> > > >>>> > >> emit offsets without also emitting source records. I don't >> mind >> > > >>>> > addressing >> > > >>>> > >> the two at the same time if the result is elegant and >> doesn't >> > > >>>> compromise >> > > >>>> > >> on >> > > >>>> > >> the solution for either problem, but that doesn't seem to be >> > the >> > > >>>> case >> > > >>>> > >> here. 
>> > > >>>> > >> Of the two problems, could we describe one as the primary >> and >> > one >> > > >>>> as the >> > > >>>> > >> secondary? If so, we might consider dropping the secondary >> > > problem >> > > >>>> from >> > > >>>> > >> this KIP and addressing it separately. >> > > >>>> > >> >> > > >>>> > >> If we wanted to add support for heartbeat records, we could >> > (and >> > > >>>> IMO >> > > >>>> > >> should) do that without requiring connectors to implement >> any >> > new >> > > >>>> > methods >> > > >>>> > >> and only require adjustments to worker or connector >> > > configurations >> > > >>>> by >> > > >>>> > >> users >> > > >>>> > >> in order to enable that feature. >> > > >>>> > >> >> > > >>>> > >> If we want to add support for connectors to emit offsets >> > without >> > > >>>> > >> accompanying source records, we could (and IMO should) do >> that >> > > >>>> without >> > > >>>> > >> requiring users to manually enable that feature by adjusting >> > > >>>> worker or >> > > >>>> > >> connector configurations. >> > > >>>> > >> >> > > >>>> > >> >> > > >>>> > >> I'm also not sure that it's worth preserving the current >> > behavior >> > > >>>> that >> > > >>>> > >> offsets for records that have been filtered out via SMT are >> not >> > > >>>> > committed. >> > > >>>> > >> I can't think of a case where this would be useful and there >> > are >> > > >>>> > obviously >> > > >>>> > >> plenty where it isn't. There's also a slight discrepancy in >> how >> > > >>>> these >> > > >>>> > >> kinds >> > > >>>> > >> of records are treated by the Connect runtime now; if a >> record >> > is >> > > >>>> > dropped >> > > >>>> > >> because of an SMT, then its offset isn't committed, but if >> it's >> > > >>>> dropped >> > > >>>> > >> because exactly-once support is enabled and the connector >> chose >> > > to >> > > >>>> abort >> > > >>>> > >> the batch containing the record, then its offset is still >> > > >>>> committed. >> > > >>>> > After >> > > >>>> > >> thinking carefully about the aborted transaction behavior, >> we >> > > >>>> realized >> > > >>>> > >> that >> > > >>>> > >> it was fine to commit the offsets for those records, and I >> > > believe >> > > >>>> that >> > > >>>> > >> the >> > > >>>> > >> same logic can be applied to any record that we're done >> trying >> > to >> > > >>>> send >> > > >>>> > to >> > > >>>> > >> Kafka (regardless of whether it was sent correctly, dropped >> due >> > > to >> > > >>>> > >> producer >> > > >>>> > >> error, filtered via SMT, etc.). >> > > >>>> > >> >> > > >>>> > >> I also find the file-based source connector example a little >> > > >>>> confusing. >> > > >>>> > >> What about that kind of connector causes the offset for the >> > last >> > > >>>> record >> > > >>>> > of >> > > >>>> > >> a file to be treated differently? Is there anything >> different >> > > about >> > > >>>> > >> filtering that record via SMT vs. dropping it altogether >> > because >> > > >>>> of an >> > > >>>> > >> asynchronous producer error with "errors.tolerance" set to >> > "all"? >> > > >>>> And >> > > >>>> > >> finally, how would such a connector use the design proposed >> > here? 
>> > > >>>> > >> >> > > >>>> > >> Finally, I don't disagree that if there are other legitimate >> use >> > > >>>> cases >> > > >>>> > >> that >> > > >>>> > >> would be helped by addressing KAFKA-3821, we should try to >> > solve >> > > >>>> that >> > > >>>> > >> issue >> > > >>>> > >> in the Kafka Connect framework instead of requiring >> individual >> > > >>>> > connectors >> > > >>>> > >> to implement their own solutions. But the cognitive load >> added >> > by >> > > >>>> the >> > > >>>> > >> design proposed here, for connector developers and Connect >> > > cluster >> > > >>>> > >> administrators alike, costs too much to justify by pointing >> to >> > an >> > > >>>> > >> already-solved problem encountered by a single group of >> > > connectors >> > > >>>> > (i.e., >> > > >>>> > >> Debezium). This is why I think it's crucial that we identify >> > > >>>> realistic >> > > >>>> > >> cases where this feature would actually be useful, and right >> > > now, I >> > > >>>> > don't >> > > >>>> > >> think any have been provided (at least, not ones that have >> > > already >> > > >>>> been >> > > >>>> > >> addressed or could be addressed with much simpler changes). >> > > >>>> > >> >> > > >>>> > >> Cheers, >> > > >>>> > >> >> > > >>>> > >> Chris >> > > >>>> > >> >> > > >>>> > >> On Tue, Apr 11, 2023 at 7:30 AM Sagar < >> > sagarmeansoc...@gmail.com >> > > > >> > > >>>> > wrote: >> > > >>>> > >> >> > > >>>> > >> > Hi Chris, >> > > >>>> > >> > >> > > >>>> > >> > Thanks for your detailed feedback! >> > > >>>> > >> > >> > > >>>> > >> > nits: I have taken care of them now. Thanks for pointing >> > those >> > > >>>> out. >> > > >>>> > >> > >> > > >>>> > >> > non-nits: >> > > >>>> > >> > >> > > >>>> > >> > 6) It seems (based on both the KIP and discussion on >> > > KAFKA-3821) >> > > >>>> that >> > > >>>> > >> the >> > > >>>> > >> > > only use case for being able to emit offsets without >> also >> > > >>>> emitting >> > > >>>> > >> source >> > > >>>> > >> > > records that's been identified so far is for CDC source >> > > >>>> connectors >> > > >>>> > >> like >> > > >>>> > >> > > Debezium. >> > > >>>> > >> > >> > > >>>> > >> > >> > > >>>> > >> > I am aware of at least one more case where the non-production >> > of >> > > >>>> > offsets >> > > >>>> > >> > (due to non-production of records) leads to the failure >> of >> > > >>>> connectors >> > > >>>> > >> when >> > > >>>> > >> > the source purges the records of interest. This happens in >> > File >> > > >>>> based >> > > >>>> > >> > source connectors (like s3/blob storage) in which if the >> > last >> > > >>>> record >> > > >>>> > >> from >> > > >>>> > >> > a file is filtered due to an SMT, then that particular >> file >> > is >> > > >>>> never >> > > >>>> > >> > committed to the source partition and eventually when the >> > file >> > > is >> > > >>>> > >> deleted >> > > >>>> > >> > from the source and the connector is restarted due to some >> > > >>>> reason, it >> > > >>>> > >> > fails. >> > > >>>> > >> > Moreover, I feel the reason this support should be there >> in >> > the >> > > >>>> Kafka >> > > >>>> > >> > Connect framework is because this is a restriction of the >> > > >>>> framework >> > > >>>> > and >> > > >>>> > >> > today the framework provides no support for getting around >> > this >> > > >>>> > >> limitation. >> > > >>>> > >> > Every connector has its own way of handling offsets and >> > having >> > > >>>> each >> > > >>>> > >> > connector handle this restriction in its own way can make >> it >> > > >>>> complex.
>> > > >>>> > >> > Whether we choose to do it the way this KIP prescribes or >> any >> > > >>>> other >> > > >>>> > way >> > > >>>> > >> is >> > > >>>> > >> > up for debate but IMHO, the framework should provide a >> way of >> > > >>>> > >> > getting around this limitation. >> > > >>>> > >> > >> > > >>>> > >> > 7. If a task produces heartbeat records and source records >> > that >> > > >>>> use >> > > >>>> > the >> > > >>>> > >> > > same source partition, which offset will ultimately be >> > > >>>> committed? >> > > >>>> > >> > >> > > >>>> > >> > >> > > >>>> > >> > The idea is to add the records returned by the >> > > >>>> > `produceHeartbeatRecords` >> > > >>>> > >> > to the same `toSend` list within >> > > >>>> `AbstractWorkerSourceTask#execute`. >> > > >>>> > >> The >> > > >>>> > >> > `produceHeartbeatRecords` would be invoked before we make >> the >> > > >>>> `poll` >> > > >>>> > >> call. >> > > >>>> > >> > Hence, the offsets committed would be in the same order in >> > > which >> > > >>>> they >> > > >>>> > >> would >> > > >>>> > >> > be written. Note that, the onus is on the Connector >> > > >>>> implementation to >> > > >>>> > >> not >> > > >>>> > >> > return records which can lead to data loss or data going >> out >> > of >> > > >>>> order. >> > > >>>> > >> The >> > > >>>> > >> > framework would just commit based on whatever is supplied. >> > > Also, >> > > >>>> > AFAIK, >> > > >>>> > >> 2 >> > > >>>> > >> > `normal` source records can also produce the same source >> > > >>>> partitions >> > > >>>> > and >> > > >>>> > >> > they are committed in the order in which they are written. >> > > >>>> > >> > >> > > >>>> > >> > 8. The SourceTask::produceHeartbeatRecords method returns >> a >> > > >>>> > >> > > List<SourceRecord>, and users can control the heartbeat >> > topic >> > > >>>> for a >> > > >>>> > >> > > connector via the (connector- or worker-level) >> > > >>>> > >> "heartbeat.records.topic" >> > > >>>> > >> > > property. Since every constructor for the SourceRecord >> > class >> > > >>>> [2] >> > > >>>> > >> > requires a >> > > >>>> > >> > > topic to be supplied, what will happen to that topic? >> Will >> > it >> > > >>>> be >> > > >>>> > >> ignored? >> > > >>>> > >> > > If so, I think we should look for a cleaner solution. >> > > >>>> > >> > >> > > >>>> > >> > >> > > >>>> > >> > Sorry, I couldn't quite follow which topic will be >> ignored in >> > > >>>> this >> > > >>>> > case. >> > > >>>> > >> > >> > > >>>> > >> > 9. A large concern raised in the discussion for KAFKA-3821 >> > was >> > > >>>> the >> > > >>>> > >> allowing >> > > >>>> > >> > > connectors to control the ordering of these special >> > > >>>> "offsets-only" >> > > >>>> > >> > > emissions and the regular source records returned from >> > > >>>> > >> SourceTask::poll. >> > > >>>> > >> > > Are we choosing to ignore that concern? If so, can you >> add >> > > >>>> this to >> > > >>>> > the >> > > >>>> > >> > > rejected alternatives section along with a rationale? 
>> > > >>>> > >> > One thing to note is that for every connector, the >> > > condition >> > > >>>> to >> > > >>>> > emit >> > > >>>> > >> > the heartbeat record is totally up to the connector. For >> > > >>>> example, for >> > > >>>> > a >> > > >>>> > >> > connector which is tracking transactions for an ordered >> log, >> > if >> > > >>>> there >> > > >>>> > >> are >> > > >>>> > >> > open transactions, it might not need to emit heartbeat >> > records >> > > >>>> when >> > > >>>> > the >> > > >>>> > >> > timer expires while for file based connectors, if the same >> > file >> > > >>>> is >> > > >>>> > being >> > > >>>> > >> > processed again and again due to an SMT or some other >> > reasons, >> > > >>>> then it >> > > >>>> > >> can >> > > >>>> > >> > choose to emit that partition. The uber point here is that >> > > every >> > > >>>> > >> connector >> > > >>>> > >> > has its own requirements and the framework can't really >> make >> > > an >> > > >>>> > >> assumption >> > > >>>> > >> > about it. What the KIP is trying to do is to provide a >> > > mechanism >> > > >>>> to >> > > >>>> > the >> > > >>>> > >> > connector to commit new offsets. With this approach, as >> far >> > as >> > > I >> > > >>>> can >> > > >>>> > >> think >> > > >>>> > >> > so far, there doesn't seem to be a case of out of order >> > > >>>> processing. If >> > > >>>> > >> you >> > > >>>> > >> > have other concerns/thoughts I would be happy to know >> them. >> > > >>>> > >> > >> > > >>>> > >> > 10. If, sometime in the future, we wanted to add >> > > framework-level >> > > >>>> > support >> > > >>>> > >> > > for sending heartbeat records that doesn't require >> > connectors >> > > >>>> to >> > > >>>> > >> > implement >> > > >>>> > >> > > any new APIs... >> > > >>>> > >> > >> > > >>>> > >> > >> > > >>>> > >> > The main purpose of producing heartbeat records is to be >> able >> > > to >> > > >>>> emit >> > > >>>> > >> > offsets w/o any new records. We are using heartbeat >> records >> > to >> > > >>>> solve >> > > >>>> > the >> > > >>>> > >> > primary concern of offsets getting stalled. The reason to >> do >> > > >>>> that was >> > > >>>> > >> once >> > > >>>> > >> > we get SourceRecords, then the rest of the code is >> already in >> > > >>>> place to >> > > >>>> > >> > write it to a topic of interest and commit offsets and >> that >> > > >>>> seemed the >> > > >>>> > >> most >> > > >>>> > >> > non invasive in terms of framework level changes. If in >> the >> > > >>>> future we >> > > >>>> > >> want >> > > >>>> > >> > to do a framework-only heartbeat record support, then this >> > > would >> > > >>>> > create >> > > >>>> > >> > confusion as you pointed out. Do you think the choice of >> the >> > > name >> > > >>>> > >> heartbeat >> > > >>>> > >> > records is creating confusion in this case? Maybe we can >> call >> > > >>>> these >> > > >>>> > >> special >> > > >>>> > >> > records something else (not sure what at this point) which >> > > would >> > > >>>> then >> > > >>>> > >> > decouple the 2 logically and implementation wise as well? >> > > >>>> > >> > >> > > >>>> > >> > Thanks! >> > > >>>> > >> > Sagar. >> > > >>>> > >> > >> > > >>>> > >> > On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton >> > > >>>> <chr...@aiven.io.invalid >> > > >>>> > > >> > > >>>> > >> > wrote: >> > > >>>> > >> > >> > > >>>> > >> > > Hi Sagar, >> > > >>>> > >> > > >> > > >>>> > >> > > Thanks for the KIP! I have some thoughts.
>> > > >>>> > >> > > >> > > >>>> > >> > > Nits: >> > > >>>> > >> > > >> > > >>>> > >> > > 1. Shouldn't KAFKA-3821 [1] be linked as the Jira >> ticket on >> > > >>>> the KIP? >> > > >>>> > >> Or >> > > >>>> > >> > is >> > > >>>> > >> > > there a different ticket that should be associated with >> it? >> > > >>>> > >> > > 2. The current state is listed as "Draft". Considering >> it's >> > > >>>> been >> > > >>>> > >> brought >> > > >>>> > >> > up >> > > >>>> > >> > > for discussion, maybe the KIP should be updated to >> > > >>>> "Discussion"? >> > > >>>> > >> > > 3. Can you add a link for the discussion thread to the >> KIP? >> > > >>>> > >> > > 4. The KIP states that "In this process, offsets are >> > written >> > > at >> > > >>>> > >> regular >> > > >>>> > >> > > intervals(driven by `offset.flush.interval.ms`)". This >> > isn't >> > > >>>> > strictly >> > > >>>> > >> > > accurate since, when exactly-once support is enabled, >> > offset >> > > >>>> commits >> > > >>>> > >> can >> > > >>>> > >> > > also be performed for each record batch (which is the >> > > default) >> > > >>>> or >> > > >>>> > when >> > > >>>> > >> > > explicitly requested by the task instance (if the >> connector >> > > >>>> > implements >> > > >>>> > >> > the >> > > >>>> > >> > > API to define its own transactions and the user has >> > > configured >> > > >>>> it to >> > > >>>> > >> do >> > > >>>> > >> > > so). Maybe better to just say "Offsets are written >> > > >>>> periodically"? >> > > >>>> > >> > > 5. The description for the (per-connector) >> > > >>>> "heartbeat.records.topic >> > > >>>> > " >> > > >>>> > >> > > property states that it is "Only applicable in >> distributed >> > > >>>> mode; in >> > > >>>> > >> > > standalone mode, setting this property will have no >> > effect". >> > > >>>> Is this >> > > >>>> > >> > > correct? >> > > >>>> > >> > > >> > > >>>> > >> > > Non-nits: >> > > >>>> > >> > > >> > > >>>> > >> > > 6. It seems (based on both the KIP and discussion on >> > > >>>> KAFKA-3821) >> > > >>>> > that >> > > >>>> > >> the >> > > >>>> > >> > > only use case for being able to emit offsets without >> also >> > > >>>> emitting >> > > >>>> > >> source >> > > >>>> > >> > > records that's been identified so far is for CDC source >> > > >>>> connectors >> > > >>>> > >> like >> > > >>>> > >> > > Debezium. But Debezium already has support for this >> exact >> > > >>>> feature >> > > >>>> > >> > (emitting >> > > >>>> > >> > > heartbeat records that include offsets that cannot be >> > > >>>> associated >> > > >>>> > with >> > > >>>> > >> > > other, "regular" source records). Why should we add this >> > > >>>> feature to >> > > >>>> > >> Kafka >> > > >>>> > >> > > Connect when the problem it addresses is already solved >> in >> > > the >> > > >>>> set >> > > >>>> > >> > > connectors that (it seems) would have any need for it, >> and >> > > the >> > > >>>> size >> > > >>>> > of >> > > >>>> > >> > that >> > > >>>> > >> > > set is extremely small? If there are other practical use >> > > cases >> > > >>>> for >> > > >>>> > >> > > connectors that would benefit from this feature, please >> let >> > > me >> > > >>>> know. >> > > >>>> > >> > > >> > > >>>> > >> > > 7. If a task produces heartbeat records and source >> records >> > > >>>> that use >> > > >>>> > >> the >> > > >>>> > >> > > same source partition, which offset will ultimately be >> > > >>>> committed? >> > > >>>> > >> > > >> > > >>>> > >> > > 8. 
The SourceTask::produceHeartbeatRecords method >> returns a >> > > >>>> > >> > > List<SourceRecord>, and users can control the heartbeat >> topic >> > > >>>> for a >> > > >>>> > >> > > connector via the (connector- or worker-level) >> > > >>>> > >> "heartbeat.records.topic" >> > > >>>> > >> > > property. Since every constructor for the SourceRecord >> > class >> > > >>>> [2] >> > > >>>> > >> > requires a >> > > >>>> > >> > > topic to be supplied, what will happen to that topic? >> Will >> > it >> > > >>>> be >> > > >>>> > >> ignored? >> > > >>>> > >> > > If so, I think we should look for a cleaner solution. >> > > >>>> > >> > > >> > > >>>> > >> > > 9. A large concern raised in the discussion for >> KAFKA-3821 >> > > was >> > > >>>> the >> > > >>>> > >> > allowing >> > > >>>> > >> > > connectors to control the ordering of these special >> > > >>>> "offsets-only" >> > > >>>> > >> > > emissions and the regular source records returned from >> > > >>>> > >> SourceTask::poll. >> > > >>>> > >> > > Are we choosing to ignore that concern? If so, can you >> add >> > > >>>> this to >> > > >>>> > the >> > > >>>> > >> > > rejected alternatives section along with a rationale? >> > > >>>> > >> > > >> > > >>>> > >> > > 10. If, sometime in the future, we wanted to add >> > > >>>> framework-level >> > > >>>> > >> support >> > > >>>> > >> > > for sending heartbeat records that doesn't require >> > connectors >> > > >>>> to >> > > >>>> > >> > implement >> > > >>>> > >> > > any new APIs (e.g., >> SourceTask::produceHeartbeatRecords), a >> > > >>>> lot of >> > > >>>> > >> this >> > > >>>> > >> > > would paint us into a corner design-wise. We'd have to >> > think >> > > >>>> > carefully >> > > >>>> > >> > > about which property names would be used, how to account >> > for >> > > >>>> > >> connectors >> > > >>>> > >> > > that have already implemented the >> > > >>>> > SourceTask::produceHeartbeatRecords >> > > >>>> > >> > > method, etc. In general, it seems like we're trying to >> > solve >> > > >>>> two >> > > >>>> > >> > completely >> > > >>>> > >> > > different problems with this single KIP: adding >> > > framework-level >> > > >>>> > >> support >> > > >>>> > >> > for >> > > >>>> > >> > > emitting heartbeat records for source connectors, and >> > > allowing >> > > >>>> > source >> > > >>>> > >> > > connectors to emit offsets without also emitting source >> > > >>>> records. I >> > > >>>> > >> don't >> > > >>>> > >> > > mind addressing the two at the same time if the result >> is >> > > >>>> elegant >> > > >>>> > and >> > > >>>> > >> > > doesn't compromise on the solution for either problem, >> but >> > > that >> > > >>>> > >> doesn't >> > > >>>> > >> > > seem to be the case here. Of the two problems, could we >> > > >>>> describe one >> > > >>>> > >> as >> > > >>>> > >> > the >> > > >>>> > >> > > primary and one as the secondary? If so, we might >> consider >> > > >>>> dropping >> > > >>>> > >> the >> > > >>>> > >> > > secondary problem from this KIP and addressing it >> > separately.
[1] - https://issues.apache.org/jira/browse/KAFKA-3821
[2] - https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html

Cheers,

Chris

On Sat, Mar 25, 2023 at 11:18 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Hi John,

Thanks for taking a look at the KIP!

The point about stream time not advancing in case of infrequent updates is an interesting one. I can imagine that if the upstream producer to a Kafka Streams application is a Source Connector which isn't sending records frequently (due to the nature of the data ingestion, for example), then the downstream stream processing can run into the issues you described above.

Which also brings me to the second point you made about how this would be used by downstream consumers. IIUC, you are referring to the consumers of the newly added topic, i.e. the heartbeat topic. In my mind, the heartbeat topic is an internal topic (similar to the offsets/config/status topics in Connect), the main purpose of which is to trick the framework into producing records to the offsets topic and advancing the offsets. Since every connector could have a different definition of offsets (LSN, BinLogID, etc.), the logic to determine what the heartbeat records should be would have to reside in the actual connector.

Now that I think of it, it could very well be consumed by downstream consumers/Streams or Flink applications and be further used for some decision making. A very crude example: if the heartbeat records sent to the new heartbeat topic include timestamps, then a downstream Streams application can use those timestamps to close any time windows.
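Something along these lines, perhaps. All topic names and the "HEARTBEAT" marker below are hypothetical, and this assumes the heartbeat records are readable like regular records; it's just to illustrate heartbeat timestamps advancing stream time:

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

// Merge the (hypothetical) heartbeat topic into the data stream so heartbeat
// record timestamps advance stream time even when no real data arrives,
// allowing suppressed windows to close.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> data =
        builder.stream("cdc-data-topic", Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, String> heartbeats =
        builder.stream("cdc-heartbeat-topic", Consumed.with(Serdes.String(), Serdes.String()));

data.merge(heartbeats)
        .groupByKey()
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
        // Count only real records; heartbeat payloads are ignored by the
        // aggregator, but their timestamps still move stream time forward.
        // (Heartbeats would also create empty windows under their own keys;
        // a real application would filter those results out.)
        .aggregate(
                () -> 0L,
                (key, value, agg) -> "HEARTBEAT".equals(value) ? agg : agg + 1,
                Materialized.with(Serdes.String(), Serdes.Long()))
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
        .to("window-counts-topic", Produced.with(Serdes.String(), Serdes.Long()));
// To run this, pass builder.build() and the usual Streams config to KafkaStreams.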
Having said that, it still appears to me that it's outside the scope of the Connect framework and is something which is difficult to generalise because of the variety of sources and the definitions of offsets.

But I would still be more than happy to add this example if you think it can be useful in getting a better understanding of the idea and also of its utility beyond Connect. Please let me know!

Thanks!
Sagar.

On Fri, Mar 24, 2023 at 7:22 PM John Roesler <vvcep...@apache.org> wrote:

Thanks for the KIP, Sagar!

At first glance, this seems like a very useful feature.

A common pain point in Streams is when upstream producers don't send regular updates and stream time cannot advance. This causes stream-time-driven operations to appear to hang, like time windows not closing, suppressions not firing, etc.

From your KIP, I have a good idea of how the feature would be integrated into Connect, and it sounds good to me. I don't quite see how downstream clients, such as a downstream Streams or Flink application, or users of the Consumer, would make use of this feature. Could you add some examples of that nature?

Thank you,
-John

On Fri, Mar 24, 2023, at 05:23, Sagar wrote:

Hi All,

Bumping the thread again.

Sagar.

On Fri, Mar 10, 2023 at 4:42 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Hi All,

Bumping this discussion thread again.

Thanks!
Sagar.
On Thu, Mar 2, 2023 at 3:44 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Hi All,

I wanted to create a discussion thread for KIP-910:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records

Thanks!
Sagar.