Hi All,

I have created this PR: https://github.com/apache/kafka/pull/13899 which implements the approach outlined in the latest version of the KIP. I thought I could use it to validate the approach based on my understanding while the KIP itself gets reviewed. I can always change the implementation once we reach a final decision on the KIP.
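For anyone skimming the PR, the task-facing hook is roughly of the following shape. This is a minimal sketch; the method name, signature, and default behaviour are illustrative of the direction in the KIP, not the final API:

    import java.util.Map;

    // Hypothetical addition to org.apache.kafka.connect.source.SourceTask.
    // The framework passes in the offsets it is about to commit (keyed by
    // source partition); the task may return updated offsets for partitions
    // that produced no records, or null to leave everything untouched.
    public Map<Map<String, Object>, Map<String, Object>> updateOffsets(
            Map<Map<String, Object>, Map<String, Object>> offsetsToCommit) {
        return null; // default: no extra offsets to commit
    }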
Thanks!
Sagar.

On Wed, Jun 14, 2023 at 4:59 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Hey All,

Bumping this discussion thread again to see how the modified KIP looks.

Thanks!
Sagar.

On Mon, May 29, 2023 at 8:12 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Hi,

Bumping this thread again for further reviews.

Thanks!
Sagar.

On Fri, May 12, 2023 at 3:38 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Hi All,

Thanks for the comments/reviews. I have updated the KIP
https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records
with a newer approach which shelves the need for an explicit topic.

Please review again and let me know what you think.

Thanks!
Sagar.

On Mon, Apr 24, 2023 at 3:35 PM Yash Mayya <yash.ma...@gmail.com> wrote:

Hi Sagar,

Thanks for the KIP! I have a few questions and comments:

1) I agree with Chris' point about the separation of a connector heartbeat mechanism and allowing source connectors to generate offsets without producing data. What is the purpose of the heartbeat topic here, and are there any concrete use cases for downstream consumers on this topic? Why can't we instead simply introduce a mechanism to retrieve a list of source partition / source offset pairs from the source tasks?

2) With the currently described mechanism, the new "SourceTask::produceHeartbeatRecords" method returns a "List<SourceRecord>" - what happens with the topic in each of these source records? Chris pointed this out above, but it doesn't seem to have been addressed. The "SourceRecord" class also has a bunch of other fields which will be irrelevant here (partition, key / value schema, key / value data, timestamp, headers). In fact, it seems like only the source partition and source offset are relevant here, so we should either introduce a new abstraction or simply use a data structure like a mapping from source partitions to source offsets (adds to the above point; see the sketch after this list)?

3) I'm not sure I fully follow why the heartbeat timer / interval is needed. What are the downsides of calling "SourceTask::produceHeartbeatRecords" in every execution loop (similar to the existing "SourceTask::poll" method)? Is this only to prevent the generation of a lot of offset records? Since Connect's offsets topics are log compacted (and source partitions are used as keys for each source offset), I'm not sure such concerns are valid or that a heartbeat timer / interval mechanism is required.

4) The first couple of rejected alternatives state that the use of a null topic / key / value are preferably avoided - but the current proposal would also likely require connectors to use such workarounds (a null topic when the heartbeat topic is configured at a worker level, and always for the key / value)?

5) The third rejected alternative talks about subclassing the "SourceRecord" class - this presumably means allowing connectors to pass special offset-only records via the existing poll mechanism? Why was this considered a more invasive option? Was it because of the backward compatibility issues that would be introduced for plugins using the new public API class that still need to be deployed onto older Connect workers?
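For illustration, the kind of shape I have in mind for point 2 (purely a sketch with made-up names, reusing the Map-based source partition / offset types the framework already works with):

    import java.util.Map;

    // Hypothetical alternative to produceHeartbeatRecords(): the task hands
    // back only source partition -> source offset pairs, so no topic, keys,
    // values, schemas, timestamps, or headers need to be faked.
    public Map<Map<String, Object>, Map<String, Object>> offsetsToCommit() {
        return Map.of(
                Map.of("filename", "data-0001.csv"),  // source partition
                Map.of("position", 4096L));           // source offset
    }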
Thanks,
Yash

On Fri, Apr 14, 2023 at 6:45 PM Sagar <sagarmeansoc...@gmail.com> wrote:

One thing I forgot to mention in my previous email was that the reason I chose to include the opt-in behaviour via configs was that the users of the connector know their workload patterns. If the workload is such that the connector would receive regular valid updates, then there's ideally no need for moving offsets since they would update automatically.

This way they aren't forced to use this feature and can use it only when the workload is expected to be batchy or infrequent.

Thanks!
Sagar.

On Fri, 14 Apr 2023 at 5:32 PM, Sagar <sagarmeansoc...@gmail.com> wrote:

Hi Chris,

Thanks for following up on the response. Sharing my thoughts further:

> If we want to add support for connectors to emit offsets without accompanying source records, we could (and IMO should) do that without requiring users to manually enable that feature by adjusting worker or connector configurations.

With the current KIP design, I have tried to implement this in an opt-in manner via configs. I guess what you are trying to say is that this doesn't need a config of its own and instead could be part of the poll -> transform -> produce -> commit cycle. That way, the users don't need to set any config, and if the connector supports moving offsets without producing SourceRecords, it should happen automatically. Is that correct? If that is the concern, then I can look at not exposing a config and making this process automatic. That should ease the load on connector users, but as for your point about the cognitive load on connector developers, I am still not sure how to address that. The offsets are privy to a connector, and the framework at best can provide hooks for the tasks to update their offsets. Connector developers would still have to consider all cases before updating offsets. And if I ignore the heartbeat topic and heartbeat interval configs, then what the KIP proposes currently isn't much different in that regard. It just produces a List of SourceRecord, which can be changed to a Map of source partitions to their offsets if you think that would simplify things. Are there other cases in your mind which need addressing?

Here's my take on the use cases:

1. Regarding the example about SMTs with Object Storage based connectors, it was one of the scenarios identified. We have some connectors that rely on the offsets topic to check whether the next batch of files should be processed, and because the last record from a file gets filtered out, EOF is supposedly never reached and the connector can't commit offsets for that source partition (file).
If there were a mechanism to update offsets for such a source file, then with some moderately complex state tracking, the connector could mark that file as processed and proceed.

2. There's another use case with the same class of connectors where, if a file is malformed, the connector can't produce any offsets because the file couldn't be processed completely. To handle such cases, the connector developers have introduced a dev/null sort of topic where they produce a record for the corrupted file and move the offset somehow. This topic ideally isn't needed, and a mechanism to update offsets would have helped in this case as well.

3. Coming to CDC based connectors:
   1. We had a similar issue with the Oracle CDC source connector and needed to employ the same heartbeat mechanism to get around it.
   2. The MongoDB CDC source connector has employed the same heartbeat mechanism; check `heartbeat.interval.ms` here (https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/error-handling/).
   3. Another CDC connector, for ScyllaDB, employs a similar mechanism: https://github.com/scylladb/scylla-cdc-source-connector/search?q=heartbeat
   4. For CDC based connectors, you could argue that these connectors have been able to solve this problem, so why do we need framework-level support? But the point I am trying to make is that this limitation of the framework is forcing CDC connector developers to implement per-connector solutions/hacks (at times). And there could always be more CDC connectors in the pipeline that would be forced to take a similar route as well.

4. There's also a case at times with CDC source connectors which are REST API / web service based (the Zendesk source connector, for example). These connectors typically use timestamps from the responses as offsets. If there's a long period of inactivity wherein the API invocations don't return any data, then the offsets won't move, and the connector keeps using the same timestamp that it received from the last non-empty response. If this period of inactivity keeps growing, and the API imposes any limits on how far back the window start can go, then this could potentially be a problem. In this case, even though the connector was caught up with all the responses, it may need to snapshot again. Updating offsets can easily help here, since all the connector needs to do is move the timestamp, which moves the offset inherently (see the sketch after this list).
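To make use case 4 concrete, here is a rough sketch of what such a task could do with an offsets-update hook. The hook name is hypothetical, and `noNewDataSinceLastPoll()` / `lastSeenTimestamp()` are made-up helpers standing in for state the task already tracks:

    import java.util.Map;

    // Hypothetical: invoked by the framework even when poll() returned no
    // records, letting the task advance its timestamp-based offset so the
    // API query window keeps moving during long periods of inactivity.
    public Map<Map<String, Object>, Map<String, Object>> offsetsToCommit() {
        if (noNewDataSinceLastPoll()) {
            return Map.of(
                    Map.of("endpoint", "tickets"),              // source partition
                    Map.of("timestamp", lastSeenTimestamp()));  // advanced offset
        }
        return null; // regular records already carry the latest offset
    }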
I still believe that this is something the framework should support out of the box, irrespective of whether the connectors have been able to get around this restriction or not.

Lastly, about your comment here:

> I'm also not sure that it's worth preserving the current behavior that offsets for records that have been filtered out via SMT are not committed.

Let me know if we need a separate JIRA to track this? It somehow didn't look related to this discussion.

Thanks!
Sagar.

On Wed, Apr 12, 2023 at 9:34 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Sagar,

I'm sorry, I'm still not convinced that this design solves the problem(s) it sets out to solve in the best way possible. I tried to highlight this in my last email:

> In general, it seems like we're trying to solve two completely different problems with this single KIP: adding framework-level support for emitting heartbeat records for source connectors, and allowing source connectors to emit offsets without also emitting source records. I don't mind addressing the two at the same time if the result is elegant and doesn't compromise on the solution for either problem, but that doesn't seem to be the case here. Of the two problems, could we describe one as the primary and one as the secondary? If so, we might consider dropping the secondary problem from this KIP and addressing it separately.

If we wanted to add support for heartbeat records, we could (and IMO should) do that without requiring connectors to implement any new methods, and only require adjustments to worker or connector configurations by users in order to enable that feature.

If we want to add support for connectors to emit offsets without accompanying source records, we could (and IMO should) do that without requiring users to manually enable that feature by adjusting worker or connector configurations.

I'm also not sure that it's worth preserving the current behavior that offsets for records that have been filtered out via SMT are not committed. I can't think of a case where this would be useful, and there are obviously plenty where it isn't. There's also a slight discrepancy in how these kinds of records are treated by the Connect runtime now: if a record is dropped because of an SMT, then its offset isn't committed, but if it's dropped because exactly-once support is enabled and the connector chose to abort the batch containing the record, then its offset is still committed. After thinking carefully about the aborted transaction behavior, we realized that it was fine to commit the offsets for those records, and I believe that the same logic can be applied to any record that we're done trying to send to Kafka (regardless of whether it was sent correctly, dropped due to producer error, filtered via SMT, etc.).

I also find the file-based source connector example a little confusing. What about that kind of connector causes the offset for the last record of a file to be treated differently? Is there anything different about filtering that record via SMT vs. dropping it altogether because of an asynchronous producer error with "errors.tolerance" set to "all"?
And finally, how would such a connector use the design proposed here?

Finally, I don't disagree that if there are other legitimate use cases that would be helped by addressing KAFKA-3821, we should try to solve that issue in the Kafka Connect framework instead of requiring individual connectors to implement their own solutions. But the cognitive load added by the design proposed here, for connector developers and Connect cluster administrators alike, costs too much to justify by pointing to an already-solved problem encountered by a single group of connectors (i.e., Debezium). This is why I think it's crucial that we identify realistic cases where this feature would actually be useful, and right now, I don't think any have been provided (at least, none that haven't already been addressed or couldn't be addressed with much simpler changes).

Cheers,

Chris

On Tue, Apr 11, 2023 at 7:30 AM Sagar <sagarmeansoc...@gmail.com> wrote:

Hi Chris,

Thanks for your detailed feedback!

nits: I have taken care of them now. Thanks for pointing those out.

non-nits:

> 6) It seems (based on both the KIP and discussion on KAFKA-3821) that the only use case for being able to emit offsets without also emitting source records that's been identified so far is for CDC source connectors like Debezium.

I am aware of at least one more case where the non-production of offsets (due to the non-production of records) leads to the failure of connectors when the source purges the records of interest. This happens in file-based source connectors (like S3/blob storage), in which, if the last record from a file is filtered out due to an SMT, the offset for that file's source partition is never committed, and eventually, when the file is deleted from the source and the connector is restarted for some reason, it fails (a minimal sketch of such an SMT follows below). Moreover, I feel the reason this support should be there in the Kafka Connect framework is that this is a restriction of the framework, and today the framework provides no support for getting around this limitation. Every connector has its own way of handling offsets, and having each connector handle this restriction in its own way can make things complex. Whether we choose to do it the way this KIP prescribes or some other way is up for debate, but IMHO the framework should provide a way of getting around this limitation.
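For anyone who hasn't run into this, the failure mode needs nothing exotic: just a filtering SMT. A minimal sketch (the class and the trailer convention are made up for illustration; returning null from an SMT is how records get dropped today):

    import java.util.Map;
    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.transforms.Transformation;

    // Hypothetical SMT that drops a file's trailer record. If the trailer is
    // the last record of the file, the offset that would have marked the file
    // as fully processed is never committed by the framework.
    public class DropTrailer<R extends ConnectRecord<R>> implements Transformation<R> {
        @Override
        public R apply(R record) {
            // Returning null filters the record out; Connect currently does
            // not commit the source offset of a filtered record.
            return isTrailer(record) ? null : record;
        }

        private boolean isTrailer(R record) {
            return record.value() == null; // made-up convention for this sketch
        }

        @Override public ConfigDef config() { return new ConfigDef(); }
        @Override public void configure(Map<String, ?> configs) { }
        @Override public void close() { }
    }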
> 7. If a task produces heartbeat records and source records that use the same source partition, which offset will ultimately be committed?

The idea is to add the records returned by `produceHeartbeatRecords` to the same `toSend` list within `AbstractWorkerSourceTask#execute`. `produceHeartbeatRecords` would be invoked before we make the `poll` call, so the offsets would be committed in the same order in which the records are written (see the schematic after point 9 below). Note that the onus is on the connector implementation not to return records which could lead to data loss or data going out of order; the framework would just commit based on whatever is supplied. Also, AFAIK, two `normal` source records can also produce the same source partitions, and they are committed in the order in which they are written.

> 8. The SourceTask::produceHeartbeatRecords method returns a List<SourceRecord>, and users can control the heartbeat topic for a connector via the (connector- or worker-level) "heartbeat.records.topic" property. Since every constructor for the SourceRecord class [2] requires a topic to be supplied, what will happen to that topic? Will it be ignored? If so, I think we should look for a cleaner solution.

Sorry, I couldn't quite follow which topic would be ignored in this case.

> 9. A large concern raised in the discussion for KAFKA-3821 was allowing connectors to control the ordering of these special "offsets-only" emissions and the regular source records returned from SourceTask::poll. Are we choosing to ignore that concern? If so, can you add this to the rejected alternatives section along with a rationale?

One thing to note is that, for every connector, the condition under which to emit a heartbeat record is entirely up to the connector. For example, a connector which is tracking transactions for an ordered log might not need to emit heartbeat records when the timer expires if there are open transactions, while a file-based connector might choose to emit a partition's offset if the same file is being processed again and again due to an SMT or some other reason. The uber point here is that every connector has its own requirements, and the framework can't really make assumptions about them. What the KIP is trying to do is provide a mechanism for the connector to commit new offsets. With this approach, as far as I can tell so far, there doesn't seem to be a case of out-of-order processing. If you have other concerns/thoughts, I would be happy to know them.
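To make the ordering story from point 7 concrete, here is a schematic of where the heartbeat records would be spliced into the task's iteration. This is not the actual `AbstractWorkerSourceTask` code (the real loop does much more); `stopping`, `toSend`, and `sendRecords()` stand in for the worker's existing state and dispatch logic:

    // Schematic of one worker iteration under the proposed API:
    while (!stopping) {
        List<SourceRecord> heartbeats = task.produceHeartbeatRecords(); // proposed hook
        List<SourceRecord> polled = task.poll();                        // existing hook
        toSend.addAll(heartbeats); // heartbeat records are enqueued first...
        toSend.addAll(polled);     // ...then regular records, so offsets are
                                   // committed in the order records are written
        sendRecords();             // existing dispatch and offset bookkeeping
    }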
> 10. If, sometime in the future, we wanted to add framework-level support for sending heartbeat records that doesn't require connectors to implement any new APIs...

The main purpose of producing heartbeat records is to be able to emit offsets without any new records. We are using heartbeat records to solve the primary concern of offsets getting stalled. The reason for doing it that way was that once we get SourceRecords, the rest of the code is already in place to write them to a topic of interest and commit offsets, and that seemed the most non-invasive in terms of framework-level changes. If in the future we want to add framework-only heartbeat record support, then this would create confusion, as you pointed out. Do you think the choice of the name "heartbeat records" is creating the confusion in this case? Maybe we can call these special records something else (not sure what at this point), which would then decouple the two logically and implementation-wise as well?

Thanks!
Sagar.

On Tue, Mar 28, 2023 at 8:28 PM Chris Egerton <chr...@aiven.io.invalid> wrote:

Hi Sagar,

Thanks for the KIP! I have some thoughts.

Nits:

1. Shouldn't KAFKA-3821 [1] be linked as the Jira ticket on the KIP? Or is there a different ticket that should be associated with it?

2. The current state is listed as "Draft". Considering it's been brought up for discussion, maybe the KIP should be updated to "Discussion"?

3. Can you add a link for the discussion thread to the KIP?

4. The KIP states that "In this process, offsets are written at regular intervals (driven by `offset.flush.interval.ms`)". This isn't strictly accurate since, when exactly-once support is enabled, offset commits can also be performed for each record batch (which is the default) or when explicitly requested by the task instance (if the connector implements the API to define its own transactions and the user has configured it to do so). Maybe better to just say "Offsets are written periodically"?

5. The description for the (per-connector) "heartbeat.records.topic" property states that it is "Only applicable in distributed mode; in standalone mode, setting this property will have no effect". Is this correct?

Non-nits:

6. It seems (based on both the KIP and discussion on KAFKA-3821) that the only use case for being able to emit offsets without also emitting source records that's been identified so far is for CDC source connectors like Debezium. But Debezium already has support for this exact feature (emitting heartbeat records that include offsets that cannot be associated with other, "regular" source records). Why should we add this feature to Kafka Connect when the problem it addresses is already solved in the set of connectors that (it seems) would have any need for it, and the size of that set is extremely small?
If there are other practical use cases for connectors that would benefit from this feature, please let me know.

7. If a task produces heartbeat records and source records that use the same source partition, which offset will ultimately be committed?

8. The SourceTask::produceHeartbeatRecords method returns a List<SourceRecord>, and users can control the heartbeat topic for a connector via the (connector- or worker-level) "heartbeat.records.topic" property. Since every constructor for the SourceRecord class [2] requires a topic to be supplied, what will happen to that topic? Will it be ignored? If so, I think we should look for a cleaner solution (see the snippet after this list).

9. A large concern raised in the discussion for KAFKA-3821 was allowing connectors to control the ordering of these special "offsets-only" emissions and the regular source records returned from SourceTask::poll. Are we choosing to ignore that concern? If so, can you add this to the rejected alternatives section along with a rationale?

10. If, sometime in the future, we wanted to add framework-level support for sending heartbeat records that doesn't require connectors to implement any new APIs (e.g., SourceTask::produceHeartbeatRecords), a lot of this would paint us into a corner design-wise. We'd have to think carefully about which property names would be used, how to account for connectors that have already implemented the SourceTask::produceHeartbeatRecords method, etc. In general, it seems like we're trying to solve two completely different problems with this single KIP: adding framework-level support for emitting heartbeat records for source connectors, and allowing source connectors to emit offsets without also emitting source records. I don't mind addressing the two at the same time if the result is elegant and doesn't compromise on the solution for either problem, but that doesn't seem to be the case here. Of the two problems, could we describe one as the primary and one as the secondary? If so, we might consider dropping the secondary problem from this KIP and addressing it separately.
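To illustrate point 8: even the narrowest SourceRecord constructor demands a topic, so an offsets-only record has to carry a value that is, at best, ignored. In this snippet, `sourcePartition` and `sourceOffset` stand in for the task's usual maps:

    // The smallest existing SourceRecord constructor still requires a topic:
    SourceRecord heartbeat = new SourceRecord(
            sourcePartition,  // Map<String, ?>
            sourceOffset,     // Map<String, ?>
            "???",            // topic: meaningless for an offsets-only emission
            null,             // value schema
            null);            // value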
[1] - https://issues.apache.org/jira/browse/KAFKA-3821
[2] - https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/source/SourceRecord.html

Cheers,

Chris

On Sat, Mar 25, 2023 at 11:18 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Hi John,

Thanks for taking a look at the KIP!

The point about stream time not advancing in case of infrequent updates is an interesting one. I can imagine that if the upstream producer to a Kafka Streams application is a source connector which isn't sending records frequently (due to the nature of the data ingestion, for example), then the downstream stream processing can run into the issues you described above.

Which also brings me to the second point you made about how this would be used by downstream consumers. IIUC, you are referring to the consumers of the newly added topic, i.e. the heartbeat topic. In my mind, the heartbeat topic is an internal topic (similar to the offsets/config/status topics in Connect), the main purpose of which is to trick the framework into producing records to the offsets topic and advancing the offsets. Since every connector could have a different definition of offsets (LSN, BinLogID, etc., for example), the logic to determine what the heartbeat records should be would have to reside in the actual connector.

Now that I think of it, the topic could very well be consumed by downstream consumers / Streams or Flink applications and be used for further decision making. A very crude example: if the heartbeat records sent to the new heartbeat topic include timestamps, then a downstream Streams application could use those timestamps to close any time windows. Having said that, it still appears to me that this is outside the scope of the Connect framework and is something which is difficult to generalise because of the variety of sources and definitions of offsets.

But I would still be more than happy to add this example if you think it can be useful in getting a better understanding of the idea and also its utility beyond Connect. Please let me know!

Thanks!
Sagar.

On Fri, Mar 24, 2023 at 7:22 PM John Roesler <vvcep...@apache.org> wrote:

Thanks for the KIP, Sagar!

At first glance, this seems like a very useful feature.

A common pain point in Streams is when upstream producers don't send regular updates and stream time cannot advance. This causes stream-time-driven operations to appear to hang, like time windows not closing, suppressions not firing, etc.
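To make that concrete, here's the kind of topology I mean (a minimal sketch; the topic names are arbitrary). If the upstream connector stops producing, stream time never advances past the window end, and the suppressed results are never emitted:

    import java.time.Duration;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Suppressed;
    import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
    import org.apache.kafka.streams.kstream.TimeWindows;

    StreamsBuilder builder = new StreamsBuilder();
    builder.<String, String>stream("connector-output")
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count()
            // Emits a window's final count only once stream time passes the
            // window end; with no incoming records, this appears to hang.
            .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
            .toStream()
            .to("five-minute-counts");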
From your KIP, I have a good idea of how the feature would be integrated into Connect, and it sounds good to me. I don't quite see how downstream clients, such as a downstream Streams or Flink application, or users of the Consumer, would make use of this feature. Could you add some examples of that nature?

Thank you,
-John

On Fri, Mar 24, 2023, at 05:23, Sagar wrote:

Hi All,

Bumping the thread again.

Sagar.

On Fri, Mar 10, 2023 at 4:42 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Hi All,

Bumping this discussion thread again.

Thanks!
Sagar.

On Thu, Mar 2, 2023 at 3:44 PM Sagar <sagarmeansoc...@gmail.com> wrote:

Hi All,

I wanted to create a discussion thread for KIP-910:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-910%3A+Update+Source+offsets+for+Source+Connectors+without+producing+records

Thanks!
Sagar.