Hey All,

Please let me know how the KIP looks now. Is it at a stage where I can
start the voting phase? Of course, I am still open to
feedback/suggestions, but I am planning to start the vote for it.

Thanks!
Sagar.

On Tue, Jul 11, 2023 at 10:00 PM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hi Yash/Chris,
>
> Thanks for the feedback! I have updated the KIP with the suggestions
> provided. I will also update the PR with the suggestions.
>
> Also, I was hoping that this could make it to the 3.6 release given that
> it would benefit source connectors which have some of the problems listed
> in the Motivation Section.
>
> Responses Inline:
>
> Yash:
>
> 1) In the proposed changes section where you talk about modifying the
>> offsets, could you please clarify that tasks shouldn't modify the offsets
>> map that is passed as an argument? Currently, the distinction between the
>> offsets map passed as an argument and the offsets map that is returned is
>> not very clear in numerous places.
>
>
>
> Added
>
> 2) The default return value of Optional.empty() seems to be fairly
>> non-intuitive considering that the return value is supposed to be the
>> offsets that are to be committed. Can we consider simply returning the
>> offsets argument itself by default instead?
>
>
>
> Chris is suggesting returning null for the default case. I am thinking of
> making null
> the default return value. If the returned map is null, there won't be
> any further
> processing; otherwise we will continue with the existing logic.
>
> 3) The KIP states that "It is also possible that a task might choose to
>> send a tombstone record as an offset. This is not recommended and to
>> prevent connectors shooting themselves in the foot due to this" - could
>> you
>> please clarify why this is not recommended / supported?
>
>
>
> I have added that a better way of doing that would be via KIP-875. Also, I
> didn't want to include
> any mechanisms for users to meddle with the offsets topic. Allowing
> tombstone records via this method
> would be akin to publishing tombstone records directly to the offsets
> topic which is not recommended
> generally.
>
> 4) The KIP states that "If a task returns an Optional of a null object or
>> an Optional of an empty map, even for such cases the behaviour would would
>> be disabled." - since this is an optional API that source task
>> implementations don't necessarily need to implement, I don't think I fully
>> follow why the return type of the proposed "updateOffsets" method is an
>> Optional? Can we not simply use the Map as the return type instead?
>
>
>
> Yeah, I updated the return type to be a Map.
>
>
> 5) The KIP states that "The offsets passed to the updateOffsets  method
>> would be the offset from the latest source record amongst all source
>> records per partition. This way, if the source offset for a given source
>> partition is updated, that offset is the one that gets committed for the
>> source partition." - we should clarify that the "latest" offset refers to
>> the offsets that are about to be committed, and not the latest offsets
>> returned from SourceTask::poll so far (see related discussion in
>> https://issues.apache.org/jira/browse/KAFKA-15091 and
>> https://issues.apache.org/jira/browse/KAFKA-5716).
>
>
>
> Done
>
>
> 6) We haven't used the terminology of "Atleast Once Semantics" elsewhere in
>> Connect since the framework itself does not (and cannot) make any
>> guarantees on the delivery semantics. Depending on the source connector
>> and
>> the source system, both at-least once and at-most once semantics (for
>> example - a source system where reads are destructive) are possible. We
>> should avoid introducing this terminology in the KIP and instead refer to
>> this scenario as exactly-once support being disabled.
>
>
>
> Done
>
>
> 7) Similar to the above point, we should remove the use of the term
>> "Exactly Once Semantics" and instead refer to exactly-once support being
>> enabled since the framework can't guarantee exactly-once semantics for all
>> possible source connectors (for example - a message queue source connector
>> where offsets are essentially managed in the source system via an ack
>> mechanism).
>
>
> Done
>
> 8) In a previous attempt to fix this gap in functionality, a significant
>> concern was raised on offsets ordering guarantees when we retry sending a
>> batch of records (ref -
>> https://github.com/apache/kafka/pull/5553/files#r213329307). It doesn't
>> look like this KIP addresses that concern either? In the case where
>> exactly-once support is disabled - if we update the committableOffsets
>> with
>> the offsets provided by the task through the new updateOffsets method,
>> these offsets could be committed before older "regular" offsets are
>> committed due to producer retries which could then lead to an
>> inconsistency
>> if the send operation eventually succeeds.
>
>
>
>
> Thanks for bringing this up. I went through the comment shared above. If
> you see the implementation
> that I have in the PR, in the EOS-disabled case, updateOffsets is invoked only
> when toSend is null. Refer
> here:
> https://github.com/apache/kafka/pull/13899/files#diff-a3107b56382b6ec950dc9d19d21f188c21d4bf41853e0505d60d3bf87adab6a9R324-R330
>
>
> Which means that we invoke updateOffsets only when
> 1) The last poll invocation didn't return any records, or
> 2) All the records returned by the previous poll invocation got processed
> successfully, or
> 3) It is the first iteration of the task, because toSend would be null initially.
>
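To make the three conditions above concrete, here is a minimal sketch of the gating. It is a simplified, hypothetical model of the task loop and not the code in the PR: the class name, the `events` list, and the choice to call updateOffsets before picking up the polled batch are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of a source task loop; not the actual Connect worker code.
final class UpdateOffsetsGateSketch {
    // Pending batch; null means nothing is waiting to be sent.
    private List<String> toSend = null;
    // Records what happened, in order, so the gating is easy to inspect.
    final List<String> events = new ArrayList<>();

    // One loop iteration.
    // polled: what poll() would return if it were called (may be null).
    // sendSucceeds: whether the whole pending batch gets dispatched this time.
    void iterate(List<String> polled, boolean sendSucceeds) {
        if (toSend == null) {
            // Reached on the first iteration, after an empty poll,
            // or after the previous batch was fully processed.
            events.add("updateOffsets");
            toSend = polled; // stays null if poll returned nothing
        }
        if (toSend != null) {
            if (sendSucceeds) {
                events.add("sent");
                toSend = null;
            } else {
                events.add("retry"); // batch stays pending, so no updateOffsets next time
            }
        }
    }
}
```

In this model a batch that is still being retried blocks the hook until it has been sent, which is the property the three conditions describe.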
>
> IIUC the concern expressed in the link shared by you and the solution
> proposed there, it seems that's what is being proposed
>
>
>  What if your new block of code were only performed if sendRecords()
>> succeeded
>
>
>
>  Even for this there are concerns expressed but those don't seem to be
> related to offsets ordering guarantees. WDYT?
>
>
> 9) The KIP states that when exactly-once support is enabled, the new
>> SourceTask::updateOffsets method will be invoked only when an offset flush
>> is attempted. If the connector is configured to use a connector specified
>> transaction boundary rather than a poll or interval based boundary, isn't
>> it possible that we don't call SourceTask::updateOffsets until there are
>> actual records that are also being returned through poll (which would
>> defeat the primary motivation of the KIP)? Or are we making the assumption
>> that the connector defined transaction boundary should handle this case
>> appropriately if needed (i.e. source tasks should occasionally request for
>> a transaction commit via their transaction context if they want offsets to
>> be committed without producing records)? If so, I think we should
>> explicitly call that out in the KIP.
>
>
>
> That's a great point. I didn't consider this case. I have updated the KIP.
>
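For the connector-defined boundary case, a task could request a commit itself when it has been idle for a while. The sketch below only illustrates that idea. `TxContext` is a hypothetical stand-in for the single call it needs (the real interface is org.apache.kafka.connect.source.TransactionContext), and the idle interval and class name are assumptions. Whether the framework would then invoke updateOffsets is for the KIP to define.

```java
import java.time.Duration;

// Hypothetical stand-in exposing only the call this sketch needs; the real
// interface is org.apache.kafka.connect.source.TransactionContext.
interface TxContext {
    void commitTransaction();
}

// Sketch of a task-side helper that asks for a commit after a quiet period.
final class IdleCommitSketch {
    private final TxContext context;
    private final long idleCommitMs;
    private long lastCommitRequestMs;

    IdleCommitSketch(TxContext context, Duration idleCommitInterval, long nowMs) {
        this.context = context;
        this.idleCommitMs = idleCommitInterval.toMillis();
        this.lastCommitRequestMs = nowMs;
    }

    // Intended to be called when poll() has no records to return.
    // Returns true if a transaction commit was requested.
    boolean onEmptyPoll(long nowMs) {
        if (nowMs - lastCommitRequestMs >= idleCommitMs) {
            context.commitTransaction();
            lastCommitRequestMs = nowMs;
            return true;
        }
        return false;
    }
}
```

Passing the clock value in as a parameter keeps the helper deterministic; a real task would more likely read the time from whatever clock it already uses.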
> 10) The Javadoc for SourceTask::updateOffsets in the section on public
>> interfaces also has the same issue with the definition of latest offsets
>> that I've mentioned above (latest offsets from poll versus latest offsets
>> that are about to be committed).
>
>
> Done
>
> 11) The Javadoc for SourceTask::updateOffsets also introduces the same
>> confusion w.r.t updating offsets that I've mentioned above (modifying the
>> offsets map argument versus returning a modified copy of the offsets map).
>
>
>
> I have modified the verbiage and even the meaning of the return type as
> suggested by Chris.
>
> 12) In the section on compatibility, we should explicitly mention that
>> connectors which implement the new method will still be compatible with
>> older Connect runtimes where the method will simply not be invoked.
>
>
> Done
>
>
> Chris:
>
> 1. (Nit) Can we move the "Public Interfaces" section before the "Proposed
>> Changes" section? It's nice to have a summary of the user/developer-facing
>> changes first since that answers many of the questions that I had while
>> reading the "Proposed Changes" section. I'd bet that this is also why we
>> use that ordering in the KIP template.
>
>
>
> Done
>
> 2. Why are we invoking SourceTask::updateOffsets so frequently when
>> exactly-once support is disabled? Wouldn't it be simpler both for our
>> implementation and for connector developers if we only invoked it directly
>> before committing offsets, instead of potentially several times between
>> offset commits, especially since that would also mirror the behavior with
>> exactly-once support enabled?
>
>
>
> Hmm the idea was to keep the changes bounded within the SourceTask loop.
> Since the EOS-disabled case
> uses a separate thread  to commit offsets, I thought it's easier to have
> the updateOffsets invoked in
> the same loop and have it update the committableOffsets. The committer
> thread will keep doing what it
> does today. I felt this is easier to reason about. WDYT?
>
>
> 3. Building off of point 2, we wouldn't need to specify any more detail
>> than that "SourceTask::updateOffsets will be invoked directly before
>> committing offsets, with the to-be-committed offsets". There would be no
>> need to distinguish between when exactly-once support is enabled or
>> disabled.
>
>
>
> Yeah I have added the fact that updateOffsets would be invoked before
> committing offsets with about to be committed offsets.
> I have still left the EOS enabled/disabled intact because there are
> differences that I wanted to highlight like honouring
> Transaction boundaries and another edge case with Connector transaction
> boundary mode that Yash had brought up.
>
>
> 4. Some general stylistic feedback: we shouldn't mention the names of
>> internal classes or methods in KIPs. KIPs are for discussing high-level
>> design proposals. Internal names and APIs may change over time, and are
>> not
>> very helpful to readers who are not already familiar with the code base.
>> Instead, we should describe changes in behavior, not code.
>
>
>
> Yeah I generally avoid delving into the details but in this case I felt I
> need to explain a bit more why
> I am proposing what I am proposing. I have made the edits.
>
> 5. Why return a complete map of to-be-committed offsets instead of a map of
>> just the offsets that the connector wants to change? This seems especially
>> intuitive since we automatically re-insert source partitions that have
>> been
>> removed by the connector.
>
>
>
> Makes sense. I updated the KIP accordingly.
>
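A rough sketch of what "return only the offsets that change" could look like on the framework side, assuming a null return means "no changes" and a non-null return is overlaid on the to-be-committed offsets. The class and method names are hypothetical and this is not the actual Connect implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; illustrates the overlay idea only.
final class OffsetOverlaySketch {
    // Overlays the task's updates on the to-be-committed offsets.
    // Keys are source partitions, values are source offsets.
    static Map<Map<String, Object>, Map<String, Object>> merge(
            Map<Map<String, Object>, Map<String, Object>> toBeCommitted,
            Map<Map<String, Object>, Map<String, Object>> updates) {
        if (updates == null) {
            // Assumed meaning of null: the task has nothing to change.
            return toBeCommitted;
        }
        // Copy first so the original map is never mutated.
        Map<Map<String, Object>, Map<String, Object>> merged = new HashMap<>(toBeCommitted);
        // Partitions named by the task win; all others keep their offsets.
        merged.putAll(updates);
        return merged;
    }
}
```

With this shape a partition the task does not mention is simply carried over, so there is nothing for the framework to re-insert.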
> 6. I don't think we need to return an Optional from
>> SourceTask::updateOffsets. Developers can return null instead of
>> Optional.empty(), and since the framework will have to handle null return
>> values either way, this would reduce the number of cases for us to handle
>> from three (Optional.of(...), Optional.empty(), null) to two (null,
>> non-null).
>
>
>
> I see. I didn't want to have explicit null checks but then I realised
> Connect does have explicit null
> checks. Edited.
>
>
> 7. Why disallow tombstone records? If an upstream resource disappears, then
>> wouldn't a task want to emit a tombstone record without having to also
>> emit
>> an accompanying source record? This could help prevent an
>> infinitely-growing offsets topic, although with KIP-875 coming out in the
>> next release, perhaps we can leave this out for now and let Connect users
>> and cluster administrators do this work manually instead of letting
>> connector developers automate it.
>
>
>
> Even before I considered KIP-875's effects, my thought was to not meddle
> too much with the inner
> workings of the offsets topic. I think even today users can produce an
> offset record to the offsets
> topic to drop an unwanted partition but that should be used as a last
> resort. I didn't want to introduce
> any such mechanisms via this proposal. And with KIP-875 coming in, it
> makes all the more sense to not do
> it and have the offsets deleted in a more standardised way. The last part
> about KIP-875 is what I have mentioned
> in the KIP.
>
>
> 8. Is the information on multiple offsets topics for exactly-once
>> connectors relevant to this KIP? If not, we should remove it.
>
>
> Removed.
>
>
> 9. It seems like most of the use cases that motivate this KIP only require
>> being able to add a new source partition/source offset pair to the
>> to-be-committed offsets. Do we need to allow connector developers to
>> modify
>> source offsets for already-present source partitions at all? If we reduce
>> the surface of the API, then the worst case is still just that the offsets
>> we commit are at most one commit out-of-date.
>
>
> It could be useful in a scenario where the offset of a partition doesn't
> update for some period of time. In
> such cases, the connector can do some kind of state tracking and update
> the offsets after the time period elapses.
>
> I had mentioned an example of this scenario in an earlier e-mail:
>
>
> There's also a case at times with CDC source connectors which are REST API
>> / Web Service based (Zendesk Source Connector for example). These
>> connectors typically use timestamps from the responses as offsets. If
>> there's a long period of inactivity wherein the API invocations don't
>> return any data, then the offsets won't move and the connector would keep
>> using the same timestamp that it received from the last non-empty response.
>> If this period of inactivity keeps growing, and the API imposes any limits
>> on how far back we can go in terms of window start, then this could
>> potentially be a problem. In this case even though the connector was caught
>> up with all the responses, it may need to snapshot again. In this case
>> updating offsets can easily help since all the connector needs to do is to
>> move the timestamp which would move the offset inherently.
>
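As an illustration of the timestamp scenario quoted above, a task-side helper might look roughly like this. The "timestamp" offset key, the parameter names, and the rule of advancing only to the point the connector is known to be caught up to are all assumptions for the sketch, not part of the KIP.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper for a timestamp-based source task.
final class IdleTimestampAdvanceSketch {
    // Returns an offset update for one source partition, or null when nothing should change.
    // committedTimestampMs: timestamp in the offset that is about to be committed.
    // caughtUpToMs: latest time up to which the API is known to have returned no new data.
    // maxIdleMs: how far the committed timestamp may lag before it is moved forward.
    static Map<Map<String, Object>, Map<String, Object>> advanceIfIdle(
            Map<String, Object> partition,
            long committedTimestampMs,
            long caughtUpToMs,
            long maxIdleMs) {
        if (caughtUpToMs - committedTimestampMs < maxIdleMs) {
            return null; // lag is small enough; leave the offset alone
        }
        // Move the offset to the point the connector has already caught up to.
        Map<String, Object> offset = Collections.singletonMap("timestamp", caughtUpToMs);
        return Collections.singletonMap(partition, offset);
    }
}
```

The state tracking here is just the two timestamps; the connector never has to fabricate a record to move the offset past the inactive window.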
>
>
>
> 10. (Nit) The "Motivation" section states that "offsets are written
>> periodically by the connect framework to an offsets topic". This is only
>> true in distributed mode; in standalone mode, we write offsets to a local
>> file.
>
>
>
> Ack.
>
> On Wed, Jul 5, 2023 at 8:47 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
>> Hi Sagar,
>>
>> Thanks for updating the KIP! The latest draft seems simpler and more
>> focused, which I think is a win for users and developers alike. Here are
>> my
>> thoughts on the current draft:
>>
>> 1. (Nit) Can we move the "Public Interfaces" section before the "Proposed
>> Changes" section? It's nice to have a summary of the user/developer-facing
>> changes first since that answers many of the questions that I had while
>> reading the "Proposed Changes" section. I'd bet that this is also why we
>> use that ordering in the KIP template.
>>
>> 2. Why are we invoking SourceTask::updateOffsets so frequently when
>> exactly-once support is disabled? Wouldn't it be simpler both for our
>> implementation and for connector developers if we only invoked it directly
>> before committing offsets, instead of potentially several times between
>> offset commits, especially since that would also mirror the behavior with
>> exactly-once support enabled?
>>
>> 3. Building off of point 2, we wouldn't need to specify any more detail
>> than that "SourceTask::updateOffsets will be invoked directly before
>> committing offsets, with the to-be-committed offsets". There would be no
>> need to distinguish between when exactly-once support is enabled or
>> disabled.
>>
>> 4. Some general stylistic feedback: we shouldn't mention the names of
>> internal classes or methods in KIPs. KIPs are for discussing high-level
>> design proposals. Internal names and APIs may change over time, and are
>> not
>> very helpful to readers who are not already familiar with the code base.
>> Instead, we should describe changes in behavior, not code.
>>
>> 5. Why return a complete map of to-be-committed offsets instead of a map
>> of
>> just the offsets that the connector wants to change? This seems especially
>> intuitive since we automatically re-insert source partitions that have
>> been
>> removed by the connector.
>>
>> 6. I don't think we need to return an Optional from
>> SourceTask::updateOffsets. Developers can return null instead of
>> Optional.empty(), and since the framework will have to handle null return
>> values either way, this would reduce the number of cases for us to handle
>> from three (Optional.of(...), Optional.empty(), null) to two (null,
>> non-null).
>>
>> 7. Why disallow tombstone records? If an upstream resource disappears,
>> then
>> wouldn't a task want to emit a tombstone record without having to also
>> emit
>> an accompanying source record? This could help prevent an
>> infinitely-growing offsets topic, although with KIP-875 coming out in the
>> next release, perhaps we can leave this out for now and let Connect users
>> and cluster administrators do this work manually instead of letting
>> connector developers automate it.
>>
>> 8. Is the information on multiple offsets topics for exactly-once
>> connectors relevant to this KIP? If not, we should remove it.
>>
>> 9. It seems like most of the use cases that motivate this KIP only require
>> being able to add a new source partition/source offset pair to the
>> to-be-committed offsets. Do we need to allow connector developers to
>> modify
>> source offsets for already-present source partitions at all? If we reduce
>> the surface of the API, then the worst case is still just that the offsets
>> we commit are at most one commit out-of-date.
>>
>> 10. (Nit) The "Motivation" section states that "offsets are written
>> periodically by the connect framework to an offsets topic". This is only
>> true in distributed mode; in standalone mode, we write offsets to a local
>> file.
>>
>> Cheers,
>>
>> Chris
>>
>> On Tue, Jul 4, 2023 at 8:42 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>>
>> > Hi Sagar,
>> >
>> > Thanks for your continued work on this KIP! Here are my thoughts on your
>> > updated proposal:
>> >
>> > 1) In the proposed changes section where you talk about modifying the
>> > offsets, could you please clarify that tasks shouldn't modify the
>> offsets
>> > map that is passed as an argument? Currently, the distinction between
>> the
>> > offsets map passed as an argument and the offsets map that is returned
>> is
>> > not very clear in numerous places.
>> >
>> > 2) The default return value of Optional.empty() seems to be fairly
>> > non-intuitive considering that the return value is supposed to be the
>> > offsets that are to be committed. Can we consider simply returning the
>> > offsets argument itself by default instead?
>> >
>> > 3) The KIP states that "It is also possible that a task might choose to
>> > send a tombstone record as an offset. This is not recommended and to
>> > prevent connectors shooting themselves in the foot due to this" - could
>> you
>> > please clarify why this is not recommended / supported?
>> >
>> > 4) The KIP states that "If a task returns an Optional of a null object
>> or
>> > an Optional of an empty map, even for such cases the behaviour would
>> would
>> > be disabled." - since this is an optional API that source task
>> > implementations don't necessarily need to implement, I don't think I
>> fully
>> > follow why the return type of the proposed "updateOffsets" method is an
>> > Optional? Can we not simply use the Map as the return type instead?
>> >
>> > 5) The KIP states that "The offsets passed to the updateOffsets  method
>> > would be the offset from the latest source record amongst all source
>> > records per partition. This way, if the source offset for a given source
>> > partition is updated, that offset is the one that gets committed for the
>> > source partition." - we should clarify that the "latest" offset refers
>> to
>> > the offsets that are about to be committed, and not the latest offsets
>> > returned from SourceTask::poll so far (see related discussion in
>> > https://issues.apache.org/jira/browse/KAFKA-15091 and
>> > https://issues.apache.org/jira/browse/KAFKA-5716).
>> >
>> > 6) We haven't used the terminology of "Atleast Once Semantics"
>> elsewhere in
>> > Connect since the framework itself does not (and cannot) make any
>> > guarantees on the delivery semantics. Depending on the source connector
>> and
>> > the source system, both at-least once and at-most once semantics (for
>> > example - a source system where reads are destructive) are possible. We
>> > should avoid introducing this terminology in the KIP and instead refer
>> to
>> > this scenario as exactly-once support being disabled.
>> >
>> > 7) Similar to the above point, we should remove the use of the term
>> > "Exactly Once Semantics" and instead refer to exactly-once support being
>> > enabled since the framework can't guarantee exactly-once semantics for
>> all
>> > possible source connectors (for example - a message queue source
>> connector
>> > where offsets are essentially managed in the source system via an ack
>> > mechanism).
>> >
>> > 8) In a previous attempt to fix this gap in functionality, a significant
>> > concern was raised on offsets ordering guarantees when we retry sending
>> a
>> > batch of records (ref -
>> > https://github.com/apache/kafka/pull/5553/files#r213329307). It doesn't
>> > look like this KIP addresses that concern either? In the case where
>> > exactly-once support is disabled - if we update the committableOffsets
>> with
>> > the offsets provided by the task through the new updateOffsets method,
>> > these offsets could be committed before older "regular" offsets are
>> > committed due to producer retries which could then lead to an
>> inconsistency
>> > if the send operation eventually succeeds.
>> >
>> > 9) The KIP states that when exactly-once support is enabled, the new
>> > SourceTask::updateOffsets method will be invoked only when an offset
>> flush
>> > is attempted. If the connector is configured to use a connector
>> specified
>> > transaction boundary rather than a poll or interval based boundary,
>> isn't
>> > it possible that we don't call SourceTask::updateOffsets until there are
>> > actual records that are also being returned through poll (which would
>> > defeat the primary motivation of the KIP)? Or are we making the
>> assumption
>> > that the connector defined transaction boundary should handle this case
>> > appropriately if needed (i.e. source tasks should occasionally request
>> for
>> > a transaction commit via their transaction context if they want offsets
>> to
>> > be committed without producing records)? If so, I think we should
>> > explicitly call that out in the KIP.
>> >
>> > 10) The Javadoc for SourceTask::updateOffsets in the section on public
>> > interfaces also has the same issue with the definition of latest offsets
>> > that I've mentioned above (latest offsets from poll versus latest
>> offsets
>> > that are about to be committed).
>> >
>> > 11) The Javadoc for SourceTask::updateOffsets also introduces the same
>> > confusion w.r.t updating offsets that I've mentioned above (modifying
>> the
>> > offsets map argument versus returning a modified copy of the offsets
>> map).
>> >
>> > 12) In the section on compatibility, we should explicitly mention that
>> > connectors which implement the new method will still be compatible with
>> > older Connect runtimes where the method will simply not be invoked.
>> >
>> >
>> > Thanks,
>> > Yash
>> >
>> > On Wed, Jun 21, 2023 at 10:25 PM Sagar <sagarmeansoc...@gmail.com>
>> wrote:
>> >
>> > > Hi All,
>> > >
>> > > I have created this PR: https://github.com/apache/kafka/pull/13899
>> which
>> > > implements the approach outlined in the latest version of the KIP. I
>> > > thought I could use this to validate the approach based on my
>> > understanding
>> > > while the KIP itself gets reviewed. I can always change the
>> > implementation
>> > > once we move to a final decision on the KIP.
>> > >
>> > > Thanks!
>> > > Sagar.
>> > >
>> > >
>> > > On Wed, Jun 14, 2023 at 4:59 PM Sagar <sagarmeansoc...@gmail.com>
>> wrote:
>> > >
>> > > > Hey All,
>> > > >
>> > > > Bumping this discussion thread again to see how the modified KIP
>> > > > looks.
>> > > >
>> > > > Thanks!
>> > > > Sagar.
>> > > >
>> > > > On Mon, May 29, 2023 at 8:12 PM Sagar <sagarmeansoc...@gmail.com>
>> > wrote:
>> > > >
>> > > >> Hi,
>> > > >>
>> > > >> Bumping this thread again for further reviews.
>> > > >>
>> > > >> Thanks!
>> > > >> Sagar.
>> > > >>
>> > > >> On Fri, May 12, 2023 at 3:38 PM Sagar <sagarmeansoc...@gmail.com>
>> > > wrote:
>> > > >>
>> > > >>> Hi All,
>> > > >>>
>> > > >>> Thanks for the comments/reviews. I have updated the KIP
>> > > >>>
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>> > > >>> with a newer approach which shelves the need for an explicit
>> topic.
>> > > >>>
>> > > >>> Please review again and let me know what you think.
>> > > >>>
>> > > >>> Thanks!
>> > > >>> Sagar.
>> > > >>>
>> > > >>>
>> > > >>> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya <yash.ma...@gmail.com>
>> > > wrote:
>> > > >>>
>> > > >>>> Hi Sagar,
>> > > >>>>
>> > > >>>> Thanks for the KIP! I have a few questions and comments:
>> > > >>>>
>> > > >>>> 1) I agree with Chris' point about the separation of a connector
>> > > >>>> heartbeat
>> > > >>>> mechanism and allowing source connectors to generate offsets
>> without
>> > > >>>> producing data. What is the purpose of the heartbeat topic here
>> and
>> > > are
>> > > >>>> there any concrete use cases for downstream consumers on this
>> topic?
>> > > Why
>> > > >>>> can't we instead simply introduce a mechanism to retrieve a list
>> of
>> > > >>>> source
>> > > >>>> partition / source offset pairs from the source tasks?
>> > > >>>>
>> > > >>>> 2) With the currently described mechanism, the new
>> > > >>>> "SourceTask::produceHeartbeatRecords" method returns a
>> > > >>>> "List<SourceRecord>"
>> > > >>>> - what happens with the topic in each of these source records?
>> Chris
>> > > >>>> pointed this out above, but it doesn't seem to have been
>> addressed?
>> > > The
>> > > >>>> "SourceRecord" class also has a bunch of other fields which will
>> be
>> > > >>>> irrelevant here (partition, key / value schema, key / value data,
>> > > >>>> timestamp, headers). In fact, it seems like only the source
>> > partition
>> > > >>>> and
>> > > >>>> source offset are relevant here, so we should either introduce a
>> new
>> > > >>>> abstraction or simply use a data structure like a mapping from
>> > source
>> > > >>>> partitions to source offsets (adds to the above point)?
>> > > >>>>
>> > > >>>> 3) I'm not sure I fully follow why the heartbeat timer /
>> interval is
>> > > >>>> needed? What are the downsides of
>> > > >>>> calling "SourceTask::produceHeartbeatRecords" in every execution
>> > loop
>> > > >>>> (similar to the existing "SourceTask::poll" method)? Is this
>> only to
>> > > >>>> prevent the generation of a lot of offset records? Since
>> Connect's
>> > > >>>> offsets
>> > > >>>> topics are log compacted (and source partitions are used as keys
>> for
>> > > >>>> each
>> > > >>>> source offset), I'm not sure if such concerns are valid and such
>> a
>> > > >>>> heartbeat timer / interval mechanism is required?
>> > > >>>>
>> > > >>>> 4) The first couple of rejected alternatives state that the use
>> of a
>> > > >>>> null
>> > > >>>> topic / key / value are preferably avoided - but the current
>> > proposal
>> > > >>>> would
>> > > >>>> also likely require connectors to use such workarounds (null
>> topic
>> > > when
>> > > >>>> the
>> > > >>>> heartbeat topic is configured at a worker level and always for
>> the
>> > > key /
>> > > >>>> value)?
>> > > >>>>
>> > > >>>> 5) The third rejected alternative talks about subclassing the
>> > > >>>> "SourceRecord" class - this presumably means allowing connectors
>> to
>> > > pass
>> > > >>>> special offset only records via the existing poll mechanism? Why
>> was
>> > > >>>> this
>> > > >>>> considered a more invasive option? Was it because of the backward
>> > > >>>> compatibility issues that would be introduced for plugins using
>> the
>> > > new
>> > > >>>> public API class that still need to be deployed onto older
>> Connect
>> > > >>>> workers?
>> > > >>>>
>> > > >>>> Thanks,
>> > > >>>> Yash
>> > > >>>>
>> > > >>>> On Fri, Apr 14, 2023 at 6:45 PM Sagar <sagarmeansoc...@gmail.com
>> >
>> > > >>>> wrote:
>> > > >>>>
>> > > >>>> > One thing I forgot to mention in my previous email was that the
>> > > >>>> reason I
>> > > >>>> > chose to include the opt-in behaviour via configs was that the
>> > users
>> > > >>>> of the
>> > > >>>> > connector know their workload patterns. If the workload is such
>> > that
>> > > >>>> the
>> > > >>>> >  connector would receive regular valid updates then there’s
>> > ideally
>> > > >>>> no need
>> > > >>>> > for moving offsets since it would update automatically.
>> > > >>>> >
>> > > >>>> > This way they aren’t forced to use this feature and can use it
>> > only
>> > > >>>> when
>> > > >>>> > the workload is expected to be batchy or not frequent.
>> > > >>>> >
>> > > >>>> > Thanks!
>> > > >>>> > Sagar.
>> > > >>>> >
>> > > >>>> >
>> > > >>>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar <
>> sagarmeansoc...@gmail.com>
>> > > >>>> wrote:
>> > > >>>> >
>> > > >>>> > > Hi Chris,
>> > > >>>> > >
>> > > >>>> > > Thanks for following up on the response. Sharing my thoughts
>> > > >>>> further:
>> > > >>>> > >
>> > > >>>> > > If we want to add support for connectors to emit offsets
>> without
>> > > >>>> > >> accompanying source records, we could (and IMO should) do
>> that
>> > > >>>> without
>> > > >>>> > >> requiring users to manually enable that feature by adjusting
>> > > >>>> worker or
>> > > >>>> > >> connector configurations.
>> > > >>>> > >
>> > > >>>> > >
>> > > >>>> > > With the current KIP design, I have tried to implement this
>> in
>> > an
>> > > >>>> opt-in
>> > > >>>> > > manner via configs. I guess what you are trying to say is
>> that
>> > > this
>> > > >>>> > doesn't
>> > > >>>> > > need a config of its own and instead could be part of the
>> poll
>> > ->
>> > > >>>> > > transform etc -> produce -> commit cycle. That way, the users
>> > > don't
>> > > >>>> need
>> > > >>>> > to
>> > > >>>> > > set any config and if the connector supports moving offsets
>> w/o
>> > > >>>> producing
>> > > >>>> > > SourceRecords, it should happen automatically. Is that
>> correct?
>> > If
>> > > >>>> that
>> > > >>>> > > is the concern, then I can think of not exposing a config and
>> > try
>> > > >>>> to make
>> > > >>>> > > this process automatic. That should ease the load on
>> > connector
>> > > >>>> users,
>> > > >>>> > > but your point about cognitive load on Connector developers,
>> I
>> > am
>> > > >>>> still
>> > > >>>> > not
>> > > >>>> > > sure how to address that. The offsets are privy to a
>> connector
>> > and
>> > > >>>> the
>> > > >>>> > > framework at best can provide hooks to the tasks to update
>> their
>> > > >>>> offsets.
>> > > >>>> > > Connector developers would still have to consider all cases
>> > before
>> > > >>>> > updating
>> > > >>>> > > offsets.  And if I ignore the heartbeat topic and heartbeat
>> > > >>>> interval ms
>> > > >>>> > > configs, then what the KIP proposes currently isn't much
>> > different
>> > > >>>> in
>> > > >>>> > that
>> > > >>>> > > regard. Just that it produces a List of SourceRecord which
>> can
>> > be
>> > > >>>> changed
>> > > >>>> > > to a Map of SourcePartition and their offsets if you think
>> that
>> > > >>>> would
>> > > >>>> > > simplify things. Are there other cases in your mind which
>> need
>> > > >>>> > addressing?
>> > > >>>> > >
>> > > >>>> > > Here's my take on the usecases:
>> > > >>>> > >
>> > > >>>> > >    1. Regarding the example about SMTs with Object Storage
>> based
>> > > >>>> > >    connectors, it was one of the scenarios identified. We
>> have
>> > > some
>> > > >>>> > connectors
>> > > >>>> > >    that rely on the offsets topic to check if the next batch
>> of
>> > > >>>> files
>> > > >>>> > should
>> > > >>>> > >    be processed and because of filtering of the last record
>> from
>> > > the
>> > > >>>> > files,
>> > > >>>> > >    the EOF supposedly is never reached and the connector
>> can't
>> > > >>>> commit
>> > > >>>> > offsets
>> > > >>>> > >    for that source partition(file). If there was a mechanism
>> to
>> > > >>>> update
>> > > >>>> > offsets
>> > > >>>> > >    for such a source file, then with some moderately complex
>> > state
>> > > >>>> > tracking,
>> > > >>>> > >    the connector can mark that file as processed and proceed.
>> > > >>>> > >    2. There's another use case with the same class of
>> connectors
>> > > >>>> where if
>> > > >>>> > >    a file is malformed, then the connector couldn't produce
>> any
>> > > >>>> offsets
>> > > >>>> > >    because the file couldn't get processed completely. To
>> handle
>> > > >>>> such
>> > > >>>> > cases,
>> > > >>>> > >    the connector developers have introduced a dev/null sort
>> of
>> > > topic
>> > > >>>> > where
>> > > >>>> > >    they produce a record to this corrupted file topic and
>> move
>> > the
>> > > >>>> offset
>> > > >>>> > >    somehow. This topic ideally isn't needed, and a
>> mechanism
>> > > to
>> > > >>>> > update
>> > > >>>> > >    offsets would have helped in this case as well.
>> > > >>>> > >    3. Coming to CDC based connectors,
>> > > >>>> > >       1. We had a similar issue with Oracle CDC source
>> connector
>> > > and
>> > > >>>> > >       needed to employ the same heartbeat mechanism to get
>> > around
>> > > >>>> it.
>> > > >>>> > >       2. MongoDB CDC source Connector has employed the same
>> > > >>>> heartbeat
>> > > >>>> > >       mechanism. Check `heartbeat.interval.ms` here (
>> > > >>>> > >
>> > > >>>> >
>> > > >>>>
>> > >
>> >
>> https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/error-handling/
>> > > >>>> > >       ).
>> > > >>>> > >       3. Another CDC connector for ScyllaDB employs a similar
>> > > >>>> mechanism.
>> > > >>>> > >
>> > > >>>> >
>> > > >>>>
>> > >
>> >
>> https://github.com/scylladb/scylla-cdc-source-connector/search?q=heartbeat
>> > > >>>> > >       4. For CDC based connectors, you could argue that these
>> > > >>>> connectors
>> > > >>>> > >       have been able to solve this problem, so why do we need
>> > > >>>> framework
>> > > >>>> > level
>> > > >>>> > >       support. But the point I am trying to make is that this
>> > > >>>> limitation
>> > > >>>> > from the
>> > > >>>> > >       framework is forcing CDC connector developers to
>> implement
>> > > >>>> > per-connector
>> > > >>>> > >       solutions/hacks(at times). And there could always be
>> more
>> > > CDC
>> > > >>>> > connectors in
>> > > >>>> > >       the pipeline forcing them to take a similar route as
>> well.
>> > > >>>> > >    4. There's also a case at times with CDC source connectors
>> > > which
>> > > >>>> are
>> > > >>>> > >    REST API / Web Service based (Zendesk Source Connector for
>> > > >>>> example).
>> > > >>>> > These
>> > > >>>> > >    connectors typically use timestamps from the responses as
>> > > >>>> offsets. If
>> > > >>>> > >    there's a long period of inactivity wherein the API
>> > invocations
>> > > >>>> don't
>> > > >>>> > >    return any data, then the offsets won't move and the
>> > connector
>> > > >>>> would
>> > > >>>> > keep
>> > > >>>> > >    using the same timestamp that it received from the last
>> > > non-empty
>> > > >>>> > response.
>> > > >>>> > >    If this period of inactivity keeps growing, and the API
>> > imposes
>> > > >>>> any
>> > > >>>> > limits
>> > > >>>> > >    on how far back we can go in terms of window start, then
>> this
>> > > >>>> could
>> > > >>>> > >    potentially be a problem. In this case even though the
>> > > connector
>> > > >>>> was
>> > > >>>> > caught
>> > > >>>> > >    up with all the responses, it may need to snapshot again.
>> In
>> > > >>>> this case
>> > > >>>> > >    updating offsets can easily help since all the connector
>> > needs
>> > > >>>> to do
>> > > >>>> > is to
>> > > >>>> > >    move the timestamp which would move the offset inherently.
>> > > >>>> > >
>> > > >>>> > > I still believe that this is something the framework should
>> > > support
>> > > >>>> OOB
>> > > >>>> > > irrespective of whether the connectors have been able to get
>> > > around
>> > > >>>> this
>> > > >>>> > > restriction or not.
>> > > >>>> > >
>> > > >>>> > > Lastly, about your comments here:
>> > > >>>> > >
>> > > >>>> > > I'm also not sure that it's worth preserving the current
>> > behavior
>> > > >>>> that
>> > > >>>> > >> offsets for records that have been filtered out via SMT are
>> not
>> > > >>>> > committed.
>> > > >>>> > >
>> > > >>>> > >
>> > > >>>> > > Let me know if we need a separate JIRA to track this. This
>> > somehow
>> > > >>>> didn't
>> > > >>>> > > look related to this discussion.
>> > > >>>> > >
>> > > >>>> > > Thanks!
>> > > >>>> > > Sagar.
>> > > >>>> > >
>> > > >>>> > >
>> > > >>>> > > On Wed, Apr 12, 2023 at 9:34 PM Chris Egerton
>> > > >>>> <chr...@aiven.io.invalid>
>> > > >>>> > > wrote:
>> > > >>>> > >
>> > > >>>> > >> Hi Sagar,
>> > > >>>> > >>
>> > > >>>> > >> I'm sorry, I'm still not convinced that this design solves
>> the
>> > > >>>> > problem(s)
>> > > >>>> > >> it sets out to solve in the best way possible. I tried to
>> > > >>>> highlight this
>> > > >>>> > >> in
>> > > >>>> > >> my last email:
>> > > >>>> > >>
>> > > >>>> > >> > In general, it seems like we're trying to solve two
>> > completely
>> > > >>>> > different
>> > > >>>> > >> problems with this single KIP: adding framework-level
>> support
>> > for
>> > > >>>> > emitting
>> > > >>>> > >> heartbeat records for source connectors, and allowing source
>> > > >>>> connectors
>> > > >>>> > to
>> > > >>>> > >> emit offsets without also emitting source records. I don't
>> mind
>> > > >>>> > addressing
>> > > >>>> > >> the two at the same time if the result is elegant and
>> doesn't
>> > > >>>> compromise
>> > > >>>> > >> on
>> > > >>>> > >> the solution for either problem, but that doesn't seem to be
>> > the
>> > > >>>> case
>> > > >>>> > >> here.
>> > > >>>> > >> Of the two problems, could we describe one as the primary
>> and
>> > one
>> > > >>>> as the
>> > > >>>> > >> secondary? If so, we might consider dropping the secondary
>> > > problem
>> > > >>>> from
>> > > >>>> > >> this KIP and addressing it separately.
>> > > >>>> > >>
>> > > >>>> > >> If we wanted to add support for heartbeat records, we could
>> > (and
>> > > >>>> IMO
>> > > >>>> > >> should) do that without requiring connectors to implement
>> any
>> > new
>> > > >>>> > methods
>> > > >>>> > >> and only require adjustments to worker or connector
>> > > configurations
>> > > >>>> by
>> > > >>>> > >> users
>> > > >>>> > >> in order to enable that feature.
>> > > >>>> > >>
>> > > >>>> > >> If we want to add support for connectors to emit offsets
>> > without
>> > > >>>> > >> accompanying source records, we could (and IMO should) do
>> that
>> > > >>>> without
>> > > >>>> > >> requiring users to manually enable that feature by adjusting
>> > > >>>> worker or
>> > > >>>> > >> connector configurations.
>> > > >>>> > >>
>> > > >>>> > >>
>> > > >>>> > >> I'm also not sure that it's worth preserving the current
>> > behavior
>> > > >>>> that
>> > > >>>> > >> offsets for records that have been filtered out via SMT are
>> not
>> > > >>>> > committed.
>> > > >>>> > >> I can't think of a case where this would be useful and there
>> > are
>> > > >>>> > obviously
>> > > >>>> > >> plenty where it isn't. There's also a slight discrepancy in
>> how
>> > > >>>> these
>> > > >>>> > >> kinds
>> > > >>>> > >> of records are treated by the Connect runtime now; if a
>> record
>> > is
>> > > >>>> > dropped
>> > > >>>> > >> because of an SMT, then its offset isn't committed, but if
>> it's
>> > > >>>> dropped
>> > > >>>> > >> because exactly-once support is enabled and the connector
>> chose
>> > > to
>> > > >>>> abort
>> > > >>>> > >> the batch containing the record, then its offset is still
>> > > >>>> committed.
>> > > >>>> > After
>> > > >>>> > >> thinking carefully about the aborted transaction behavior,
>> we
>> > > >>>> realized
>> > > >>>> > >> that
>> > > >>>> > >> it was fine to commit the offsets for those records, and I
>> > > believe
>> > > >>>> that
>> > > >>>> > >> the
>> > > >>>> > >> same logic can be applied to any record that we're done
>> trying
>> > to
>> > > >>>> send
>> > > >>>> > to
>> > > >>>> > >> Kafka (regardless of whether it was sent correctly, dropped
>> due
>> > > to
>> > > >>>> > >> producer
>> > > >>>> > >> error, filtered via SMT, etc.).
>> > > >>>> > >>
>> > > >>>> > >> I also find the file-based source connector example a little
>> > > >>>> confusing.
>> > > >>>> > >> What about that kind of connector causes the offset for the
>> > last
>> > > >>>> record
>> > > >>>> > of
>> > > >>>> > >> a file to be treated differently? Is there anything
>> different
>> > > about
>> > > >>>> > >> filtering that record via SMT vs. dropping it altogether
>> > because
>> > > >>>> of an
>> > > >>>> > >> asynchronous producer error with "errors.tolerance" set to
>> > "all"?
>> > > >>>> And
>> > > >>>> > >> finally, how would such a connector use the design proposed
>> > here?
>> > > >>>> > >>
>> > > >>>> > >> Finally, I don't disagree that if there are other legitimate
>> > use
>> > > >>>> cases
>> > > >>>> > >> that
>> > > >>>> > >> would be helped by addressing KAFKA-3821, we should try to
>> > solve
>> > > >>>> that
>> > > >>>> > >> issue
>> > > >>>> > >> in the Kafka Connect framework instead of requiring
>> individual
>> > > >>>> > connectors
>> > > >>>> > >> to implement their own solutions. But the cognitive load
>> added
>> > by
>> > > >>>> the
>> > > >>>> > >> design proposed here, for connector developers and Connect
>> > > cluster
>> > > >>>> > >> administrators alike, costs too much to justify by pointing
>> to
>> > an
>> > > >>>> > >> already-solved problem encountered by a single group of
>> > > connectors
>> > > >>>> > (i.e.,
>> > > >>>> > >> Debezium). This is why I think it's crucial that we identify
>> > > >>>> realistic
>> > > >>>> > >> cases where this feature would actually be useful, and right
>> > > now, I
>> > > >>>> > don't
>> > > >>>> > >> think any have been provided (at least, not ones that have
>> > > already
>> > > >>>> been
>> > > >>>> > >> addressed or could be addressed with much simpler changes).
>> > > >>>> > >>
>> > > >>>> > >> Cheers,
>> > > >>>> > >>
>> > > >>>> > >> Chris
>> > > >>>> > >>
>> > > >>>> > >> On Tue, Apr 11, 2023 at 7:30 AM Sagar <
>> > sagarmeansoc...@gmail.com
>> > > >
>> > > >>>> > wrote:
>> > > >>>> > >>
>> > > >>>> > >> > Hi Chris,
>> > > >>>> > >> >
>> > > >>>> > >> > Thanks for your detailed feedback!
>> > > >>>> > >> >
>> > > >>>> > >> > nits: I have taken care of them now. Thanks for pointing
>> > those
>> > > >>>> out.
>> > > >>>> > >> >
>> > > >>>> > >> > non-nits:
>> > > >>>> > >> >
>> > > >>>> > >> > 6) It seems (based on both the KIP and discussion on
>> > > KAFKA-3821)
>> > > >>>> that
>> > > >>>> > >> the
>> > > >>>> > >> > > only use case for being able to emit offsets without
>> also
>> > > >>>> emitting
>> > > >>>> > >> source
>> > > >>>> > >> > > records that's been identified so far is for CDC source
>> > > >>>> connectors
>> > > >>>> > >> like
>> > > >>>> > >> > > Debezium.
>> > > >>>> > >> >
>> > > >>>> > >> >
>> > > >>>> > >> > I am aware of at least one more case where the non
>> production
>> > of
>> > > >>>> > offsets
>> > > >>>> > >> > (due to non production of records) leads to the failure
>> of
>> > > >>>> connectors
>> > > >>>> > >> when
>> > > >>>> > >> > the source purges the records of interest. This happens in
>> > File
>> > > >>>> based
>> > > >>>> > >> > source connectors (like S3/blob storage) in which, if the
>> > last
>> > > >>>> record
>> > > >>>> > >> from
>> > > >>>> > >> > a file is filtered due to an SMT, then that particular
>> file
>> > is
>> > > >>>> never
>> > > >>>> > >> > committed to the source partition and eventually when the
>> > file
>> > > is
>> > > >>>> > >> deleted
>> > > >>>> > >> > from the source and the connector is restarted due to some
>> > > >>>> reason, it
>> > > >>>> > >> > fails.
>> > > >>>> > >> > Moreover, I feel the reason this support should be there
>> in
>> > the
>> > > >>>> Kafka
>> > > >>>> > >> > Connect framework is because this is a restriction of the
>> > > >>>> framework
>> > > >>>> > and
>> > > >>>> > >> > today the framework provides no support for getting around
>> > this
>> > > >>>> > >> limitation.
>> > > >>>> > >> > Every connector has its own way of handling offsets and
>> > having
>> > > >>>> each
>> > > >>>> > >> > connector handle this restriction in its own way can make
>> it
>> > > >>>> complex.
>> > > >>>> > >> > Whether we choose to do it the way this KIP prescribes or
>> any
>> > > >>>> other
>> > > >>>> > way
>> > > >>>> > >> is
>> > > >>>> > >> > up for debate but IMHO, the framework should provide a
>> way of
>> > > >>>> > >> > getting around this limitation.
>> > > >>>> > >> >
>> > > >>>> > >> > 7. If a task produces heartbeat records and source records
>> > that
>> > > >>>> use
>> > > >>>> > the
>> > > >>>> > >> > > same source partition, which offset will ultimately be
>> > > >>>> committed?
>> > > >>>> > >> >
>> > > >>>> > >> >
>> > > >>>> > >> > The idea is to add the records returned by the
>> > > >>>> > `produceHeartbeatRecords`
>> > > >>>> > >> > to the same `toSend` list within
>> > > >>>> `AbstractWorkerSourceTask#execute`.
>> > > >>>> > >> The
>> > > >>>> > >> > `produceHeartbeatRecords` would be invoked before we make
>> the
>> > > >>>> `poll`
>> > > >>>> > >> call.
>> > > >>>> > >> > Hence, the offsets committed would be in the same order in
>> > > which
>> > > >>>> they
>> > > >>>> > >> would
>> > > >>>> > >> > be written. Note that the onus is on the Connector
>> > > >>>> implementation to
>> > > >>>> > >> not
>> > > >>>> > >> > return records which can lead to data loss or data going
>> out
>> > of
>> > > >>>> order.
>> > > >>>> > >> The
>> > > >>>> > >> > framework would just commit based on whatever is supplied.
>> > > Also,
>> > > >>>> > AFAIK,
>> > > >>>> > >> 2
>> > > >>>> > >> > `normal` source records can also produce the same source
>> > > >>>> partitions
>> > > >>>> > and
>> > > >>>> > >> > they are committed in the order in which they are written.
>> > > >>>> > >> >
>> > > >>>> > >> > 8. The SourceTask::produceHeartbeatRecords method returns
>> a
>> > > >>>> > >> > > List<SourceRecord>, and users can control the heartbeat
>> > topic
>> > > >>>> for a
>> > > >>>> > >> > > connector via the (connector- or worker-level)
>> > > >>>> > >> "heartbeat.records.topic"
>> > > >>>> > >> > > property. Since every constructor for the SourceRecord
>> > class
>> > > >>>> [2]
>> > > >>>> > >> > requires a
>> > > >>>> > >> > > topic to be supplied, what will happen to that topic?
>> Will
>> > it
>> > > >>>> be
>> > > >>>> > >> ignored?
>> > > >>>> > >> > > If so, I think we should look for a cleaner solution.
>> > > >>>> > >> >
>> > > >>>> > >> >
>> > > >>>> > >> > Sorry, I couldn't quite follow which topic will be
>> ignored in
>> > > >>>> this
>> > > >>>> > case.
>> > > >>>> > >> >
>> > > >>>> > >> > 9. A large concern raised in the discussion for KAFKA-3821
>> > was
>> > > >>>> the
>> > > >>>> > >> allowing
>> > > >>>> > >> > > connectors to control the ordering of these special
>> > > >>>> "offsets-only"
>> > > >>>> > >> > > emissions and the regular source records returned from
>> > > >>>> > >> SourceTask::poll.
>> > > >>>> > >> > > Are we choosing to ignore that concern? If so, can you
>> add
>> > > >>>> this to
>> > > >>>> > the
>> > > >>>> > >> > > rejected alternatives section along with a rationale?
>> > > >>>> > >> >
>> > > >>>> > >> >
>> > > >>>> > >> > One thing to note is that for every connector, the
>> > > condition
>> > > >>>> to
>> > > >>>> > emit
>> > > >>>> > >> > the heartbeat record is totally up to the connector. For
>> > > >>>> example, for
>> > > >>>> > a
>> > > >>>> > >> > connector which is tracking transactions for an ordered
>> log,
>> > if
>> > > >>>> there
>> > > >>>> > >> are
>> > > >>>> > >> > open transactions, it might not need to emit heartbeat
>> > records
>> > > >>>> when
>> > > >>>> > the
>> > > >>>> > >> > timer expires while for file based connectors, if the same
>> > file
>> > > >>>> is
>> > > >>>> > being
>> > > >>>> > >> > processed again and again due to an SMT or some other
>> > reasons,
>> > > >>>> then it
>> > > >>>> > >> can
>> > > >>>> > >> > choose to emit that partition. The uber point here is that
>> > > every
>> > > >>>> > >> connector
>> > > >>>> > >> > has its own requirements and the framework can't really
>> make
>> > > an
>> > > >>>> > >> assumption
>> > > >>>> > >> > about it. What the KIP is trying to do is to provide a
>> > > mechanism
>> > > >>>> to
>> > > >>>> > the
>> > > >>>> > >> > connector to commit new offsets. With this approach, as
>> far
>> > as
>> > > I
>> > > >>>> can
>> > > >>>> > >> think
>> > > >>>> > >> > so far, there doesn't seem to be a case of out of order
>> > > >>>> processing. If
>> > > >>>> > >> you
>> > > >>>> > >> > have other concerns/thoughts I would be happy to know
>> them.
>> > > >>>> > >> >
>> > > >>>> > >> > 10. If, sometime in the future, we wanted to add
>> > > framework-level
>> > > >>>> > support
>> > > >>>> > >> > > for sending heartbeat records that doesn't require
>> > connectors
>> > > >>>> to
>> > > >>>> > >> > implement
>> > > >>>> > >> > > any new APIs...
>> > > >>>> > >> >
>> > > >>>> > >> >
>> > > >>>> > >> > The main purpose of producing heartbeat records is to be
>> able
>> > > to
>> > > >>>> emit
>> > > >>>> > >> > offsets w/o any new records. We are using heartbeat
>> records
>> > to
>> > > >>>> solve
>> > > >>>> > the
>> > > >>>> > >> > primary concern of offsets getting stalled. The reason to
>> do
>> > > >>>> that was
>> > > >>>> > >> once
>> > > >>>> > >> > we get SourceRecords, then the rest of the code is
>> already in
>> > > >>>> place to
>> > > >>>> > >> > write it to a topic of interest and commit offsets and
>> that
>> > > >>>> seemed the
>> > > >>>> > >> most
>> > > >>>> > >> > non invasive in terms of framework level changes. If in
>> the
>> > > >>>> future we
>> > > >>>> > >> want
>> > > >>>> > >> > to do a framework-only heartbeat record support, then this
>> > > would
>> > > >>>> > create
>> > > >>>> > >> > confusion as you pointed out. Do you think the choice of
>> the
>> > > name
>> > > >>>> > >> heartbeat
>> > > >>>> > >> > records is creating confusion in this case? Maybe we can
>> call
>> > > >>>> these
>> > > >>>> > >> special
>> > > >>>> > >> > records something else (not sure what at this point) which
>> > > would
>> > > >>>> then
>> > > >>>> > >> > decouple the 2 logically and implementation wise as well?
>> > > >>>> > >> >
>> > > >>>> > >> > Thanks!
>> > > >>>> > >> > Sagar.
>> > > >>>> > >> >
>> > > >>>> > >> > On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton
>> > > >>>> <chr...@aiven.io.invalid
>> > > >>>> > >
>> > > >>>> > >> > wrote:
>> > > >>>> > >> >
>> > > >>>> > >> > > Hi Sagar,
>> > > >>>> > >> > >
>> > > >>>> > >> > > Thanks for the KIP! I have some thoughts.
>> > > >>>> > >> > >
>> > > >>>> > >> > > Nits:
>> > > >>>> > >> > >
>> > > >>>> > >> > > 1. Shouldn't KAFKA-3821 [1] be linked as the Jira
>> ticket on
>> > > >>>> the KIP?
>> > > >>>> > >> Or
>> > > >>>> > >> > is
>> > > >>>> > >> > > there a different ticket that should be associated with
>> it?
>> > > >>>> > >> > > 2. The current state is listed as "Draft". Considering
>> it's
>> > > >>>> been
>> > > >>>> > >> brought
>> > > >>>> > >> > up
>> > > >>>> > >> > > for discussion, maybe the KIP should be updated to
>> > > >>>> "Discussion"?
>> > > >>>> > >> > > 3. Can you add a link for the discussion thread to the
>> KIP?
>> > > >>>> > >> > > 4. The KIP states that "In this process, offsets are
>> > written
>> > > at
>> > > >>>> > >> regular
>> > > >>>> > >> > > intervals(driven by `offset.flush.interval.ms`)". This
>> > isn't
>> > > >>>> > strictly
>> > > >>>> > >> > > accurate since, when exactly-once support is enabled,
>> > offset
>> > > >>>> commits
>> > > >>>> > >> can
>> > > >>>> > >> > > also be performed for each record batch (which is the
>> > > default)
>> > > >>>> or
>> > > >>>> > when
>> > > >>>> > >> > > explicitly requested by the task instance (if the
>> connector
>> > > >>>> > implements
>> > > >>>> > >> > the
>> > > >>>> > >> > > API to define its own transactions and the user has
>> > > configured
>> > > >>>> it to
>> > > >>>> > >> do
>> > > >>>> > >> > > so). Maybe better to just say "Offsets are written
>> > > >>>> periodically"?
>> > > >>>> > >> > > 5. The description for the (per-connector)
>> > > >>>> "heartbeat.records.topic
>> > > >>>> > "
>> > > >>>> > >> > > property states that it is "Only applicable in
>> distributed
>> > > >>>> mode; in
>> > > >>>> > >> > > standalone mode, setting this property will have no
>> > effect".
>> > > >>>> Is this
>> > > >>>> > >> > > correct?
>> > > >>>> > >> > >
>> > > >>>> > >> > > Non-nits:
>> > > >>>> > >> > >
>> > > >>>> > >> > > 6. It seems (based on both the KIP and discussion on
>> > > >>>> KAFKA-3821)
>> > > >>>> > that
>> > > >>>> > >> the
>> > > >>>> > >> > > only use case for being able to emit offsets without
>> also
>> > > >>>> emitting
>> > > >>>> > >> source
>> > > >>>> > >> > > records that's been identified so far is for CDC source
>> > > >>>> connectors
>> > > >>>> > >> like
>> > > >>>> > >> > > Debezium. But Debezium already has support for this
>> exact
>> > > >>>> feature
>> > > >>>> > >> > (emitting
>> > > >>>> > >> > > heartbeat records that include offsets that cannot be
>> > > >>>> associated
>> > > >>>> > with
>> > > >>>> > >> > > other, "regular" source records). Why should we add this
>> > > >>>> feature to
>> > > >>>> > >> Kafka
>> > > >>>> > >> > > Connect when the problem it addresses is already solved
>> in
>> > > the
>> > > >>>> set
>> > > >>>> > >> > > of connectors that (it seems) would have any need for it,
>> and
>> > > the
>> > > >>>> size
>> > > >>>> > of
>> > > >>>> > >> > that
>> > > >>>> > >> > > set is extremely small? If there are other practical use
>> > > cases
>> > > >>>> for
>> > > >>>> > >> > > connectors that would benefit from this feature, please
>> let
>> > > me
>> > > >>>> know.
>> > > >>>> > >> > >
>> > > >>>> > >> > > 7. If a task produces heartbeat records and source
>> records
>> > > >>>> that use
>> > > >>>> > >> the
>> > > >>>> > >> > > same source partition, which offset will ultimately be
>> > > >>>> committed?
>> > > >>>> > >> > >
>> > > >>>> > >> > > 8. The SourceTask::produceHeartbeatRecords method
>> returns a
>> > > >>>> > >> > > List<SourceRecord>, and users can control the heartbeat
>> > topic
>> > > >>>> for a
>> > > >>>> > >> > > connector via the (connector- or worker-level)
>> > > >>>> > >> "heartbeat.records.topic"
>> > > >>>> > >> > > property. Since every constructor for the SourceRecord
>> > class
>> > > >>>> [2]
>> > > >>>> > >> > requires a
>> > > >>>> > >> > > topic to be supplied, what will happen to that topic?
>> Will
>> > it
>> > > >>>> be
>> > > >>>> > >> ignored?
>> > > >>>> > >> > > If so, I think we should look for a cleaner solution.
>> > > >>>> > >> > >
>> > > >>>> > >> > > 9. A large concern raised in the discussion for
>> KAFKA-3821
>> > > was
>> > > >>>> the
>> > > >>>> > >> > allowing
>> > > >>>> > >> > > connectors to control the ordering of these special
>> > > >>>> "offsets-only"
>> > > >>>> > >> > > emissions and the regular source records returned from
>> > > >>>> > >> SourceTask::poll.
>> > > >>>> > >> > > Are we choosing to ignore that concern? If so, can you
>> add
>> > > >>>> this to
>> > > >>>> > the
>> > > >>>> > >> > > rejected alternatives section along with a rationale?
>> > > >>>> > >> > >
>> > > >>>> > >> > > 10. If, sometime in the future, we wanted to add
>> > > >>>> framework-level
>> > > >>>> > >> support
>> > > >>>> > >> > > for sending heartbeat records that doesn't require
>> > connectors
>> > > >>>> to
>> > > >>>> > >> > implement
>> > > >>>> > >> > > any new APIs (e.g.,
>> SourceTask::produceHeartbeatRecords), a
>> > > >>>> lot of
>> > > >>>> > >> this
>> > > >>>> > >> > > would paint us into a corner design-wise. We'd have to
>> > think
>> > > >>>> > carefully
>> > > >>>> > >> > > about which property names would be used, how to account
>> > for
>> > > >>>> > >> connectors
>> > > >>>> > >> > > that have already implemented the
>> > > >>>> > SourceTask::produceHeartbeatRecords
>> > > >>>> > >> > > method, etc. In general, it seems like we're trying to
>> > solve
>> > > >>>> two
>> > > >>>> > >> > completely
>> > > >>>> > >> > > different problems with this single KIP: adding
>> > > framework-level
>> > > >>>> > >> support
>> > > >>>> > >> > for
>> > > >>>> > >> > > emitting heartbeat records for source connectors, and
>> > > allowing
>> > > >>>> > source
>> > > >>>> > >> > > connectors to emit offsets without also emitting source
>> > > >>>> records. I
>> > > >>>> > >> don't
>> > > >>>> > >> > > mind addressing the two at the same time if the result
>> is
>> > > >>>> elegant
>> > > >>>> > and
>> > > >>>> > >> > > doesn't compromise on the solution for either problem,
>> but
>> > > that
>> > > >>>> > >> doesn't
>> > > >>>> > >> > > seem to be the case here. Of the two problems, could we
>> > > >>>> describe one
>> > > >>>> > >> as
>> > > >>>> > >> > the
>> > > >>>> > >> > > primary and one as the secondary? If so, we might
>> consider
>> > > >>>> dropping
>> > > >>>> > >> the
>> > > >>>> > >> > > secondary problem from this KIP and addressing it
>> > separately.
>> > > >>>> > >> > >
>> > > >>>> > >> > > [1] - https://issues.apache.org/jira/browse/KAFKA-3821
>> > > >>>> > >> > > [2] -
>> > > >>>> > >> > >
>> > > >>>> > >> > >
>> > > >>>> > >> >
>> > > >>>> > >>
>> > > >>>> >
>> > > >>>>
>> > >
>> >
>> https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html
>> > > >>>> > >> > >
>> > > >>>> > >> > > Cheers,
>> > > >>>> > >> > >
>> > > >>>> > >> > > Chris
>> > > >>>> > >> > >
>> > > >>>> > >> > > On Sat, Mar 25, 2023 at 11:18 PM Sagar <
>> > > >>>> sagarmeansoc...@gmail.com>
>> > > >>>> > >> > wrote:
>> > > >>>> > >> > >
>> > > >>>> > >> > > > Hi John,
>> > > >>>> > >> > > >
>> > > >>>> > >> > > > Thanks for taking a look at the KIP!
>> > > >>>> > >> > > >
>> > > >>>> > >> > > > The point about stream time not advancing in case of
>> > > >>>> infrequent
>> > > >>>> > >> updates
>> > > >>>> > >> > > is
>> > > >>>> > >> > > > an interesting one. I can imagine if the upstream
>> > producer
>> > > >>>> to a
>> > > >>>> > >> Kafka
>> > > >>>> > >> > > > Streams application is a Source Connector which isn't
>> > > sending
>> > > >>>> > >> records
>> > > >>>> > >> > > > frequently (due to the nature of the data ingestion for
>> > > >>>> example),
>> > > >>>> > >> then
>> > > >>>> > >> > the
>> > > >>>> > >> > > > downstream stream processing can run into the issues
>> you
>> > > >>>> > described
>> > > >>>> > >> > > above.
>> > > >>>> > >> > > >
>> > > >>>> > >> > > > Which also brings me to the second point you made
>> about
>> > how
>> > > >>>> this
>> > > >>>> > >> would
>> > > >>>> > >> > be
>> > > >>>> > >> > > > used by downstream consumers. IIUC, you are referring
>> to
>> > > the
>> > > >>>> > >> consumers
>> > > >>>> > >> > of
>> > > >>>> > >> > > > the newly added topic, i.e. the heartbeat topic. In my
>> > mind,
>> > > >>>> the
>> > > >>>> > >> > heartbeat
>> > > >>>> > >> > > > topic is an internal topic (similar to
>> > > offsets/config/status
>> > > >>>> topic
>> > > >>>> > >> in
>> > > >>>> > >> > > > connect), the main purpose of which is to trick the
>> > > >>>> framework to
>> > > >>>> > >> > produce
>> > > >>>> > >> > > > records to the offsets topic and advance the offsets.
>> > Since
>> > > >>>> every
>> > > >>>> > >> > > connector
>> > > >>>> > >> > > > could have a different definition of offsets (LSN,
>> > BinLogID
>> > > >>>> etc for
>> > > >>>> > >> > > > example), that logic to determine what the heartbeat
>> > > records
>> > > >>>> > should
>> > > >>>> > >> be
>> > > >>>> > >> > > > would have to reside in the actual connector.
>> > > >>>> > >> > > >
>> > > >>>> > >> > > > Now that I think of it, it could very well be
>> consumed by
>> > > >>>> > downstream
>> > > >>>> > >> > > > consumers/ Streams or Flink Applications and be
>> further
>> > > used
>> > > >>>> for
>> > > >>>> > >> some
>> > > >>>> > >> > > > decision making. A very crude example could be let's
>> say
>> > if
>> > > >>>> the
>> > > >>>> > >> > heartbeat
>> > > >>>> > >> > > > records sent to the new heartbeat topic include
>> > timestamps,
>> > > >>>> then
>> > > >>>> > the
>> > > >>>> > >> > > > downstream streams application can use that timestamp
>> to
>> > > >>>> close any
>> > > >>>> > >> time
>> > > >>>> > >> > > > windows. Having said that, it still appears to me that
>> > it's
>> > > >>>> > outside
>> > > >>>> > >> the
>> > > >>>> > >> > > > scope of the Connect framework and is something which
>> is
>> > > >>>> difficult
>> > > >>>> > >> to
>> > > >>>> > >> > > > generalise because of the variety of Sources and the
>> > > >>>> definitions
>> > > >>>> > of
>> > > >>>> > >> > > > offsets.
>> > > >>>> > >> > > >
>> > > >>>> > >> > > > But, I would still be more than happy to add this
>> example
>> > > if
>> > > >>>> you
>> > > >>>> > >> think
>> > > >>>> > >> > it
>> > > >>>> > >> > > > can be useful in getting a better understanding of the
>> > idea
>> > > >>>> and
>> > > >>>> > also
>> > > >>>> > >> > its
>> > > >>>> > >> > > > utility beyond connect. Please let me know!
>> > > >>>> > >> > > >
>> > > >>>> > >> > > > Thanks!
>> > > >>>> > >> > > > Sagar.
>> > > >>>> > >> > > >
>> > > >>>> > >> > > >
>> > > >>>> > >> > > > On Fri, Mar 24, 2023 at 7:22 PM John Roesler <
>> > > >>>> vvcep...@apache.org
>> > > >>>> > >
>> > > >>>> > >> > > wrote:
>> > > >>>> > >> > > >
>> > > >>>> > >> > > > > Thanks for the KIP, Sagar!
>> > > >>>> > >> > > > >
>> > > >>>> > >> > > > > At first glance, this seems like a very useful
>> feature.
>> > > >>>> > >> > > > >
>> > > >>>> > >> > > > > A common pain point in Streams is when upstream
>> > producers
>> > > >>>> don't
>> > > >>>> > >> send
>> > > >>>> > >> > > > > regular updates and stream time cannot advance. This
>> > > causes
>> > > >>>> > >> > > > > stream-time-driven operations to appear to hang,
>> like
>> > > time
>> > > >>>> > windows
>> > > >>>> > >> > not
>> > > >>>> > >> > > > > closing, suppressions not firing, etc.
>> > > >>>> > >> > > > >
>> > > >>>> > >> > > > > From your KIP, I have a good idea of how the feature
>> > > would
>> > > >>>> be
>> > > >>>> > >> > > integrated
>> > > >>>> > >> > > > > into connect, and it sounds good to me. I don't
>> quite
>> > see
>> > > >>>> how
>> > > >>>> > >> > > downstream
>> > > >>>> > >> > > > > clients, such as a downstream Streams or Flink
>> > > >>>> application, or
>> > > >>>> > >> users
>> > > >>>> > >> > of
>> > > >>>> > >> > > > the
>> > > >>>> > >> > > > > Consumer would make use of this feature. Could you
>> add
>> > > some
>> > > >>>> > >> examples
>> > > >>>> > >> > of
>> > > >>>> > >> > > > > that nature?
>> > > >>>> > >> > > > >
>> > > >>>> > >> > > > > Thank you,
>> > > >>>> > >> > > > > -John
>> > > >>>> > >> > > > >
>> > > >>>> > >> > > > > On Fri, Mar 24, 2023, at 05:23, Sagar wrote:
>> > > >>>> > >> > > > > > Hi All,
>> > > >>>> > >> > > > > >
>> > > >>>> > >> > > > > > Bumping the thread again.
>> > > >>>> > >> > > > > >
>> > > >>>> > >> > > > > > Sagar.
>> > > >>>> > >> > > > > >
>> > > >>>> > >> > > > > >
>> > > >>>> > >> > > > > > On Fri, Mar 10, 2023 at 4:42 PM Sagar <
>> > > >>>> > >> sagarmeansoc...@gmail.com>
>> > > >>>> > >> > > > wrote:
>> > > >>>> > >> > > > > >
>> > > >>>> > >> > > > > >> Hi All,
>> > > >>>> > >> > > > > >>
>> > > >>>> > >> > > > > >> Bumping this discussion thread again.
>> > > >>>> > >> > > > > >>
>> > > >>>> > >> > > > > >> Thanks!
>> > > >>>> > >> > > > > >> Sagar.
>> > > >>>> > >> > > > > >>
>> > > >>>> > >> > > > > >> On Thu, Mar 2, 2023 at 3:44 PM Sagar <
>> > > >>>> > >> sagarmeansoc...@gmail.com>
>> > > >>>> > >> > > > wrote:
>> > > >>>> > >> > > > > >>
>> > > >>>> > >> > > > > >>> Hi All,
>> > > >>>> > >> > > > > >>>
>> > > >>>> > >> > > > > >>> I wanted to create a discussion thread for
>> KIP-910:
>> > > >>>> > >> > > > > >>>
>> > > >>>> > >> > > > > >>>
>> > > >>>> > >> > > > > >>>
>> > > >>>> > >> > > > >
>> > > >>>> > >> > > >
>> > > >>>> > >> > >
>> > > >>>> > >> >
>> > > >>>> > >>
>> > > >>>> >
>> > > >>>>
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>> > > >>>> > >> > > > > >>>
>> > > >>>> > >> > > > > >>> Thanks!
>> > > >>>> > >> > > > > >>> Sagar.
>> > > >>>> > >> > > > > >>>
>> > > >>>> > >> > > > > >>
>> > > >>>> > >> > > > >
>> > > >>>> > >> > > >
>> > > >>>> > >> > >
>> > > >>>> > >> >
>> > > >>>> > >>
>> > > >>>> > >
>> > > >>>> >
>> > > >>>>
>> > > >>>
>> > >
>> >
>>
>
