Hi All,

I have created this PR: https://github.com/apache/kafka/pull/13899 which
implements the approach outlined in the latest version of the KIP. I
thought I could use this to validate the approach based on my understanding
while the KIP itself gets reviewed. I can always change the implementation
once we move to a final decision on the KIP.

Thanks!
Sagar.


On Wed, Jun 14, 2023 at 4:59 PM Sagar <sagarmeansoc...@gmail.com> wrote:

> Hey All,
>
> Bumping this discussion thread again to see how the modified KIP looks.
>
> Thanks!
> Sagar.
>
> On Mon, May 29, 2023 at 8:12 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>
>> Hi,
>>
>> Bumping this thread again for further reviews.
>>
>> Thanks!
>> Sagar.
>>
>> On Fri, May 12, 2023 at 3:38 PM Sagar <sagarmeansoc...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> Thanks for the comments/reviews. I have updated the KIP
>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>>> with a newer approach which removes the need for an explicit topic.
>>>
>>> Please review again and let me know what you think.
>>>
>>> Thanks!
>>> Sagar.
>>>
>>>
>>> On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya <yash.ma...@gmail.com> wrote:
>>>
>>>> Hi Sagar,
>>>>
>>>> Thanks for the KIP! I have a few questions and comments:
>>>>
>>>> 1) I agree with Chris' point about the separation of a connector
>>>> heartbeat
>>>> mechanism and allowing source connectors to generate offsets without
>>>> producing data. What is the purpose of the heartbeat topic here and are
>>>> there any concrete use cases for downstream consumers on this topic? Why
>>>> can't we instead simply introduce a mechanism to retrieve a list of
>>>> source
>>>> partition / source offset pairs from the source tasks?
>>>>
>>>> 2) With the currently described mechanism, the new
>>>> "SourceTask::produceHeartbeatRecords" method returns a
>>>> "List<SourceRecord>"
>>>> - what happens with the topic in each of these source records? Chris
>>>> pointed this out above, but it doesn't seem to have been addressed? The
>>>> "SourceRecord" class also has a bunch of other fields which will be
>>>> irrelevant here (partition, key / value schema, key / value data,
>>>> timestamp, headers). In fact, it seems like only the source partition
>>>> and
>>>> source offset are relevant here, so we should either introduce a new
>>>> abstraction or simply use a data structure like a mapping from source
>>>> partitions to source offsets (adds to the above point)?
>>>>
>>>> 3) I'm not sure I fully follow why the heartbeat timer / interval is
>>>> needed? What are the downsides of
>>>> calling "SourceTask::produceHeartbeatRecords" in every execution loop
>>>> (similar to the existing "SourceTask::poll" method)? Is this only to
>>>> prevent the generation of a lot of offset records? Since Connect's
>>>> offsets
>>>> topics are log compacted (and source partitions are used as keys for
>>>> each
>>>> source offset), I'm not sure if such concerns are valid and such a
>>>> heartbeat timer / interval mechanism is required?
>>>>
>>>> 4) The first couple of rejected alternatives state that the use of a
>>>> null
>>>> topic / key / value are preferably avoided - but the current proposal
>>>> would
>>>> also likely require connectors to use such workarounds (null topic when
>>>> the
>>>> heartbeat topic is configured at a worker level and always for the key /
>>>> value)?
>>>>
>>>> 5) The third rejected alternative talks about subclassing the
>>>> "SourceRecord" class - this presumably means allowing connectors to pass
>>>> special offset only records via the existing poll mechanism? Why was
>>>> this
>>>> considered a more invasive option? Was it because of the backward
>>>> compatibility issues that would be introduced for plugins using the new
>>>> public API class that still need to be deployed onto older Connect
>>>> workers?
>>>>
>>>> Thanks,
>>>> Yash
>>>>
>>>> On Fri, Apr 14, 2023 at 6:45 PM Sagar <sagarmeansoc...@gmail.com>
>>>> wrote:
>>>>
>>>> > One thing I forgot to mention in my previous email was that the
>>>> reason I
>>>> > chose to include the opt-in behaviour via configs was that the users
>>>> of the
>>>> > connector know their workload patterns. If the workload is such that
>>>> the
>>>> >  connector would receive regular valid updates then there’s ideally
>>>> no need
>>>> > for moving offsets since it would update automatically.
>>>> >
>>>> > This way they aren’t forced to use this feature and can use it only
>>>> when
>>>> > the workload is expected to be batchy or not frequent.
>>>> >
>>>> > Thanks!
>>>> > Sagar.
>>>> >
>>>> >
>>>> > On Fri, 14 Apr 2023 at 5:32 PM, Sagar <sagarmeansoc...@gmail.com>
>>>> wrote:
>>>> >
>>>> > > Hi Chris,
>>>> > >
>>>> > > Thanks for following up on the response. Sharing my thoughts
>>>> further:
>>>> > >
>>>> > > If we want to add support for connectors to emit offsets without
>>>> > >> accompanying source records, we could (and IMO should) do that
>>>> without
>>>> > >> requiring users to manually enable that feature by adjusting
>>>> worker or
>>>> > >> connector configurations.
>>>> > >
>>>> > >
>>>> > > With the current KIP design, I have tried to implement this in an
>>>> opt-in
>>>> > > manner via configs. I guess what you are trying to say is that this
>>>> > doesn't
>>>> > > need a config of its own and instead could be part of the poll ->
>>>> > > transform etc -> produce -> commit cycle. That way, the users don't
>>>> need
>>>> > to
>>>> > > set any config and if the connector supports moving offsets w/o
>>>> producing
>>>> > > SourceRecords, it should happen automatically. Is that correct? If
>>>> that
>>>> > > is the concern, then I can think of not exposing a config and try
>>>> to make
>>>> > > this process automatic. That should ease the load on connector
>>>> users,
>>>> > > but your point about cognitive load on Connector developers, I am
>>>> still
>>>> > not
>>>> > > sure how to address that. The offsets are privy to a connector and
>>>> the
>>>> > > framework at best can provide hooks to the tasks to update their
>>>> offsets.
>>>> > > Connector developers would still have to consider all cases before
>>>> > updating
>>>> > > offsets.  And if I ignore the heartbeat topic and heartbeat
>>>> interval ms
>>>> > > configs, then what the KIP proposes currently isn't much different
>>>> in
>>>> > that
>>>> > > regard. Just that it produces a List of SourceRecord which can be
>>>> changed
>>>> > > to a Map of SourcePartition and their offsets if you think that
>>>> would
>>>> > > simplify things. Are there other cases in your mind which need
>>>> > addressing?
>>>> > >
>>>> > > Here's my take on the use cases:
>>>> > >
>>>> > >    1. Regarding the example about SMTs with Object Storage based
>>>> > >    connectors, it was one of the scenarios identified. We have some
>>>> > connectors
>>>> > >    that rely on the offsets topic to check if the next batch of
>>>> files
>>>> > should
>>>> > >    be processed and because of filtering of the last record from the
>>>> > files,
>>>> > >    the EOF is supposedly never reached and the connector can't
>>>> commit
>>>> > offsets
>>>> > >    for that source partition(file). If there was a mechanism to
>>>> update
>>>> > offsets
>>>> > >    for such a source file, then with some moderately complex state
>>>> > tracking,
>>>> > >    the connector can mark that file as processed and proceed.
>>>> > >    2. There's another use case with the same class of connectors
>>>> where if
>>>> > >    a file is malformed, then the connector couldn't produce any
>>>> offsets
>>>> > >    because the file couldn't get processed completely. To handle
>>>> such
>>>> > cases,
>>>> > >    the connector developers have introduced a dev/null sort of topic
>>>> > where
>>>> > >    they produce a record to this corrupted file topic and move the
>>>> offset
>>>> > >    somehow. This topic ideally isn't needed, and a mechanism to
>>>> > update
>>>> > >    offsets would have helped in this case as well.
>>>> > >    3. Coming to CDC based connectors,
>>>> > >       1. We had a similar issue with Oracle CDC source connector and
>>>> > >       needed to employ the same heartbeat mechanism to get around
>>>> it.
>>>> > >       2. MongoDB CDC source connector has employed the same
>>>> heartbeat
>>>> > >       mechanism. Check `heartbeat.interval.ms` here (
>>>> > >
>>>> >
>>>> https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/error-handling/
>>>> > >       ).
>>>> > >       3. Another CDC connector for ScyllaDB employs a similar
>>>> mechanism.
>>>> > >
>>>> >
>>>> https://github.com/scylladb/scylla-cdc-source-connector/search?q=heartbeat
>>>> > >       4. For CDC based connectors, you could argue that these
>>>> connectors
>>>> > >       have been able to solve this error then why do we need
>>>> framework
>>>> > level
>>>> > >       support. But the point I am trying to make is that this
>>>> limitation
>>>> > from the
>>>> > >       framework is forcing CDC connector developers to implement
>>>> > per-connector
>>>> > >       solutions/hacks(at times). And there could always be more CDC
>>>> > connectors in
>>>> > >       the pipeline forcing them to take a similar route as well.
>>>> > >    4. There's also a case at times with CDC source connectors which
>>>> are
>>>> > >    REST API / Web Service based (Zendesk Source Connector for
>>>> example).
>>>> > These
>>>> > >    connectors typically use timestamps from the responses as
>>>> offsets. If
>>>> > >    there's a long period of inactivity wherein the API invocations
>>>> don't
>>>> > >    return any data, then the offsets won't move and the connector
>>>> would
>>>> > keep
>>>> > >    using the same timestamp that it received from the last non-empty
>>>> > response.
>>>> > >    If this period of inactivity keeps growing, and the API imposes
>>>> any
>>>> > limits
>>>> > >    on how far back we can go in terms of window start, then this
>>>> could
>>>> > >    potentially be a problem. In this case even though the connector
>>>> was
>>>> > caught
>>>> > >    up with all the responses, it may need to snapshot again. In
>>>> this case
>>>> > >    updating offsets can easily help since all the connector needs
>>>> to do
>>>> > is to
>>>> > >    move the timestamp which would move the offset inherently.
>>>> > >
>>>> > > I still believe that this is something the framework should support
>>>> OOB
>>>> > > irrespective of whether the connectors have been able to get around
>>>> this
>>>> > > restriction or not.
>>>> > >
>>>> > > Lastly, about your comments here:
>>>> > >
>>>> > > I'm also not sure that it's worth preserving the current behavior
>>>> that
>>>> > >> offsets for records that have been filtered out via SMT are not
>>>> > committed.
>>>> > >
>>>> > >
>>>> > > Let me know if we need a separate JIRA to track this? This somehow
>>>> didn't
>>>> > > look related to this discussion.
>>>> > >
>>>> > > Thanks!
>>>> > > Sagar.
>>>> > >
>>>> > >
>>>> > > On Wed, Apr 12, 2023 at 9:34 PM Chris Egerton
>>>> <chr...@aiven.io.invalid>
>>>> > > wrote:
>>>> > >
>>>> > >> Hi Sagar,
>>>> > >>
>>>> > >> I'm sorry, I'm still not convinced that this design solves the
>>>> > problem(s)
>>>> > >> it sets out to solve in the best way possible. I tried to
>>>> highlight this
>>>> > >> in
>>>> > >> my last email:
>>>> > >>
>>>> > >> > In general, it seems like we're trying to solve two completely
>>>> > different
>>>> > >> problems with this single KIP: adding framework-level support for
>>>> > emitting
>>>> > >> heartbeat records for source connectors, and allowing source
>>>> connectors
>>>> > to
>>>> > >> emit offsets without also emitting source records. I don't mind
>>>> > addressing
>>>> > >> the two at the same time if the result is elegant and doesn't
>>>> compromise
>>>> > >> on
>>>> > >> the solution for either problem, but that doesn't seem to be the
>>>> case
>>>> > >> here.
>>>> > >> Of the two problems, could we describe one as the primary and one
>>>> as the
>>>> > >> secondary? If so, we might consider dropping the secondary problem
>>>> from
>>>> > >> this KIP and addressing it separately.
>>>> > >>
>>>> > >> If we wanted to add support for heartbeat records, we could (and
>>>> IMO
>>>> > >> should) do that without requiring connectors to implement any new
>>>> > methods
>>>> > >> and only require adjustments to worker or connector configurations
>>>> by
>>>> > >> users
>>>> > >> in order to enable that feature.
>>>> > >>
>>>> > >> If we want to add support for connectors to emit offsets without
>>>> > >> accompanying source records, we could (and IMO should) do that
>>>> without
>>>> > >> requiring users to manually enable that feature by adjusting
>>>> worker or
>>>> > >> connector configurations.
>>>> > >>
>>>> > >>
>>>> > >> I'm also not sure that it's worth preserving the current behavior
>>>> that
>>>> > >> offsets for records that have been filtered out via SMT are not
>>>> > committed.
>>>> > >> I can't think of a case where this would be useful and there are
>>>> > obviously
>>>> > >> plenty where it isn't. There's also a slight discrepancy in how
>>>> these
>>>> > >> kinds
>>>> > >> of records are treated by the Connect runtime now; if a record is
>>>> > dropped
>>>> > >> because of an SMT, then its offset isn't committed, but if it's
>>>> dropped
>>>> > >> because exactly-once support is enabled and the connector chose to
>>>> abort
>>>> > >> the batch containing the record, then its offset is still
>>>> committed.
>>>> > After
>>>> > >> thinking carefully about the aborted transaction behavior, we
>>>> realized
>>>> > >> that
>>>> > >> it was fine to commit the offsets for those records, and I believe
>>>> that
>>>> > >> the
>>>> > >> same logic can be applied to any record that we're done trying to
>>>> send
>>>> > to
>>>> > >> Kafka (regardless of whether it was sent correctly, dropped due to
>>>> > >> producer
>>>> > >> error, filtered via SMT, etc.).
>>>> > >>
>>>> > >> I also find the file-based source connector example a little
>>>> confusing.
>>>> > >> What about that kind of connector causes the offset for the last
>>>> record
>>>> > of
>>>> > >> a file to be treated differently? Is there anything different about
>>>> > >> filtering that record via SMT vs. dropping it altogether because
>>>> of an
>>>> > >> asynchronous producer error with "errors.tolerance" set to "all"?
>>>> And
>>>> > >> finally, how would such a connector use the design proposed here?
>>>> > >>
>>>> > >> Finally, I don't disagree that if there are other legitimate use
>>>> cases
>>>> > >> that
>>>> > >> would be helped by addressing KAFKA-3821, we should try to solve
>>>> that
>>>> > >> issue
>>>> > >> in the Kafka Connect framework instead of requiring individual
>>>> > connectors
>>>> > >> to implement their own solutions. But the cognitive load added by
>>>> the
>>>> > >> design proposed here, for connector developers and Connect cluster
>>>> > >> administrators alike, costs too much to justify by pointing to an
>>>> > >> already-solved problem encountered by a single group of connectors
>>>> > (i.e.,
>>>> > >> Debezium). This is why I think it's crucial that we identify
>>>> realistic
>>>> > >> cases where this feature would actually be useful, and right now, I
>>>> > don't
>>>> > >> think any have been provided (at least, not ones that have already
>>>> been
>>>> > >> addressed or could be addressed with much simpler changes).
>>>> > >>
>>>> > >> Cheers,
>>>> > >>
>>>> > >> Chris
>>>> > >>
>>>> > >> On Tue, Apr 11, 2023 at 7:30 AM Sagar <sagarmeansoc...@gmail.com>
>>>> > wrote:
>>>> > >>
>>>> > >> > Hi Chris,
>>>> > >> >
>>>> > >> > Thanks for your detailed feedback!
>>>> > >> >
>>>> > >> > nits: I have taken care of them now. Thanks for pointing those
>>>> out.
>>>> > >> >
>>>> > >> > non-nits:
>>>> > >> >
>>>> > >> > 6) It seems (based on both the KIP and discussion on KAFKA-3821)
>>>> that
>>>> > >> the
>>>> > >> > > only use case for being able to emit offsets without also
>>>> emitting
>>>> > >> source
>>>> > >> > > records that's been identified so far is for CDC source
>>>> connectors
>>>> > >> like
>>>> > >> > > Debezium.
>>>> > >> >
>>>> > >> >
>>>> > >> > I am aware of at least one more case where the non production of
>>>> > offsets
>>>> > >> > (due to non production of records ) leads to the failure of
>>>> connectors
>>>> > >> when
>>>> > >> > the source purges the records of interest. This happens in File
>>>> based
>>>> > >> > source connectors (like S3/blob storage) in which if the last
>>>> record
>>>> > >> from
>>>> > >> > a file is filtered due to an SMT, then that particular file is
>>>> never
>>>> > >> > committed to the source partition and eventually when the file is
>>>> > >> deleted
>>>> > >> > from the source and the connector is restarted due to some
>>>> reason, it
>>>> > >> > fails.
>>>> > >> > Moreover, I feel the reason this support should be there in the
>>>> Kafka
>>>> > >> > Connect framework is because this is a restriction of the
>>>> framework
>>>> > and
>>>> > >> > today the framework provides no support for getting around this
>>>> > >> limitation.
>>>> > >> > Every connector has its own way of handling offsets and having
>>>> each
>>>> > >> > connector handle this restriction in its own way can make it
>>>> complex.
>>>> > >> > Whether we choose to do it the way this KIP prescribes or any
>>>> other
>>>> > way
>>>> > >> is
>>>> > >> > up for debate but IMHO, the framework should provide a way of
>>>> > >> > getting around this limitation.
>>>> > >> >
>>>> > >> > 7. If a task produces heartbeat records and source records that
>>>> use
>>>> > the
>>>> > >> > > same source partition, which offset will ultimately be
>>>> committed?
>>>> > >> >
>>>> > >> >
>>>> > >> > The idea is to add the records returned by the
>>>> > `produceHeartbeatRecords`
>>>> > >> > to the same `toSend` list within
>>>> `AbstractWorkerSourceTask#execute`.
>>>> > >> The
>>>> > >> > `produceHeartbeatRecords` would be invoked before we make the
>>>> `poll`
>>>> > >> call.
>>>> > >> > Hence, the offsets committed would be in the same order in which
>>>> they
>>>> > >> would
>>>> > >> > be written. Note that, the onus is on the Connector
>>>> implementation to
>>>> > >> not
>>>> > >> > return records which can lead to data loss or data going out of
>>>> order.
>>>> > >> The
>>>> > >> > framework would just commit based on whatever is supplied. Also,
>>>> > AFAIK,
>>>> > >> 2
>>>> > >> > `normal` source records can also produce the same source
>>>> partitions
>>>> > and
>>>> > >> > they are committed in the order in which they are written.
>>>> > >> >
>>>> > >> > 8. The SourceTask::produceHeartbeatRecords method returns a
>>>> > >> > > List<SourceRecord>, and users can control the heartbeat topic
>>>> for a
>>>> > >> > > connector via the (connector- or worker-level)
>>>> > >> "heartbeat.records.topic"
>>>> > >> > > property. Since every constructor for the SourceRecord class
>>>> [2]
>>>> > >> > requires a
>>>> > >> > > topic to be supplied, what will happen to that topic? Will it
>>>> be
>>>> > >> ignored?
>>>> > >> > > If so, I think we should look for a cleaner solution.
>>>> > >> >
>>>> > >> >
>>>> > >> > Sorry, I couldn't quite follow which topic will be ignored in
>>>> this
>>>> > case.
>>>> > >> >
>>>> > >> > 9. A large concern raised in the discussion for KAFKA-3821 was
>>>> > >> allowing
>>>> > >> > > connectors to control the ordering of these special
>>>> "offsets-only"
>>>> > >> > > emissions and the regular source records returned from
>>>> > >> SourceTask::poll.
>>>> > >> > > Are we choosing to ignore that concern? If so, can you add
>>>> this to
>>>> > the
>>>> > >> > > rejected alternatives section along with a rationale?
>>>> > >> >
>>>> > >> >
>>>> > >> > One thing to note is that for every connector, the condition
>>>> to
>>>> > emit
>>>> > >> > the heartbeat record is totally up to the connector. For
>>>> example, for
>>>> > a
>>>> > >> > connector which is tracking transactions for an ordered log, if
>>>> there
>>>> > >> are
>>>> > >> > open transactions, it might not need to emit heartbeat records
>>>> when
>>>> > the
>>>> > >> > timer expires while for file based connectors, if the same file
>>>> is
>>>> > being
>>>> > >> > processed again and again due to an SMT or some other reasons,
>>>> then it
>>>> > >> can
>>>> > >> > choose to emit that partition. The uber point here is that every
>>>> > >> connector
>>>> > >> > has its own requirements and the framework can't really make an
>>>> > >> assumption
>>>> > >> > about it. What the KIP is trying to do is to provide a mechanism
>>>> to
>>>> > the
>>>> > >> > connector to commit new offsets. With this approach, as far as I
>>>> can
>>>> > >> think
>>>> > >> > so far, there doesn't seem to be a case of out of order
>>>> processing. If
>>>> > >> you
>>>> > >> > have other concerns/thoughts I would be happy to know them.
>>>> > >> >
>>>> > >> > 10. If, sometime in the future, we wanted to add framework-level
>>>> > support
>>>> > >> > > for sending heartbeat records that doesn't require connectors
>>>> to
>>>> > >> > implement
>>>> > >> > > any new APIs...
>>>> > >> >
>>>> > >> >
>>>> > >> > The main purpose of producing heartbeat records is to be able to
>>>> emit
>>>> > >> > offsets w/o any new records. We are using heartbeat records to
>>>> solve
>>>> > the
>>>> > >> > primary concern of offsets getting stalled. The reason to do
>>>> that was
>>>> > >> once
>>>> > >> > we get SourceRecords, then the rest of the code is already in
>>>> place to
>>>> > >> > write it to a topic of interest and commit offsets and that
>>>> seemed the
>>>> > >> most
>>>> > >> > non invasive in terms of framework level changes. If in the
>>>> future we
>>>> > >> want
>>>> > >> > to do a framework-only heartbeat record support, then this would
>>>> > create
>>>> > >> > confusion as you pointed out. Do you think the choice of the name
>>>> > >> heartbeat
>>>> > >> > records is creating confusion in this case? Maybe we can call
>>>> these
>>>> > >> special
>>>> > >> > records something else (not sure what at this point) which would
>>>> then
>>>> > >> > decouple the 2 logically and implementation wise as well?
>>>> > >> >
>>>> > >> > Thanks!
>>>> > >> > Sagar.
>>>> > >> >
>>>> > >> > On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton
>>>> <chr...@aiven.io.invalid
>>>> > >
>>>> > >> > wrote:
>>>> > >> >
>>>> > >> > > Hi Sagar,
>>>> > >> > >
>>>> > >> > > Thanks for the KIP! I have some thoughts.
>>>> > >> > >
>>>> > >> > > Nits:
>>>> > >> > >
>>>> > >> > > 1. Shouldn't KAFKA-3821 [1] be linked as the Jira ticket on
>>>> the KIP?
>>>> > >> Or
>>>> > >> > is
>>>> > >> > > there a different ticket that should be associated with it?
>>>> > >> > > 2. The current state is listed as "Draft". Considering it's
>>>> been
>>>> > >> brought
>>>> > >> > up
>>>> > >> > > for discussion, maybe the KIP should be updated to
>>>> "Discussion"?
>>>> > >> > > 3. Can you add a link for the discussion thread to the KIP?
>>>> > >> > > 4. The KIP states that "In this process, offsets are written at
>>>> > >> regular
>>>> > >> > > intervals(driven by `offset.flush.interval.ms`)". This isn't
>>>> > strictly
>>>> > >> > > accurate since, when exactly-once support is enabled, offset
>>>> commits
>>>> > >> can
>>>> > >> > > also be performed for each record batch (which is the default)
>>>> or
>>>> > when
>>>> > >> > > explicitly requested by the task instance (if the connector
>>>> > implements
>>>> > >> > the
>>>> > >> > > API to define its own transactions and the user has configured
>>>> it to
>>>> > >> do
>>>> > >> > > so). Maybe better to just say "Offsets are written
>>>> periodically"?
>>>> > >> > > 5. The description for the (per-connector)
>>>> "heartbeat.records.topic"
>>>> > >> > > property states that it is "Only applicable in distributed
>>>> mode; in
>>>> > >> > > standalone mode, setting this property will have no effect".
>>>> Is this
>>>> > >> > > correct?
>>>> > >> > >
>>>> > >> > > Non-nits:
>>>> > >> > >
>>>> > >> > > 6. It seems (based on both the KIP and discussion on
>>>> KAFKA-3821)
>>>> > that
>>>> > >> the
>>>> > >> > > only use case for being able to emit offsets without also
>>>> emitting
>>>> > >> source
>>>> > >> > > records that's been identified so far is for CDC source
>>>> connectors
>>>> > >> like
>>>> > >> > > Debezium. But Debezium already has support for this exact
>>>> feature
>>>> > >> > (emitting
>>>> > >> > > heartbeat records that include offsets that cannot be
>>>> associated
>>>> > with
>>>> > >> > > other, "regular" source records). Why should we add this
>>>> feature to
>>>> > >> Kafka
>>>> > >> > > Connect when the problem it addresses is already solved in the
>>>> set
>>>> > >> > > connectors that (it seems) would have any need for it, and the
>>>> size
>>>> > of
>>>> > >> > that
>>>> > >> > > set is extremely small? If there are other practical use cases
>>>> for
>>>> > >> > > connectors that would benefit from this feature, please let me
>>>> know.
>>>> > >> > >
>>>> > >> > > 7. If a task produces heartbeat records and source records
>>>> that use
>>>> > >> the
>>>> > >> > > same source partition, which offset will ultimately be
>>>> committed?
>>>> > >> > >
>>>> > >> > > 8. The SourceTask::produceHeartbeatRecords method returns a
>>>> > >> > > List<SourceRecord>, and users can control the heartbeat topic
>>>> for a
>>>> > >> > > connector via the (connector- or worker-level)
>>>> > >> "heartbeat.records.topic"
>>>> > >> > > property. Since every constructor for the SourceRecord class
>>>> [2]
>>>> > >> > requires a
>>>> > >> > > topic to be supplied, what will happen to that topic? Will it
>>>> be
>>>> > >> ignored?
>>>> > >> > > If so, I think we should look for a cleaner solution.
>>>> > >> > >
>>>> > >> > > 9. A large concern raised in the discussion for KAFKA-3821 was
>>>> > >> > allowing
>>>> > >> > > connectors to control the ordering of these special
>>>> "offsets-only"
>>>> > >> > > emissions and the regular source records returned from
>>>> > >> SourceTask::poll.
>>>> > >> > > Are we choosing to ignore that concern? If so, can you add
>>>> this to
>>>> > the
>>>> > >> > > rejected alternatives section along with a rationale?
>>>> > >> > >
>>>> > >> > > 10. If, sometime in the future, we wanted to add
>>>> framework-level
>>>> > >> support
>>>> > >> > > for sending heartbeat records that doesn't require connectors
>>>> to
>>>> > >> > implement
>>>> > >> > > any new APIs (e.g., SourceTask::produceHeartbeatRecords), a
>>>> lot of
>>>> > >> this
>>>> > >> > > would paint us into a corner design-wise. We'd have to think
>>>> > carefully
>>>> > >> > > about which property names would be used, how to account for
>>>> > >> connectors
>>>> > >> > > that have already implemented the
>>>> > SourceTask::produceHeartbeatRecords
>>>> > >> > > method, etc. In general, it seems like we're trying to solve
>>>> two
>>>> > >> > completely
>>>> > >> > > different problems with this single KIP: adding framework-level
>>>> > >> support
>>>> > >> > for
>>>> > >> > > emitting heartbeat records for source connectors, and allowing
>>>> > source
>>>> > >> > > connectors to emit offsets without also emitting source
>>>> records. I
>>>> > >> don't
>>>> > >> > > mind addressing the two at the same time if the result is
>>>> elegant
>>>> > and
>>>> > >> > > doesn't compromise on the solution for either problem, but that
>>>> > >> doesn't
>>>> > >> > > seem to be the case here. Of the two problems, could we
>>>> describe one
>>>> > >> as
>>>> > >> > the
>>>> > >> > > primary and one as the secondary? If so, we might consider
>>>> dropping
>>>> > >> the
>>>> > >> > > secondary problem from this KIP and addressing it separately.
>>>> > >> > >
>>>> > >> > > [1] - https://issues.apache.org/jira/browse/KAFKA-3821
>>>> > >> > > [2] -
>>>> > >> > >
>>>> > >> > >
>>>> > >> >
>>>> > >>
>>>> >
>>>> https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html
>>>> > >> > >
>>>> > >> > > Cheers,
>>>> > >> > >
>>>> > >> > > Chris
>>>> > >> > >
>>>> > >> > > On Sat, Mar 25, 2023 at 11:18 PM Sagar <
>>>> sagarmeansoc...@gmail.com>
>>>> > >> > wrote:
>>>> > >> > >
>>>> > >> > > > Hi John,
>>>> > >> > > >
>>>> > >> > > > Thanks for taking a look at the KIP!
>>>> > >> > > >
>>>> > >> > > > The point about stream time not advancing in case of
>>>> infrequent
>>>> > >> updates
>>>> > >> > > is
>>>> > >> > > > an interesting one. I can imagine if the upstream producer
>>>> to a
>>>> > >> Kafka
>>>> > >> > > > Streams application is a Source Connector which isn't sending
>>>> > >> records
>>>> > >> > > > frequently(due to the nature of the data ingestion for
>>>> example),
>>>> > >> then
>>>> > >> > the
>>>> > >> > > > downstream stream processing can land into the issues you
>>>> > described
>>>> > >> > > above.
>>>> > >> > > >
>>>> > >> > > > Which also brings me to the second point you made about how
>>>> this
>>>> > >> would
>>>> > >> > be
>>>> > >> > > > used by downstream consumers. IIUC, you are referring to the
>>>> > >> consumers
>>>> > >> > of
>>>> > >> > > > the newly added topic i.e the heartbeat topic. In my mind,
>>>> the
>>>> > >> > heartbeat
>>>> > >> > > > topic is an internal topic (similar to offsets/config/status
>>>> topic
>>>> > >> in
>>>> > >> > > > connect), the main purpose of which is to trick the
>>>> framework to
>>>> > >> > produce
>>>> > >> > > > records to the offsets topic and advance the offsets. Since
>>>> every
>>>> > >> > > connector
>>>> > >> > > > could have a different definition of offsets(LSN, BinLogID
>>>> etc for
>>>> > >> > > > example), that logic to determine what the heartbeat records
>>>> > should
>>>> > >> be
>>>> > >> > > > would have to reside in the actual connector.
>>>> > >> > > >
>>>> > >> > > > Now that I think of it, it could very well be consumed by
>>>> > downstream
>>>> > >> > > > consumers/ Streams or Flink Applications and be further used
>>>> for
>>>> > >> some
>>>> > >> > > > decision making. A very crude example could be let's say if
>>>> the
>>>> > >> > heartbeat
>>>> > >> > > > records sent to the new heartbeat topic include timestamps,
>>>> then
>>>> > the
>>>> > >> > > > downstream streams application can use that timestamp to
>>>> close any
>>>> > >> time
>>>> > >> > > > windows. Having said that, it still appears to me that it's
>>>> > outside
>>>> > >> the
>>>> > >> > > > scope of the Connect framework and is something which is
>>>> difficult
>>>> > >> to
>>>> > >> > > > generalise because of the variety of Sources and the
>>>> definitions
>>>> > of
>>>> > >> > > > offsets.
>>>> > >> > > >
>>>> > >> > > > But, I would still be more than happy to add this example if
>>>> you
>>>> > >> think
>>>> > >> > it
>>>> > >> > > > can be useful in getting a better understanding of the idea
>>>> and
>>>> > also
>>>> > >> > its
>>>> > >> > > > utility beyond connect. Please let me know!
>>>> > >> > > >
>>>> > >> > > > Thanks!
>>>> > >> > > > Sagar.
>>>> > >> > > >
>>>> > >> > > >
>>>> > >> > > > On Fri, Mar 24, 2023 at 7:22 PM John Roesler <
>>>> vvcep...@apache.org
>>>> > >
>>>> > >> > > wrote:
>>>> > >> > > >
>>>> > >> > > > > Thanks for the KIP, Sagar!
>>>> > >> > > > >
>>>> > >> > > > > At first glance, this seems like a very useful feature.
>>>> > >> > > > >
>>>> > >> > > > > A common pain point in Streams is when upstream producers
>>>> don't
>>>> > >> send
>>>> > >> > > > > regular updates and stream time cannot advance. This causes
>>>> > >> > > > > stream-time-driven operations to appear to hang, like time
>>>> > windows
>>>> > >> > not
>>>> > >> > > > > closing, suppressions not firing, etc.
>>>> > >> > > > >
>>>> > >> > > > > From your KIP, I have a good idea of how the feature would
>>>> be
>>>> > >> > > integrated
>>>> > >> > > > > into connect, and it sounds good to me. I don't quite see
>>>> how
>>>> > >> > > downstream
>>>> > >> > > > > clients, such as a downstream Streams or Flink
>>>> application, or
>>>> > >> users
>>>> > >> > of
>>>> > >> > > > the
>>>> > >> > > > > Consumer would make use of this feature. Could you add some
>>>> > >> examples
>>>> > >> > of
>>>> > >> > > > > that nature?
>>>> > >> > > > >
>>>> > >> > > > > Thank you,
>>>> > >> > > > > -John
>>>> > >> > > > >
>>>> > >> > > > > On Fri, Mar 24, 2023, at 05:23, Sagar wrote:
>>>> > >> > > > > > Hi All,
>>>> > >> > > > > >
>>>> > >> > > > > > Bumping the thread again.
>>>> > >> > > > > >
>>>> > >> > > > > > Sagar.
>>>> > >> > > > > >
>>>> > >> > > > > >
>>>> > >> > > > > > On Fri, Mar 10, 2023 at 4:42 PM Sagar <
>>>> > >> sagarmeansoc...@gmail.com>
>>>> > >> > > > wrote:
>>>> > >> > > > > >
>>>> > >> > > > > >> Hi All,
>>>> > >> > > > > >>
>>>> > >> > > > > >> Bumping this discussion thread again.
>>>> > >> > > > > >>
>>>> > >> > > > > >> Thanks!
>>>> > >> > > > > >> Sagar.
>>>> > >> > > > > >>
>>>> > >> > > > > >> On Thu, Mar 2, 2023 at 3:44 PM Sagar <
>>>> > >> sagarmeansoc...@gmail.com>
>>>> > >> > > > wrote:
>>>> > >> > > > > >>
>>>> > >> > > > > >>> Hi All,
>>>> > >> > > > > >>>
>>>> > >> > > > > >>> I wanted to create a discussion thread for KIP-910:
>>>> > >> > > > > >>>
>>>> > >> > > > > >>>
>>>> > >> > > > > >>>
>>>> > >> > > > >
>>>> > >> > > >
>>>> > >> > >
>>>> > >> >
>>>> > >>
>>>> >
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
>>>> > >> > > > > >>>
>>>> > >> > > > > >>> Thanks!
>>>> > >> > > > > >>> Sagar.
>>>> > >> > > > > >>>
>>>> > >> > > > > >>
>>>> > >> > > > >
>>>> > >> > > >
>>>> > >> > >
>>>> > >> >
>>>> > >>
>>>> > >
>>>> >
>>>>
>>>
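
Yash's points 2 and 3 in the thread above (representing offsets as a mapping
from source partitions to source offsets, and relying on log compaction keyed
on the source partition) can be sketched in plain Java. This is only a rough
illustration and not Kafka Connect code: the class OffsetCompactionSketch, the
method compact, and the String/Long map types are all hypothetical, chosen to
show that repeated offset updates for one partition collapse to the latest
value.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration; none of these names are Kafka Connect APIs.
final class OffsetCompactionSketch {

    // Mimics log compaction: for each key (source partition), only the most
    // recently written value (source offset) is retained.
    static Map<Map<String, String>, Map<String, Long>> compact(
            List<Map.Entry<Map<String, String>, Map<String, Long>>> writes) {
        Map<Map<String, String>, Map<String, Long>> latest = new LinkedHashMap<>();
        for (Map.Entry<Map<String, String>, Map<String, Long>> write : writes) {
            // A later write for the same partition replaces the earlier one.
            latest.put(write.getKey(), write.getValue());
        }
        return latest;
    }

    public static void main(String[] args) {
        // Assumed partition/offset shapes for a file-based connector.
        Map<String, String> partition = Map.of("file", "a.csv");
        List<Map.Entry<Map<String, String>, Map<String, Long>>> writes = List.of(
                Map.entry(partition, Map.of("position", 10L)),
                Map.entry(partition, Map.of("position", 20L)),
                Map.entry(partition, Map.of("position", 30L)));
        // Three offset updates for one partition leave a single entry.
        System.out.println(compact(writes));
    }
}
```

Under these assumptions, emitting an offset update on every execution loop
grows the retained state only with the number of distinct source partitions,
not with the number of updates, which is the intuition behind questioning the
need for a heartbeat interval.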

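The REST API / web service use case described by Sagar in the thread
(timestamps used as offsets, with the API limiting how far back a window can
start) can be illustrated roughly as follows. The names TimestampOffsetSketch,
needsResnapshot and nextOffset, and the 30-day lookback limit, are assumptions
made for this sketch; they do not come from the KIP or from any connector.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration; not taken from any real connector.
final class TimestampOffsetSketch {

    // True when the stored timestamp offset is older than the earliest
    // window start the (assumed) API will accept.
    static boolean needsResnapshot(Instant storedOffset, Instant now, Duration maxLookback) {
        return storedOffset.isBefore(now.minus(maxLookback));
    }

    // Offset a task could report after a poll: the newest record timestamp
    // if the response had data, otherwise the time of the empty poll itself.
    static Instant nextOffset(Instant newestRecordTimestamp, Instant pollTime) {
        return newestRecordTimestamp != null ? newestRecordTimestamp : pollTime;
    }

    public static void main(String[] args) {
        Instant lastNonEmpty = Instant.parse("2023-01-01T00:00:00Z");
        Instant now = Instant.parse("2023-03-01T00:00:00Z");
        Duration maxLookback = Duration.ofDays(30); // assumed API limit

        // Without offset updates, the stale timestamp falls outside the window.
        System.out.println(needsResnapshot(lastNonEmpty, now, maxLookback));

        // If empty polls can move the offset, it stays inside the window.
        Instant advanced = nextOffset(null, now.minus(Duration.ofHours(1)));
        System.out.println(needsResnapshot(advanced, now, maxLookback));
    }
}
```

Whether the time of an empty poll is a safe offset depends on the API's
semantics (for example, late-arriving data), so a real connector would need
to decide that case by case.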