Re: [VOTE] KIP-349 Priorities for Source Topics
> On Oct 12, 2018, at 5:06 PM, Colin McCabe wrote: > > Maybe there's some really cool use-case that I haven't thought of. But so > far I can't really think of any time I would need topic priorities if I was > muting topics and offloading blocking operations in a reasonable way. It > would be good to identify use-cases Hi Colin, How about the use-case where there are multiple streams/topics, and the intent is to have a single consumer interleave the messages so that higher-priority messages are processed first? That seems to be what the reporter of the associated Jira ticket https://issues.apache.org/jira/browse/KAFKA-6690 <https://issues.apache.org/jira/browse/KAFKA-6690> has identified as a use-case he frequently encounters. I’ve asked him to elaborate on the dev list, though he has not responded yet. Best, -- Nick
Re: [VOTE] KIP-349 Priorities for Source Topics
The reporter of KAFKA-6690 (Bala) replied in the JIra ticket to my question to elaborate about his use-case. I don’t think he’s on the dev list. Here’s his response: Bala: Sorry about the delay in reply. We use Kafka to process the asynchronous events of our Document Management System such as preview generation, indexing for search etc. The traffic gets generated via Web and Desktop Sync application. In such cases, we had to prioritize the traffic from web and consume them first. But this might lead to the starvation of events from sync if the consumer speed is slow and the event rate is high from web. A solution to handle the starvation with a timeout after which the events are consumed normally for a specified period of time would be great and help us use our resources effectively. -- Nick > On Oct 18, 2018, at 12:23 PM, n...@afshartous.com wrote: > >> On Oct 12, 2018, at 5:06 PM, Colin McCabe wrote: >> >> Maybe there's some really cool use-case that I haven't thought of. But so >> far I can't really think of any time I would need topic priorities if I was >> muting topics and offloading blocking operations in a reasonable way. It >> would be good to identify use-cases > > > Hi Colin, > > How about the use-case where there are multiple streams/topics, and the > intent is to have a single consumer interleave the messages so that higher > priority messages are processed first ? > That seems to be what the reporter of the associated Jira ticket > > https://issues.apache.org/jira/browse/KAFKA-6690 > <https://issues.apache.org/jira/browse/KAFKA-6690> > > has identified as a use-case he frequently encounters. I’ve asked him to > elaborate on the dev list though he has not responded yet. > > Best, > -- > Nick > > >
Re: [VOTE] KIP-349 Priorities for Source Topics
> On Oct 26, 2018, at 2:00 PM, Colin McCabe wrote: >> > Priorities won't help for this use-case, right? If the "web" partition has a > higher priority, and data is always available, there will *never* be any > events reported for "sync". Priorities don't prevent starvation-- they cause > starvation by design, because the high priority partition always takes > priority. Starvation is certainly an issue, though we could include a timeout as Bala suggested to address this. > In general the best solution would probably be to have a work queue between > the consumer and the event handler, and manage the backpressure as > appropriate. This could be done with pause and resume, as Streams does. I agree that similar semantics could be achieved with a work queue. What we’re voting on is the merits of topic prioritization to make the API more expressive and to make it easier for developers to do this. Thanks Colin for your vote on the KIP and for all your input. I look forward to hearing from others. Cheers, -- Nick
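[Editor's sketch] For readers following along, here is a rough sketch of the pause/resume approach Colin refers to, applied to the "web"/"sync" use-case above. The topic names, configuration values, and the policy of pausing the low-priority partitions whenever the high-priority topic returns data are illustrative assumptions, not part of any proposed API:

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PauseResumePriorityExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "priority-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // "web" is the high-priority topic, "sync" the low-priority one (hypothetical names).
            consumer.subscribe(Arrays.asList("web", "sync"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

                // Partitions of the low-priority topic in the current assignment.
                List<TopicPartition> syncPartitions = consumer.assignment().stream()
                        .filter(tp -> tp.topic().equals("sync"))
                        .collect(Collectors.toList());

                // If the high-priority topic returned data, keep the low-priority partitions
                // paused; otherwise resume them. Exactly as discussed above, this starves
                // "sync" for as long as "web" keeps producing data.
                boolean webHasData = records.partitions().stream()
                        .anyMatch(tp -> tp.topic().equals("web"));
                if (webHasData) {
                    consumer.pause(syncPartitions);
                } else {
                    consumer.resume(syncPartitions);
                }

                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("%s: %s%n", record.topic(), record.value());
                }
            }
        }
    }
}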
Re: [VOTE] KIP-349 Priorities for Source Topics
Hi All, Bumping this thread for more votes https://cwiki.apache.org/confluence/display/KAFKA/KIP-349:+Priorities+for+Source+Topics <https://cwiki.apache.org/confluence/display/KAFKA/KIP-349:+Priorities+for+Source+Topics> Cheers, -- Nick
Re: [VOTE] KIP-349 Priorities for Source Topics
Bumping again for more votes. -- Nick > On Dec 26, 2018, at 12:36 PM, n...@afshartous.com wrote: > > Bumping this thread for more votes > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-349:+Priorities+for+Source+Topics
Re: [VOTE] KIP-349 Priorities for Source Topics
Thanks Colin and Matthias. > On Jan 12, 2019, at 8:27 PM, Matthias J. Sax wrote: > > Thus, I would suggest to limit this KIP to the consumer only, otherwise, > the scope will be too large and this KIP will drag on even longer. If we > really want to add this to Kafka Streams, I expect a long and difficult > discussion about this by itself, and thus, doing this in a follow up KIP > (if there is any demand) seems to be the better approach. > Agreed, and my intent is to limit the scope to the consumer. > About the starvation issue: maybe it's a bold claim, but is a potential > starvation of a low-priority topic not intended by design if topics have On reflection, it would be hard to describe the semantics of an API that tried to address starvation by temporarily disabling prioritization, and then oscillating back and forth. Thus I agree that it makes sense not to try to address starvation, per Matthias’s point that this is intended by design. The KIP has been updated to reflect this by removing the second method. Regarding incremental fetch, Colin, do you have any suggestion on which option to adopt or how to proceed? -- Nick
Re: [VOTE] KIP-349 Priorities for Source Topics
Hi Jan, As discussed, I’ve adopted the position that MessageChooser is orthogonal to topic prioritization and hence outside the scope of KIP-349. -- Nick > On Jan 14, 2019, at 12:47 AM, Jan Filipiak wrote: > > On 14.01.2019 02:48, n...@afshartous.com <mailto:n...@afshartous.com> wrote: > >> >> On reflection, it would be hard to describe the semantics of an API that >> tried to address starvation by temporarily disabling prioritization, and >> then oscillating back and forth. >> Thus I agree that it makes sense not to try and address starvation to >> Mathias’ point that this is intended by design. The KIP has been updated to >> reflect this by removing the second method. >> > > The semantics of almost everything are hard to describe with only those > two tools at hand. Just here to remember yall that Samza already shows > us the interface of a powerful enough abstraction to get stuff done :) > > https://samza.apache.org/learn/documentation/0.12/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html > > <https://samza.apache.org/learn/documentation/0.12/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html> > > welcome :)
Re: [VOTE] KIP-349 Priorities for Source Topics
> On Jan 15, 2019, at 2:26 PM, Colin McCabe wrote: > > I think it makes sense to go back to use-cases again. So far, all of the > use-cases we discussed could be handled by pause and resume. So it makes > sense to try to figure out what the issue with those APIs is. Are they not > well-documented enough? Is there something higher-level we could build on > top to make them easier to use? > > It would be better to wait until a user comes forward and with a case where > priorities are needed, to implement them. Since then we would know more > about what the API should be, etc. Hi Colin, I agree that the use-cases are important. Rather than wait, though, I took some initiative and posted the message below to the Kafka user list (Subject: Prioritized Topics for Kafka). Since yesterday there have been 6 replies containing 7 different use-cases and very positive feedback. Please review. https://lists.apache.org/list.html?us...@kafka.apache.org <https://lists.apache.org/list.html?us...@kafka.apache.org> At this point I feel like we have enough info and would like to try to work towards a vote or set the status of the KIP to dormant. Cheers, -- Nick > On Jan 16, 2019, at 9:51 PM, n...@afshartous.com wrote: > > Hi all, > > On the dev list we’ve been discussing a proposed new feature (prioritized > topics). In a nutshell, when consuming from a set of topics with assigned > priorities, consumption from lower-priority topics only occurs if there’s no > data flowing in from a higher-priority topic. > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics > > One question is whether there are use-cases for the proposed API. If you think this > would be useful and have use-cases in mind please reply with the use-cases. > > It’s also possible to implement prioritization with the existing API by using > a combination of pausing, resuming, and local buffering. The question is > then does it make sense to introduce the proposed higher-level API to make > this easier? > > The responses will be used as input to determine if we move ahead with the > proposal. Thanks in advance for input. > > Cheers, > -- > Nick
KAFKA-6690 Priorities for Source Topics
Hi all, I picked up KAFKA-6690 Priorities for Source Topics. https://issues.apache.org/jira/browse/KAFKA-6690 In the ticket I made some notes about extending the API to incorporate topic priorities. For implementation, I was thinking of filtering out lower priority topics in method Fetcher.fetchablePartitions. Thanks for any input. Regards, -- Nick
Re: [DISCUSS] KIP-349 Priorities for Source Topics
> From: "Matthias J. Sax" > One general question: The Jira is marked as "stream", so I am wondering what > the intended scope of the KIP is, because it suggests a new consumer > API only. Can you clarify? Based on the thread in KAFKA-6690, I’ve changed the component from streams to consumer. -- Nick
[VOTE] KIP-349 Priorities for Source Topics
Hi All, Calling for a vote on KIP-349 https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics <https://cwiki.apache.org/confluence/display/KAFKA/KIP-349:+Priorities+for+Source+Topics> Cheers, -- Nick
[DISCUSS] KIP-349 Priorities for Source Topics
Since there are questions, I changed the heading from VOTE to DISCUSS. > On Aug 8, 2018, at 9:09 PM, Matt Farmer wrote: > > Is it worth spelling out explicitly what the behavior is when two topics > have the same priority? I'm a bit fuzzy on how we choose what topics to > consume from right now, if I'm being honest, so it could be useful to > outline the current behavior in the background and to spell out how that > would change (or if it would change) when two topics are given the same > priority. I added an additional note in the KIP’s Compatibility section to clarify that current behavior would not change in order to preserve backwards compatibility. > Also, how does this play with max.poll.records? Does the consumer read from > all the topics in priority order until we've hit the number of records or > the poll timeout? Or does it immediately return the high priority records > without pulling low priority records? My own interpretation would be to read from all the topics in priority order as the consumer is subscribed to multiple topics. -- Nick > > > On Wed, Aug 8, 2018 at 8:39 PM n...@afshartous.com wrote: > >> >> Hi All, >> >> Calling for a vote on KIP-349 >> >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics
Re: [DISCUSS] KIP-349 Priorities for Source Topics
Does this clarify? -- Nick > On Aug 9, 2018, at 7:44 PM, n...@afshartous.com wrote: > > Since there are questions, I changed the heading from VOTE to DISCUSS > >> On Aug 8, 2018, at 9:09 PM, Matt Farmer <m...@frmr.me> >> wrote: >> >> Is it worth spelling out explicitly what the behavior is when two topics >> have the same priority? I'm a bit fuzzy on how we choose what topics to >> consume from right now, if I'm being honest, so it could be useful to >> outline the current behavior in the background and to spell out how that >> would change (or if it would change) when two topics are given the same >> priority. > > I added an additional note in the KIP’s Compatibility section to clarify that > current behavior would not change in order to preserve backwards > compatibility. > >> Also, how does this play with max.poll.records? Does the consumer read from >> all the topics in priority order until we've hit the number of records or >> the poll timeout? Or does it immediately return the high priority records >> without pulling low priority records? > > > My own interpretation would be to read from all the topics in priority order > as the consumer is subscribed to multiple topics. > -- > Nick
[VOTE] KIP-349 Priorities for Source Topics
Hi All, Calling for a vote on KIP-349 https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics -- Nick
Re: [VOTE] KIP-349 Priorities for Source Topics
I only saw one vote on KIP-349, just checking to see if anyone else would like to vote before closing this out. -- Nick > On Aug 13, 2018, at 9:19 PM, n...@afshartous.com wrote: > > > Hi All, > > Calling for a vote on KIP-349 > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics > > -- > Nick > > >
[DISCUSS] KIP-349 Priorities for Source Topics
Changing subject back to DISCUSS. > On Aug 20, 2018, at 6:52 AM, Jan Filipiak wrote: > > Just saying that we should peek at the Samza approach, it's a much more > powerful abstraction. We can ship a default MessageChooser > that looks at the topics' priority. Thanks Jan for your input. Here’s an attempt at incorporating a Samza MessageChooser-style interface. -- Nick A new interface, TopicPrioritizer, allows one to provide an implementation that prioritizes topics. The topic priorities that were assigned with the KafkaConsumer.subscribe method may or may not be used. The input is the list of subscribed topics, and the output is an ordered list of topics. The ordering represents the priority that the TopicPrioritizer implementation has assigned. Calls to KafkaConsumer.poll will use the TopicPrioritizer to determine the priority of topics. interface TopicPrioritizer { List<String> prioritize(List<String> topicPriorities); } A new method, KafkaConsumer.registerTopicPrioritizer, is used to register the TopicPrioritizer: public void registerTopicPrioritizer(TopicPrioritizer topicPrioritizer);
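[Editor's sketch] To make the proposal above more concrete, here is a hypothetical implementation of the proposed TopicPrioritizer interface; the class name, the static priority map, and the topic names are made up for illustration, and the registration call shown in the comment is the proposed (not yet existing) API:

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical implementation of the proposed TopicPrioritizer interface:
// orders the subscribed topics by a static priority map, highest priority first.
public class StaticTopicPrioritizer implements TopicPrioritizer {

    private final Map<String, Integer> priorities;

    public StaticTopicPrioritizer(Map<String, Integer> priorities) {
        this.priorities = priorities;
    }

    @Override
    public List<String> prioritize(List<String> topics) {
        return topics.stream()
                .sorted(Comparator.comparingInt(
                        (String topic) -> priorities.getOrDefault(topic, 0)).reversed())
                .collect(Collectors.toList());
    }
}

// Usage with the proposed registration method (also hypothetical):
// consumer.registerTopicPrioritizer(
//         new StaticTopicPrioritizer(Map.of("web", 2, "sync", 1)));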
Re: [DISCUSS] KIP-349 Priorities for Source Topics
Just clarifying that the API below would be in addition to the API specified in KIP-349 https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics -- Nick > On Aug 30, 2018, at 9:57 AM, n...@afshartous.com wrote: > > Here’s an attempt at incorporating a Samza MessageChooser type interface. > -- > Nick > > > New interface TopicPrioritizer allows one to create a method implementation > that prioritizes topics. The topic priorities that were assigned with method > KafkaConsumer.subscribe may or may not be used. The input is the list of > subscribed topics, and output is ordered list of topics. The ordering > represents the priority that the TopicPrioritizer implementation has > assigned. Calls to KafkaConsumer.poll will use the TopicPrioritizer to > determine the priority of topics. > > interface TopicPrioritizer { > List<String> prioritize(List<String> topicPriorities); > } > > > New method KafkaConsumer.registerTopicPrioritizer is used to register the > TopicPrioritizer > > public void registerTopicPrioritizer(TopicPrioritizer topicPrioritizer);
Re: [DISCUSS] KIP-349 Priorities for Source Topics
@Jan - can you comment on whether or not this is what you had in mind? -- Nick > On Aug 30, 2018, at 10:18 AM, n...@afshartous.com wrote: > > > Just clarifying that the API below would be in addition to the API specified > in KIP-349 > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics> > -- > Nick > > >> On Aug 30, 2018, at 9:57 AM, n...@afshartous.com >> <mailto:n...@afshartous.com> wrote: >> >> Here’s an attempt at incorporating a Samza MessageChooser type interface. >> -- >> Nick >> >> >> New interface TopicPrioritizer allows one to create a method implementation >> that prioritizes topics. The topic priorities that were assigned with method >> KafkaConsumer.subscribe may or may not be used. The input is the list of >> subscribed topics, and output is ordered list of topics. The ordering >> represents the priority that the TopicPrioritizer implementation has >> assigned. Calls to KafkaConsumer.poll will use the TopicPrioritizer to >> determine the priority of topics. >> >> interface TopicPrioritizer { >> List<String> prioritize(List<String> topicPriorities); >> } >> >> >> New method KafkaConsumer.registerTopicPrioritizer is used to register the >> TopicPrioritizer >> >> public void registerTopicPrioritizer(TopicPrioritizer topicPrioritizer); > > > > >
Re: [DISCUSS] KIP-349 Priorities for Source Topics
> On Sep 4, 2018, at 4:20 PM, Jan Filipiak wrote: > > what I meant is literally this interface: > > https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html > > <https://samza.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/chooser/MessageChooser.html> Hi Jan, Thanks for the reply; I have a few questions. This Samza doc https://samza.apache.org/learn/documentation/0.14/container/streams.html <https://samza.apache.org/learn/documentation/0.14/container/streams.html> indicates that the chooser is set via configuration. Are you suggesting adding a new configuration for Kafka? It seems like we could also have a method on KafkaConsumer, public void register(MessageChooser messageChooser), to make it more dynamic. Also, the Samza MessageChooser interface has the method /* Notify the chooser that a new envelope is available for a processing. */ void update(IncomingMessageEnvelope envelope) and I’m wondering how this method would be translated to the Kafka API. In particular, what corresponds to IncomingMessageEnvelope? Best, -- Nick
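[Editor's sketch] As a purely hypothetical sketch of how the Samza idea might translate to the consumer (none of these types or methods exist in Kafka today), ConsumerRecord would be the natural stand-in for Samza's IncomingMessageEnvelope:

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical interface, loosely mirroring Samza's MessageChooser.
// ConsumerRecord plays the role of Samza's IncomingMessageEnvelope.
public interface MessageChooser<K, V> {

    // Notify the chooser that a new record is available for processing.
    void update(ConsumerRecord<K, V> record);

    // Return the next record to process, or null if nothing should be chosen yet.
    ConsumerRecord<K, V> choose();
}

// Hypothetical registration, along the lines of the register() method suggested above:
// consumer.register(myChooser);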
Re: [DISCUSS] KIP-349 Priorities for Source Topics
Hi All, Thanks for all the comments on KIP-349. I agree with Matthias’s comment that topic prioritization and MessageChooser are complementary. And since the two APIs are not strongly coupled, my inclination is not to add MessageChooser to KIP-349. MessageChooser could be done in a separate follow-on KIP (which I would be happy to work on after). I also agree with Colin’s comment that the use-cases should be clear. Hence I’d like to make a second call for replies to this thread elaborating on any specific use-cases. After the use-cases are established I’ll then call for a vote. Cheers, -- Nick
[VOTE] KIP-349 Priorities for Source Topics
Hi All, At this point, I’d like to call for a vote on KIP-349 https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics <https://cwiki.apache.org/confluence/display/KAFKA/KIP-349:+Priorities+for+Source+Topics> This is the original proposal, sans MessageChooser. Cheers, -- Nick
Re: [VOTE] KIP-349 Priorities for Source Topics
> On Sep 30, 2018, at 5:16 AM, Dongjin Lee wrote: > > 1. Your KIP document > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics> > lacks a hyperlink to the discussion thread. And I couldn't find the > discussion thread from the mailing archive. Hi Dongjin, There has been a discussion thread. I added this link as a reference https://lists.apache.org/list.html?dev@kafka.apache.org:lte=1M:kip-349 <https://lists.apache.org/list.html?dev@kafka.apache.org:lte=1M:kip-349> to the KIP-349 page https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics <https://cwiki.apache.org/confluence/display/KAFKA/KIP-349:+Priorities+for+Source+Topics> Best, -- Nick
Re: [VOTE] KIP-349 Priorities for Source Topics
> On Oct 3, 2018, at 12:41 PM, Colin McCabe wrote: > > Will there be a separate code path for people who don't want to use this > feature? Yes, I tried to capture this in the KIP by indicating that this API change is 100% backwards compatible. Current consumer semantics and performance would be unaffected. Best, -- Nick
Re: [VOTE] KIP-349 Priorities for Source Topics
> On Oct 5, 2018, at 2:25 PM, Colin McCabe wrote: > > It's possible for the change to be 100% backwards compatible, but still not > have a separate code path for people who don't want to use this feature, > right? What I am getting at is basically: will this feature increase > broker-side memory consumption for people who don't use it? Hi Colin, My intent is to leave current consumer semantics, performance, and resource characteristics intact. Therefore there should be no additional resources necessary for existing code paths. I would expect that a PR would likely not be accepted if there was a degradation of performance for the existing code paths. Regarding use-cases, it seems that prioritized topics would be required for interleaving messages from multiple topics in order of priority. For example, let’s say each topic represents a real-time news feed (e.g. NY Times, Washington Post, Boston Globe), and we’d like to process messages from the NY Times first as they arrive. This could be expressed easily using topic prioritization. With multiple consumers, the merging and interleaving of messages would have to be done by the caller. It’s certainly doable, though it seems like those who are requesting topic prioritization are asking for a more expressive consumer API. My $0.02. Best, -- Nick
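[Editor's sketch] For illustration, the caller-side alternative mentioned above (one consumer per feed, with the application doing the interleaving) might look roughly like the sketch below. The method, the consumers, and the "poll the low-priority feed only when the high-priority feed is idle" policy are all assumptions made up for this example:

import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class CallerSideInterleaving {

    // One consumer per news feed; the caller does the interleaving itself:
    // the low-priority feed is only polled when the high-priority feed returns nothing.
    static void pollOnce(Consumer<String, String> nyTimesConsumer,
                         Consumer<String, String> otherFeedsConsumer) {
        ConsumerRecords<String, String> records = nyTimesConsumer.poll(Duration.ofMillis(100));
        if (records.isEmpty()) {
            records = otherFeedsConsumer.poll(Duration.ofMillis(100));
        }
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("%s: %s%n", record.topic(), record.value());
        }
    }
}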
Re: [VOTE] KIP-349 Priorities for Source Topics
> On Jan 17, 2019, at 8:49 PM, n...@afshartous.com wrote: > >> On Jan 15, 2019, at 2:26 PM, Colin McCabe wrote: >> >> I think it makes sense to go back to use-cases again. So far, all of the >> use-cases we discussed could be handled by pause and resume. So it makes >> sense to try to figure out what the issue with those APIs is. Are they not >> well-documented enough? Is there something higher-level we could build on >> top to make them easier to use? >> >> It would be better to wait until a user comes forward and with a case where >> priorities are needed, to implement them. Since then we would know more >> about what the API should be, etc. > > > Hi Colin, > > I agree that the use-cases are important. Rather than wait though I took > some initiative and posted the message below to the Kafka user list (Subject: > Prioritized Topics for Kafka). > Since yesterday there have been 6 replies containing 7 different use-cases > and very positive feedback. Please review. > >https://lists.apache.org/list.html?us...@kafka.apache.org > <https://lists.apache.org/list.html?us...@kafka.apache.org> > > At this point I feel like we have enough info and would like to try and work > towards a vote or set the status of the KIP to dormant. > Hi Colin, Just bumping this thread to see if you’ve had a chance to review the use-cases on the thread on the users list. Cheers, -- Nick
Re: [VOTE] KIP-349 Priorities for Source Topics
Hi Colin, > On Jan 24, 2019, at 12:14 PM, Colin McCabe wrote: > > Users almost always like the idea of new features, whatever they are. But > that doesn't mean that the feature would necessarily work well or be > necessary. Yes, though we should certainly consider the responses on the user list as input (Subject: Prioritized Topics for Kafka). > If you still want to pursue this, then I suggest gathering a set of use-cases > that can't be addressed through the means we discussed here previously. So, > something that can't effectively be addressed through using the pause and > resume API. We’ve discussed this point before. I accept your point that a user could implement this behavior with pause and resume. This KIP is about creating a higher-level API to make it easier to do so. > Then come up with a concrete proposal that addresses all the questions we > have, including about starvation, incremental fetch requests, and so on. To me it seems like there’s only one outstanding issue here (incremental fetch), and we could just pick one of the options. Starvation is by design. I’m not sure what “and so on” references. > This could be a lot of work. If you're looking for a way to make more > contributions, I'd recommend getting started with something easier. Yes, it could be. And after 6 months of (sometimes circular) discussion I’d like to either move towards a vote or set the status of this KIP to dormant until if and when someone else picks it up. Does anybody else have input on either having a vote or setting the KIP dormant? Cheers, -- Nick
Re: [VOTE] KIP-349 Priorities for Source Topics
Hi Sönke, Thanks for taking the time to review. I’ve put KIP-349 into hibernation. Thanks also to everyone who participated in the discussion. Best regards, -- Nick > On Jan 25, 2019, at 5:51 AM, Sönke Liebau > wrote: > > a bit late to the party, sorry. I recently spent some time looking > into this / a similar issue [1]. > After some investigation and playing around with settings I think that > the benefit that could be gained from this is somewhat limited and > probably outweighed by the implementation effort. > > The consumer internals are already geared towards treating partitions > fairly so that no partition has to wait an undue amount of time and > this can be further tuned for latency over throughput. Additionally, > if this is a large issue for someone, there is always the option of > having a dedicated consumer reading only from the control topic, which > would mean that messages from that topic are received "immediately". > For a Kafka Streams job it would probably make sense to create two > input streams and then merge those as a first step. > > I think with these knobs a fairly large amount of flexibility can be > achieved so that there is no urgent need to implement priorities. > > So my personal preference would be to set this KIP to dormant for now.
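[Editor's sketch] To illustrate Sönke's Kafka Streams suggestion (two input streams merged as a first step), a minimal sketch follows; the topic names are hypothetical, and the serde configuration and KafkaStreams startup are omitted:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class MergeStreamsExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // Two input streams (hypothetical topic names), merged as a first processing step.
        KStream<String, String> control = builder.stream("control-topic");
        KStream<String, String> data = builder.stream("data-topic");
        KStream<String, String> merged = control.merge(data);

        merged.foreach((key, value) -> System.out.printf("%s: %s%n", key, value));
        // new KafkaStreams(builder.build(), props).start();  // props/serdes omitted in this sketch
    }
}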
Re: [VOTE] KIP-349 Priorities for Source Topics
Hi Adam, This change is only intended for the basic consumer API. Cheers, -- Nick From: Adam Bellemare Sent: Sunday, January 6, 2019 11:45 AM To: dev@kafka.apache.org Subject: Re: [VOTE] KIP-349 Priorities for Source Topics Hi Nick Is this change only for the basic consumer? How would this affect anything with Kafka Streams? Thanks > On Jan 5, 2019, at 10:52 PM, n...@afshartous.com wrote: > > Bumping again for more votes. > -- > Nick > > >> On Dec 26, 2018, at 12:36 PM, n...@afshartous.com wrote: >> >> Bumping this thread for more votes >> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-349:+Priorities+for+Source+Topics
Request for permission to create KIP
Hi all, Requesting permission to create a KIP in regards to KAFKA-6690 Priorities for Source Topics My Wiki ID is nafshartous. Cheers, -- Nick
[DISCUSS] KIP-349 Priorities for Source Topics
Hi All, This message is to initiate discussion on a feature to add optional priorities to source topics. Please review https://cwiki.apache.org/confluence/display/KAFKA/KIP-349%3A+Priorities+for+Source+Topics Cheers, -- Nick
[GitHub] kafka pull request #4249: fix typo in ProducerConfig doc
GitHub user nick-zh opened a pull request: https://github.com/apache/kafka/pull/4249 fix typo in ProducerConfig doc You can merge this pull request into a Git repository by running: $ git pull https://github.com/nick-zh/kafka trunk Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/4249.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4249 commit d07f2d967ee4124f9bdbbf06c49ea832c78a47c9 Author: Nick Chiu Date: 2017-11-22T15:05:20Z fix typo in ProducerConfig doc ---
Fwd: Producer acks=1, clean broker shutdown and data loss
I didn't get any hits on this on users@, so I'm forwarding this to dev@. Any explanation for this would be greatly appreciated! -- Forwarded message ------ From: Nick Travers Date: Sat, Feb 18, 2017 at 5:04 PM Subject: Producer acks=1, clean broker shutdown and data loss To: us...@kafka.apache.org Hi - I'm trying to understand the expected behavior of the scenario in which I have a producer with `acks=1` (i.e. partition leader acks only) and I cleanly shut down a broker (via `KafkaServer#shutdown`). I am running my test scenario with three brokers (0.10.1.1), with a default replication count and default partition count of three. The producer (also 0.10.1.1) is writing to a single topic. When I shut down one broker, I observe that in some instances there is data loss on the topic that the producer was sending to, around the instant where the broker was shut down and the producer sees that a new leader for one of the partitions has been re-elected. Reading the documentation [0], I see the following for the `acks=1` config: This will mean the leader will write the record to its local log but will > respond without awaiting full acknowledgement from all followers. In this > case should the leader fail immediately after acknowledging the record but > before the followers have replicated it then the record will be lost. While it is clear to me that in the case of a _failure_ of this broker that messages will be lost (as they have not yet been replicated to a follower), it isn't clear to me what the expected behavior is in the case where this broker is told to _cleanly_ shut down. I understand that having a setup where `replicas=3`, `min.insync.replicas=2` and `acks=-1` is much safer (and I've verified this works as expected for the same test scenario), but I'd like to understand the semantics of the `acks=1` case nonetheless. Thanks in advance. - nick [0]: https://github.com/apache/kafka/blob/0.10.1.1/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java#L86
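[Editor's sketch] For reference, a minimal sketch of the safer configuration described above (acks=all together with replication factor 3 and min.insync.replicas=2); the bootstrap server, serializers, and the exact placement of the broker/topic settings are illustrative assumptions:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class SaferProducerConfig {
    public static void main(String[] args) {
        // Producer side: wait for all in-sync replicas instead of the leader alone,
        // so an acknowledged record cannot be lost when the leader shuts down before
        // followers have replicated it.
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // equivalent to acks=-1

        // Broker/topic side (server.properties or per-topic config), as described above:
        //   default.replication.factor=3
        //   min.insync.replicas=2

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // producer.send(...) as usual
        }
    }
}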
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
existing "flush" metrics, we'd have two options, which both seem pretty bad to me: 1. Have them record calls to commit(), which would be misleading, as data is no longer explicitly "flushed" to disk by this call. 2. Have them record nothing at all, which is equivalent to removing the metrics, except that users will see the metric still exists and so assume that the metric is correct, and that there's a problem with their system when there isn't. I agree that removing them is also a bad solution, and I'd like some guidance on the best path forward here. 5. Position files are updated on every write to a StateStore. Since our writes are now buffered until commit(), we can't update the Position file until commit() has been called, otherwise it would be inconsistent with the data in the event of a rollback. Consequently, we need to manage these offsets the same way we manage the checkpoint offsets, and ensure they're only written on commit(). 6. Agreed, although I'm not exactly sure yet what tests to write. How explicit do we need to be here in the KIP? As for upgrade/downgrade: upgrade is designed to be seamless, and we should definitely add some tests around that. Downgrade, it transpires, isn't currently possible, as the extra column family for offset storage is incompatible with the pre-KIP-892 implementation: when you open a RocksDB database, you must open all available column families or receive an error. What currently happens on downgrade is that it attempts to open the store, throws an error about the offsets column family not being opened, which triggers a wipe and rebuild of the Task. Given that downgrades should be uncommon, I think this is acceptable behaviour, as the end-state is consistent, even if it results in an undesirable state restore. Should I document the upgrade/downgrade behaviour explicitly in the KIP? -- Regards, Nick On Mon, 14 Aug 2023 at 22:31, Bruno Cadonna wrote: > Hi Nick! > > Thanks for the updates! > > 1. > Why does StateStore#flush() default to > StateStore#commit(Collections.emptyMap())? > Since calls to flush() will not exist anymore after this KIP is > released, I would rather throw an unsupported operation exception by > default. > > > 2. > When would a state store return -1 from > StateStore#approximateNumUncommittedBytes() while being transactional? > > Wouldn't StateStore#approximateNumUncommittedBytes() also return 0 if > the state store is transactional but nothing has been written to the > state store yet? > > > 3. > Sorry for bringing this up again. Does this KIP really need to introduce > StateStoreContext#isolationLevel()? StateStoreContext has already > appConfigs() which basically exposes the same information, i.e., if EOS > is enabled or not. > In one of your previous e-mails you wrote: > > "My idea was to try to keep the StateStore interface as loosely coupled > from the Streams engine as possible, to give implementers more freedom, > and reduce the amount of internal knowledge required." > > While I understand the intent, I doubt that it decreases the coupling of > a StateStore interface and the Streams engine. READ_COMMITTED only > applies to IQ but not to reads by processors. Thus, implementers need to > understand how Streams accesses the state stores. > > I would like to hear what others think about this. > > > 4. > Great exposing new metrics for transactional state stores! However, I > would prefer to add new metrics and deprecate (in the docs) the old > ones. 
You can find examples of deprecated metrics here: > https://kafka.apache.org/documentation/#selector_monitoring > > > 5. > Why does the KIP mention position files? I do not think they are related > to transactions or flushes. > > > 6. > I think we will also need to adapt/add integration tests besides unit > tests. Additionally, we probably need integration or system tests to > verify that upgrades and downgrades between transactional and > non-transactional state stores work as expected. > > > Best, > Bruno > > > > > > On 7/21/23 10:34 PM, Nick Telford wrote: > > One more thing: I noted John's suggestion in the KIP, under "Rejected > > Alternatives". I still think it's an idea worth pursuing, but I believe > > that it's out of the scope of this KIP, because it solves a different set > > of problems to this KIP, and the scope of this one has already grown > quite > > large! > > > > On Fri, 21 Jul 2023 at 21:33, Nick Telford > wrote: > > > >> Hi everyone, > >> > >> I've updated the KIP ( > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Sem
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Hi Colt, Thanks for taking the time to run your benchmarks on this, that's incredibly helpful. > With KIP 892, I verified that unclean shutdown does not cause a fresh > restore (). I got the following benchmark results: > - Benchmark took 216 seconds > - 1,401 tasks per second on one partition > - 11 seconds to restore the state Can you clarify how much state was restored in those 11 seconds? Was this the time to do the full restore regardless, or was it the time to only restore a small fraction of the state (e.g. the last aborted transaction)? > -- QUESTION: Because we observed a significant (30% or so) and reproducible > slowdown during restoration, it seems like KIP-892 uses the checkpointing > behavior during restoration as well? If so, I would argue that this might > not be necessary, because everything we write is already committed, so we > don't need to change the behavior during restoration or standby tasks. > Perhaps we could write the offsets to RocksDB on every batch (or even every > 5 seconds or so). Restore has always used a completely separate code-path to regular writes, and continues to do so. I had a quick pass over the code and I suspect I know what's causing the performance degradation: for every restored record, I was adding the changelog offset of that record to the batch along with the record. This is different to the regular write-path, which only adds the current offsets once, on-commit. This writeOffset method is fairly expensive, since it has to serialize the TopicPartition and offset that's being written to the database. Assuming this is the cause, I've already pushed a fix to my branch that will only call writeOffset once per-batch, and also adds some caching to the serialization in writeOffset, that should also enhance performance of state commit in the normal write-path. Please let me know if this addresses the issue! Regards, Nick On Mon, 11 Sept 2023 at 05:38, Colt McNealy wrote: > Howdy folks, > > First I wanted to say fantastic work and thank you to Nick. I built your > branch (https://github.com/nicktelford/kafka/tree/KIP-892-3.5.0) and did > some testing on our Streams app with Kafka 3.5.0, your `kip-892-3.5.0` > branch, and your `kip-892-3.5.0` branch built with Speedb OSS 2.3.0.1. And > it worked! Including the global store (we don't have any segmented stores, > unfortunately). > > The test I ran involved running 3,000 workflows with 100 tasks each, and > roughly 650MB state total. > > With Streams 3.5.0, I indeed verified that unclean shutdown caused a fresh > restore from scratch. I also benchmarked my application at: > - Running the benchmark took 211 seconds > - 1,421 tasks per second on one partition > - 8 seconds to restore the state (650MB or so) > > With KIP 892, I verified that unclean shutdown does not cause a fresh > restore (). I got the following benchmark results: > - Benchmark took 216 seconds > - 1,401 tasks per second on one partition > - 11 seconds to restore the state > > I ran the restorations many times to ensure that there was no rounding > error or noise; the results were remarkably consistent. Additionally, I ran > the restorations with KIP-892 built with Speedb OSS. The restoration time > consistently came out as 10 seconds, which was an improvement from the 11 > seconds observed with RocksDB + KIP-892. > > My application is bottlenecked mostly by serialization and deserialization, > so improving the performance of the state store doesn't really impact our > throughput that much. 
And the processing performance (benchmark time, > tasks/second) are pretty close in KIP-892 vs Streams 3.5.0. However, at > larger state store sizes, RocksDB performance begins to degrade, so that > might not be true once we pass 20GB per partition. > > -- QUESTION: Because we observed a significant (30% or so) and reproducible > slowdown during restoration, it seems like KIP-892 uses the checkpointing > behavior during restoration as well? If so, I would argue that this might > not be necessary, because everything we write is already committed, so we > don't need to change the behavior during restoration or standby tasks. > Perhaps we could write the offsets to RocksDB on every batch (or even every > 5 seconds or so). > > -- Note: This was a very small-scale test, with <1GB of state (as I didn't > have time to spend hours building up state). In the past I have noted that > RocksDB performance degrades significantly after 25GB of state in one > store. Future work involves determining the performance impact of KIP-892 > relative to trunk at larger scale, since it's possible that the relative > behaviors are far different (i.e. relative to trunk, 892's processing and > restoration throughput might be mu
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Hi Bruno, Thanks for getting back to me! 2. The fact that implementations can always track estimated memory usage in the wrapper is a good point. I can remove -1 as an option, and I'll clarify the JavaDoc that 0 is not just for non-transactional stores, which is currently misleading. 6. The problem with catching the exception in the downgrade process is that it would require new code in the Kafka version being downgraded to. Since users could conceivably downgrade to almost *any* older version of Kafka Streams, I'm not sure how we could add that code? The only way I can think of doing it would be to provide a dedicated downgrade tool, that goes through every local store and removes the offsets column families. But that seems like an unnecessary amount of extra code to maintain just to handle a somewhat niche situation, when the alternative (automatically wipe and restore stores) should be acceptable. 1, 4, 5: Agreed. I'll make the changes you've requested. 3a. I agree that IsolationLevel makes more sense at query-time, and I actually initially attempted to place the IsolationLevel at query-time, but I ran into some problems: - The key issue is that, under ALOS we're not staging writes in transactions, so can't perform writes at the READ_COMMITTED isolation level. However, this may be addressed if we decide to *always* use transactions as discussed under 3b. - IQv1 and IQv2 have quite different implementations. I remember having some difficulty understanding the IQv1 internals, which made it difficult to determine what needed to be changed. However, I *think* this can be addressed for both implementations by wrapping the RocksDBStore in an IsolationLevel-dependent wrapper, that overrides read methods (get, etc.) to either read directly from the database or from the ongoing transaction. But IQv1 might still be difficult. - If IsolationLevel becomes a query constraint, then all other StateStores will need to respect it, including the in-memory stores. This would require us to adapt in-memory stores to stage their writes so they can be isolated from READ_COMMITTED queries. It would also become an important consideration for third-party stores on upgrade, as without changes, they would not support READ_COMMITTED queries correctly. Ultimately, I may need some help making the necessary change to IQv1 to support this, but I don't think it's fundamentally impossible, if we want to pursue this route. 3b. The main reason I chose to keep ALOS un-transactional was to minimize behavioural change for most users (I believe most Streams users use the default configuration, which is ALOS). That said, it's clear that if ALOS also used transactional stores, the only change in behaviour would be that it would become *more correct*, which could be considered a "bug fix" by users, rather than a change they need to handle. I believe that performance using transactions (aka. RocksDB WriteBatches) should actually be *better* than the un-batched write-path that is currently used[1]. The only "performance" consideration will be the increased memory usage that transactions require. Given the mitigations for this memory that we have in place, I would expect that this is not a problem for most users. If we're happy to do so, we can make ALOS also use transactions. Regards, Nick Link 1: https://github.com/adamretter/rocksjava-write-methods-benchmark#results On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna wrote: > Hi Nick, > > Thanks for the updates and sorry for the delay on my side! > > > 1. 
> Making the default implementation for flush() a no-op sounds good to me. > > > 2. > I think what was bugging me here is that a third-party state store needs > to implement the state store interface. That means they need to > implement a wrapper around the actual state store as we do for RocksDB > with RocksDBStore. So, a third-party state store can always estimate the > uncommitted bytes, if it wants, because the wrapper can record the added > bytes. > One case I can think of where returning -1 makes sense is when Streams > does not need to estimate the size of the write batch and trigger > extraordinary commits, because the third-party state store takes care of > memory. But in that case the method could also just return 0. Even that > case would be better solved with a method that returns whether the state > store manages itself the memory used for uncommitted bytes or not. > Said that, I am fine with keeping the -1 return value, I was just > wondering when and if it will be used. > > Regarding returning 0 for transactional state stores when the batch is > empty, I was just wondering because you explicitly stated > > "or {@code 0} if this StateStore does not support transactions." > > So it seemed to me returning 0 could only happen for non-transactional > state
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Bruno, Thinking about 3a. in addition to adding the IsolationLevel to QueryStoreParameters and Query, what about also adding a method like "ReadOnlyKeyValueStore view(IsolationLevel isolationLevel)" to ReadOnlyKeyValueStore? This would enable us to easily select/switch between IsolationLevels, even if the StateStore has many layers of wrappers (as is the case at the point IQv1 deals with the store). Would this be acceptable, or do you have another approach in mind? Regards, Nick On Wed, 13 Sept 2023 at 10:57, Nick Telford wrote: > Hi Bruno, > > Thanks for getting back to me! > > 2. > The fact that implementations can always track estimated memory usage in > the wrapper is a good point. I can remove -1 as an option, and I'll clarify > the JavaDoc that 0 is not just for non-transactional stores, which is > currently misleading. > > 6. > The problem with catching the exception in the downgrade process is that > would require new code in the Kafka version being downgraded to. Since > users could conceivably downgrade to almost *any* older version of Kafka > Streams, I'm not sure how we could add that code? > The only way I can think of doing it would be to provide a dedicated > downgrade tool, that goes through every local store and removes the > offsets column families. But that seems like an unnecessary amount of extra > code to maintain just to handle a somewhat niche situation, when the > alternative (automatically wipe and restore stores) should be acceptable. > > 1, 4, 5: Agreed. I'll make the changes you've requested. > > 3a. > I agree that IsolationLevel makes more sense at query-time, and I actually > initially attempted to place the IsolationLevel at query-time, but I ran > into some problems: > - The key issue is that, under ALOS we're not staging writes in > transactions, so can't perform writes at the READ_COMMITTED isolation > level. However, this may be addressed if we decide to *always* use > transactions as discussed under 3b. > - IQv1 and IQv2 have quite different implementations. I remember having > some difficulty understanding the IQv1 internals, which made it difficult > to determine what needed to be changed. However, I *think* this can be > addressed for both implementations by wrapping the RocksDBStore in an > IsolationLevel-dependent wrapper, that overrides read methods (get, etc.) > to either read directly from the database or from the ongoing transaction. > But IQv1 might still be difficult. > - If IsolationLevel becomes a query constraint, then all other StateStores > will need to respect it, including the in-memory stores. This would require > us to adapt in-memory stores to stage their writes so they can be isolated > from READ_COMMITTTED queries. It would also become an important > consideration for third-party stores on upgrade, as without changes, they > would not support READ_COMMITTED queries correctly. > > Ultimately, I may need some help making the necessary change to IQv1 to > support this, but I don't think it's fundamentally impossible, if we want > to pursue this route. > > 3b. > The main reason I chose to keep ALOS un-transactional was to minimize > behavioural change for most users (I believe most Streams users use the > default configuration, which is ALOS). That said, it's clear that if ALOS > also used transactional stores, the only change in behaviour would be that > it would become *more correct*, which could be considered a "bug fix" by > users, rather than a change they need to handle. > > I believe that performance using transactions (aka. 
RocksDB WriteBatches) > should actually be *better* than the un-batched write-path that is > currently used[1]. The only "performance" consideration will be the > increased memory usage that transactions require. Given the mitigations for > this memory that we have in place, I would expect that this is not a > problem for most users. > > If we're happy to do so, we can make ALOS also use transactions. > > Regards, > Nick > > Link 1: > https://github.com/adamretter/rocksjava-write-methods-benchmark#results > > On Wed, 13 Sept 2023 at 09:41, Bruno Cadonna wrote: > >> Hi Nick, >> >> Thanks for the updates and sorry for the delay on my side! >> >> >> 1. >> Making the default implementation for flush() a no-op sounds good to me. >> >> >> 2. >> I think what was bugging me here is that a third-party state store needs >> to implement the state store interface. That means they need to >> implement a wrapper around the actual state store as we do for RocksDB >> with RocksDBStore. So, a third-party state store can always estimate the >> u
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Hi Bruno, I've updated the KIP based on our conversation. The only things I've not yet done are: 1. Using transactions under ALOS and EOS. 2. Making IsolationLevel a query-time constraint, rather than linking it to the processing.guarantee. There's a wrinkle that makes this a challenge: Interactive Queries that open an Iterator, when using transactions and READ_UNCOMMITTED. The problem is that under READ_UNCOMMITTED, queries need to be able to read records from the currently uncommitted transaction buffer (WriteBatch). This includes for Iterators, which should iterate both the transaction buffer and underlying database (using WriteBatch#iteratorWithBase()). The issue is that when the StreamThread commits, it writes the current WriteBatch to RocksDB *and then clears the WriteBatch*. Clearing the WriteBatch while an Interactive Query holds an open Iterator on it will invalidate the Iterator. Worse, it turns out that Iterators over a WriteBatch become invalidated not just when the WriteBatch is cleared, but also when the Iterators' current key receives a new write. Now that I'm writing this, I remember that this is the major reason that I switched the original design from having a query-time IsolationLevel to having the IsolationLevel linked to the transactionality of the stores themselves. It *might* be possible to resolve this, by having a "chain" of WriteBatches, with the StreamThread switching to a new WriteBatch whenever a new Interactive Query attempts to read from the database, but that could cause some performance problems/memory pressure when subjected to a high Interactive Query load. It would also reduce the efficiency of WriteBatches on-commit, as we'd have to write N WriteBatches, where N is the number of Interactive Queries since the last commit. I realise this is getting into the weeds of the implementation, and you'd rather we focus on the API for now, but I think it's important to consider how to implement the desired API, in case we come up with an API that cannot be implemented efficiently, or even at all! Thoughts? -- Nick On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna wrote: > Hi Nick, > > 6. > Of course, you are right! My bad! > Wiping out the state in the downgrading case is fine. > > > 3a. > Focus on the public facing changes for the KIP. We will manage to get > the internals right. Regarding state stores that do not support > READ_COMMITTED, they should throw an error stating that they do not > support READ_COMMITTED. No need to adapt all state stores immediately. > > 3b. > I am in favor of using transactions also for ALOS. > > > Best, > Bruno > > On 9/13/23 11:57 AM, Nick Telford wrote: > > Hi Bruno, > > > > Thanks for getting back to me! > > > > 2. > > The fact that implementations can always track estimated memory usage in > > the wrapper is a good point. I can remove -1 as an option, and I'll > clarify > > the JavaDoc that 0 is not just for non-transactional stores, which is > > currently misleading. > > > > 6. > > The problem with catching the exception in the downgrade process is that > > would require new code in the Kafka version being downgraded to. Since > > users could conceivably downgrade to almost *any* older version of Kafka > > Streams, I'm not sure how we could add that code? > > The only way I can think of doing it would be to provide a dedicated > > downgrade tool, that goes through every local store and removes the > > offsets column families. 
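[Editor's sketch] To make the invalidation problem concrete, here is a minimal RocksJava sketch of the pattern under discussion (the WriteBatch#iteratorWithBase mentioned above corresponds to WriteBatchWithIndex#newIteratorWithBase in the Java API): a READ_UNCOMMITTED-style read overlays the uncommitted batch on the database, and once the batch is written and cleared on commit, any iterator still held by an interactive query must no longer be used. The database path and key/value contents are made-up placeholders:

import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatchWithIndex;
import org.rocksdb.WriteOptions;

public class WriteBatchIteratorSketch {
    public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/kip892-sketch");
             WriteOptions writeOptions = new WriteOptions();
             WriteBatchWithIndex batch = new WriteBatchWithIndex(true)) {

            // Uncommitted write, buffered in the batch rather than the database.
            batch.put("key".getBytes(), "uncommitted-value".getBytes());

            // READ_UNCOMMITTED-style read: iterate the batch overlaid on the database.
            RocksIterator base = db.newIterator();
            RocksIterator readUncommitted = batch.newIteratorWithBase(base);
            readUncommitted.seekToFirst();   // sees "key" -> "uncommitted-value"

            // "Commit": write the batch to the database, then clear it for reuse.
            db.write(writeOptions, batch);
            batch.clear();

            // The iterator obtained above is now invalid and must not be used again;
            // this is the problem for long-lived interactive-query iterators.
            readUncommitted.close();
            base.close();
        }
    }
}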
But that seems like an unnecessary amount of > extra > > code to maintain just to handle a somewhat niche situation, when the > > alternative (automatically wipe and restore stores) should be acceptable. > > > > 1, 4, 5: Agreed. I'll make the changes you've requested. > > > > 3a. > > I agree that IsolationLevel makes more sense at query-time, and I > actually > > initially attempted to place the IsolationLevel at query-time, but I ran > > into some problems: > > - The key issue is that, under ALOS we're not staging writes in > > transactions, so can't perform writes at the READ_COMMITTED isolation > > level. However, this may be addressed if we decide to *always* use > > transactions as discussed under 3b. > > - IQv1 and IQv2 have quite different implementations. I remember having > > some difficulty understanding the IQv1 internals, which made it difficult > > to determine what needed to be changed. However, I *think* this can be > > addressed for both implementations by wrapping the RocksDBStore in an > > IsolationLevel-dependent wrapper, that overrides read methods (get, etc.) > > to either read directly from the database or from the
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Addendum: I think we would also face the same problem with the approach John outlined earlier (using the record cache as a transaction buffer and flushing it straight to SST files). This is because the record cache (the ThreadCache class) is not thread-safe, so every commit would invalidate open IQ Iterators in the same way that RocksDB WriteBatches do. -- Nick On Wed, 13 Sept 2023 at 16:58, Nick Telford wrote: > Hi Bruno, > > I've updated the KIP based on our conversation. The only things I've not > yet done are: > > 1. Using transactions under ALOS and EOS. > 2. Making IsolationLevel a query-time constraint, rather than linking it > to the processing.guarantee. > > There's a wrinkle that makes this a challenge: Interactive Queries that > open an Iterator, when using transactions and READ_UNCOMMITTED. > The problem is that under READ_UNCOMMITTED, queries need to be able to > read records from the currently uncommitted transaction buffer > (WriteBatch). This includes for Iterators, which should iterate both the > transaction buffer and underlying database (using > WriteBatch#iteratorWithBase()). > > The issue is that when the StreamThread commits, it writes the current > WriteBatch to RocksDB *and then clears the WriteBatch*. Clearing the > WriteBatch while an Interactive Query holds an open Iterator on it will > invalidate the Iterator. Worse, it turns out that Iterators over a > WriteBatch become invalidated not just when the WriteBatch is cleared, but > also when the Iterators' current key receives a new write. > > Now that I'm writing this, I remember that this is the major reason that I > switched the original design from having a query-time IsolationLevel to > having the IsolationLevel linked to the transactionality of the stores > themselves. > > It *might* be possible to resolve this, by having a "chain" of > WriteBatches, with the StreamThread switching to a new WriteBatch whenever > a new Interactive Query attempts to read from the database, but that could > cause some performance problems/memory pressure when subjected to a high > Interactive Query load. It would also reduce the efficiency of WriteBatches > on-commit, as we'd have to write N WriteBatches, where N is the number of > Interactive Queries since the last commit. > > I realise this is getting into the weeds of the implementation, and you'd > rather we focus on the API for now, but I think it's important to consider > how to implement the desired API, in case we come up with an API that > cannot be implemented efficiently, or even at all! > > Thoughts? > -- > Nick > > On Wed, 13 Sept 2023 at 13:03, Bruno Cadonna wrote: > >> Hi Nick, >> >> 6. >> Of course, you are right! My bad! >> Wiping out the state in the downgrading case is fine. >> >> >> 3a. >> Focus on the public facing changes for the KIP. We will manage to get >> the internals right. Regarding state stores that do not support >> READ_COMMITTED, they should throw an error stating that they do not >> support READ_COMMITTED. No need to adapt all state stores immediately. >> >> 3b. >> I am in favor of using transactions also for ALOS. >> >> >> Best, >> Bruno >> >> On 9/13/23 11:57 AM, Nick Telford wrote: >> > Hi Bruno, >> > >> > Thanks for getting back to me! >> > >> > 2. >> > The fact that implementations can always track estimated memory usage in >> > the wrapper is a good point. I can remove -1 as an option, and I'll >> clarify >> > the JavaDoc that 0 is not just for non-transactional stores, which is >> > currently misleading. >> > >> > 6. 
>> > The problem with catching the exception in the downgrade process is that >> > would require new code in the Kafka version being downgraded to. Since >> > users could conceivably downgrade to almost *any* older version of Kafka >> > Streams, I'm not sure how we could add that code? >> > The only way I can think of doing it would be to provide a dedicated >> > downgrade tool, that goes through every local store and removes the >> > offsets column families. But that seems like an unnecessary amount of >> extra >> > code to maintain just to handle a somewhat niche situation, when the >> > alternative (automatically wipe and restore stores) should be >> acceptable. >> > >> > 1, 4, 5: Agreed. I'll make the changes you've requested. >> > >> > 3a. >> > I agree that IsolationLevel makes more sense at query-time, and I >> actually >> > initially attem
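For concreteness, here is a minimal RocksJava sketch of the invalidation problem described above, assuming the WriteBatchWithIndex variant (whose newIteratorWithBase() is the iteratorWithBase() referred to in the email); the path and key names are made up and this only illustrates the issue, it is not Streams code:

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.RocksIterator;
    import org.rocksdb.WriteBatchWithIndex;
    import org.rocksdb.WriteOptions;

    public class WriteBatchInvalidationSketch {
        public static void main(final String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options opts = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(opts, "/tmp/kip892-sketch");
                 WriteBatchWithIndex batch = new WriteBatchWithIndex();
                 WriteOptions writeOpts = new WriteOptions()) {

                // An uncommitted write, visible only through the batch.
                batch.put("key".getBytes(), "uncommitted".getBytes());

                // READ_UNCOMMITTED IQ: iterate the batch merged with the underlying DB.
                final RocksIterator iq = batch.newIteratorWithBase(db.newIterator());
                iq.seekToFirst();

                // StreamThread commit: write the batch to the DB, then clear it.
                db.write(writeOpts, batch);
                batch.clear(); // this is the step that invalidates the open IQ iterator

                // Using 'iq' beyond this point is unsafe, which is the problem described
                // above for long-lived Interactive Query iterators.
                iq.close();
            }
        }
    }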
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Hi everyone, I agree that having IsolationLevel be determined at query-time is the ideal design, but there are a few sticking points: 1. There needs to be some way to communicate the IsolationLevel down to the RocksDBStore itself, so that the query can respect it. Since stores are "layered" in functionality (i.e. ChangeLoggingStore, MeteredStore, etc.), we need some way to deliver that information to the bottom layer. For IQv2, we can use the existing State#query() method, but IQv1 has no way to do this. A simple approach, which would potentially open up other options, would be to add something like: ReadOnlyKeyValueStore readOnlyView(IsolationLevel isolationLevel) to ReadOnlyKeyValueStore (and similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.). 2. As mentioned above, RocksDB WriteBatches are not thread-safe, which causes a problem if we want to provide READ_UNCOMMITTED Iterators. I also had a look at RocksDB Transactions[1], but they solve a very different problem, and have the same thread-safety issue. One possible approach that I mentioned is chaining WriteBatches: every time a new Interactive Query is received (i.e. readOnlyView, see above, is called) we "freeze" the existing WriteBatch, and start a new one for new writes. The Interactive Query queries the "chain" of previous WriteBatches + the underlying database; while the StreamThread starts writing to the *new* WriteBatch. On-commit, the StreamThread would write *all* WriteBatches in the chain to the database (that have not yet been written). WriteBatches would be closed/freed only when they have been both committed, and all open Interactive Queries on them have been closed. This would require some reference counting. Obviously a drawback of this approach is the potential for increased memory usage: if an Interactive Query is long-lived, for example by doing a full scan over a large database, or even just pausing in the middle of an iteration, then the existing chain of WriteBatches could be kept around for a long time, potentially forever. -- A. Going off on a tangent, it looks like in addition to supporting READ_COMMITTED queries, we could go further and support REPEATABLE_READ queries (i.e. where subsequent reads to the same key in the same Interactive Query are guaranteed to yield the same value) by making use of RocksDB Snapshots[2]. These are fairly lightweight, so the performance impact is likely to be negligible, but they do require that the Interactive Query session can be explicitly closed. This could be achieved if we made the above readOnlyView interface look more like: interface ReadOnlyKeyValueView implements ReadOnlyKeyValueStore, AutoCloseable {} interface ReadOnlyKeyValueStore { ... ReadOnlyKeyValueView readOnlyView(IsolationLevel isolationLevel); } But this would be a breaking change, as existing IQv1 queries are guaranteed to never call store.close(), and therefore these would leak memory under REPEATABLE_READ. B. One thing that's notable: MyRocks states that they support READ_COMMITTED and REPEATABLE_READ, but they make no mention of READ_UNCOMMITTED[3][4]. This could be because doing so is technically difficult/impossible using the primitives available in RocksDB. -- Lucas, to address your points: U1. It's only "SHOULD" to permit alternative (i.e. non-RocksDB) implementations of StateStore that do not support atomic writes. Obviously in those cases, the guarantees Kafka Streams provides/expects would be relaxed. Do you think we should require all implementations to support atomic writes? U2. 
Stores can support multiple IsolationLevels. As we've discussed above, the ideal scenario would be to specify the IsolationLevel at query-time. Failing that, I think the second-best approach is to define the IsolationLevel for *all* queries based on the processing.mode, which is what the default StateStoreContext#isolationLevel() achieves. Would you prefer an alternative? While the existing implementation is equivalent to READ_UNCOMMITTED, this can yield unexpected results/errors under EOS, if a transaction is rolled back. While this would be a change in behaviour for users, it would look more like a bug fix than a breaking change. That said, we *could* make it configurable, and default to the existing behaviour (READ_UNCOMMITTED) instead of inferring it from the processing.mode? N1, N2. These were only primitives to avoid boxing costs, but since this is not a performance sensitive area, it should be fine to change if that's desirable. N3. It's because the store "manages its own offsets", which includes both committing the offset, *and providing it* via getCommittedOffset(). Personally, I think "managesOffsets" conveys this best, but I don't mind changing it if the nomenclature is unclear. Sorry for the massive emails/essays! -- Nick 1: https://github.com/facebook/rocksdb/wiki/Transactions 2: https://github.co
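To make the shape of that proposal easier to see, here is a compilable sketch of the readOnlyView idea with the generic parameters (stripped by the mail archive) restored. The ReadOnlyKeyValueView type and readOnlyView() method are only the proposal under discussion, not existing Kafka Streams API:

    import org.apache.kafka.common.IsolationLevel;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    // Proposed type: a per-query view of a store at a chosen isolation level.
    public interface ReadOnlyKeyValueView<K, V>
            extends ReadOnlyKeyValueStore<K, V>, AutoCloseable {

        // Closing the view releases whatever it pinned (a frozen WriteBatch, a RocksDB
        // snapshot for REPEATABLE_READ, ...). Existing IQv1 callers never call close(),
        // which is the compatibility concern raised above.
        @Override
        void close();
    }

    // Method that would be added to ReadOnlyKeyValueStore (and the Window/Session
    // equivalents) so a caller can pick the isolation level per query:
    //
    //     ReadOnlyKeyValueView<K, V> readOnlyView(IsolationLevel isolationLevel);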
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Oh! One other concern I haven't mentioned: if we make IsolationLevel a query-time constraint, then we need to add support for READ_COMMITTED to InMemoryKeyValueStore too, which will require some changes to the implementation. On Mon, 18 Sept 2023 at 17:24, Nick Telford wrote: > Hi everyone, > > I agree that having IsolationLevel be determined at query-time is the > ideal design, but there are a few sticking points: > > 1. > There needs to be some way to communicate the IsolationLevel down to the > RocksDBStore itself, so that the query can respect it. Since stores are > "layered" in functionality (i.e. ChangeLoggingStore, MeteredStore, etc.), > we need some way to deliver that information to the bottom layer. For IQv2, > we can use the existing State#query() method, but IQv1 has no way to do > this. > > A simple approach, which would potentially open up other options, would be > to add something like: ReadOnlyKeyValueStore > readOnlyView(IsolationLevel isolationLevel) to ReadOnlyKeyValueStore (and > similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.). > > 2. > As mentioned above, RocksDB WriteBatches are not thread-safe, which causes > a problem if we want to provide READ_UNCOMMITTED Iterators. I also had a > look at RocksDB Transactions[1], but they solve a very different problem, > and have the same thread-safety issue. > > One possible approach that I mentioned is chaining WriteBatches: every > time a new Interactive Query is received (i.e. readOnlyView, see above, > is called) we "freeze" the existing WriteBatch, and start a new one for new > writes. The Interactive Query queries the "chain" of previous WriteBatches > + the underlying database; while the StreamThread starts writing to the > *new* WriteBatch. On-commit, the StreamThread would write *all* > WriteBatches in the chain to the database (that have not yet been written). > > WriteBatches would be closed/freed only when they have been both > committed, and all open Interactive Queries on them have been closed. This > would require some reference counting. > > Obviously a drawback of this approach is the potential for increased > memory usage: if an Interactive Query is long-lived, for example by doing a > full scan over a large database, or even just pausing in the middle of an > iteration, then the existing chain of WriteBatches could be kept around for > a long time, potentially forever. > > -- > > A. > Going off on a tangent, it looks like in addition to supporting > READ_COMMITTED queries, we could go further and support REPEATABLE_READ > queries (i.e. where subsequent reads to the same key in the same > Interactive Query are guaranteed to yield the same value) by making use of > RocksDB Snapshots[2]. These are fairly lightweight, so the performance > impact is likely to be negligible, but they do require that the Interactive > Query session can be explicitly closed. > > This could be achieved if we made the above readOnlyView interface look > more like: > > interface ReadOnlyKeyValueView implements ReadOnlyKeyValueStore V>, AutoCloseable {} > > interface ReadOnlyKeyValueStore { > ... > ReadOnlyKeyValueView readOnlyView(IsolationLevel isolationLevel); > } > > But this would be a breaking change, as existing IQv1 queries are > guaranteed to never call store.close(), and therefore these would leak > memory under REPEATABLE_READ. > > B. > One thing that's notable: MyRocks states that they support READ_COMMITTED > and REPEATABLE_READ, but they make no mention of READ_UNCOMMITTED[3][4]. 
> This could be because doing so is technically difficult/impossible using > the primitives available in RocksDB. > > -- > > Lucas, to address your points: > > U1. > It's only "SHOULD" to permit alternative (i.e. non-RocksDB) > implementations of StateStore that do not support atomic writes. Obviously > in those cases, the guarantees Kafka Streams provides/expects would be > relaxed. Do you think we should require all implementations to support > atomic writes? > > U2. > Stores can support multiple IsolationLevels. As we've discussed above, the > ideal scenario would be to specify the IsolationLevel at query-time. > Failing that, I think the second-best approach is to define the > IsolationLevel for *all* queries based on the processing.mode, which is > what the default StateStoreContext#isolationLevel() achieves. Would you > prefer an alternative? > > While the existing implementation is equivalent to READ_UNCOMMITTED, this > can yield unexpected results/errors under EOS, if a transaction is rolled > back. While this would be a change in behaviour for users, it would look > more like a bug fix t
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Hi Bruno, Agreed, I can live with that for now. In an effort to keep the scope of this KIP from expanding, I'm leaning towards just providing a configurable default.state.isolation.level and removing IsolationLevel from the StateStoreContext. This would be compatible with adding support for query-time IsolationLevels in the future, whilst providing a way for users to select an isolation level now. The big problem with this, however, is that if a user selects processing.mode = "exactly-once(-v2|-beta)", and default.state.isolation.level = "READ_UNCOMMITTED", we need to guarantee that the data isn't written to disk until commit() is called, but we also need to permit IQ threads to read from the ongoing transaction. A simple solution would be to (temporarily) forbid this combination of configuration, and have default.state.isolation.level automatically switch to READ_COMMITTED when processing.mode is anything other than at-least-once. Do you think this would be acceptable? In a later KIP, we can add support for query-time isolation levels and solve this particular problem there, which would relax this restriction. Regards, Nick On Tue, 19 Sept 2023 at 09:30, Bruno Cadonna wrote: > Why do we need to add READ_COMMITTED to InMemoryKeyValueStore? I think > it is perfectly valid to say InMemoryKeyValueStore do not support > READ_COMMITTED for now, since READ_UNCOMMITTED is the de-facto default > at the moment. > > Best, > Bruno > > On 9/18/23 7:12 PM, Nick Telford wrote: > > Oh! One other concern I haven't mentioned: if we make IsolationLevel a > > query-time constraint, then we need to add support for READ_COMMITTED to > > InMemoryKeyValueStore too, which will require some changes to the > > implementation. > > > > On Mon, 18 Sept 2023 at 17:24, Nick Telford > wrote: > > > >> Hi everyone, > >> > >> I agree that having IsolationLevel be determined at query-time is the > >> ideal design, but there are a few sticking points: > >> > >> 1. > >> There needs to be some way to communicate the IsolationLevel down to the > >> RocksDBStore itself, so that the query can respect it. Since stores are > >> "layered" in functionality (i.e. ChangeLoggingStore, MeteredStore, > etc.), > >> we need some way to deliver that information to the bottom layer. For > IQv2, > >> we can use the existing State#query() method, but IQv1 has no way to do > >> this. > >> > >> A simple approach, which would potentially open up other options, would > be > >> to add something like: ReadOnlyKeyValueStore > >> readOnlyView(IsolationLevel isolationLevel) to ReadOnlyKeyValueStore > (and > >> similar to ReadOnlyWindowStore, ReadOnlySessionStore, etc.). > >> > >> 2. > >> As mentioned above, RocksDB WriteBatches are not thread-safe, which > causes > >> a problem if we want to provide READ_UNCOMMITTED Iterators. I also had a > >> look at RocksDB Transactions[1], but they solve a very different > problem, > >> and have the same thread-safety issue. > >> > >> One possible approach that I mentioned is chaining WriteBatches: every > >> time a new Interactive Query is received (i.e. readOnlyView, see above, > >> is called) we "freeze" the existing WriteBatch, and start a new one for > new > >> writes. The Interactive Query queries the "chain" of previous > WriteBatches > >> + the underlying database; while the StreamThread starts writing to the > >> *new* WriteBatch. On-commit, the StreamThread would write *all* > >> WriteBatches in the chain to the database (that have not yet been > written). 
> >> > >> WriteBatches would be closed/freed only when they have been both > >> committed, and all open Interactive Queries on them have been closed. > This > >> would require some reference counting. > >> > >> Obviously a drawback of this approach is the potential for increased > >> memory usage: if an Interactive Query is long-lived, for example by > doing a > >> full scan over a large database, or even just pausing in the middle of > an > >> iteration, then the existing chain of WriteBatches could be kept around > for > >> a long time, potentially forever. > >> > >> -- > >> > >> A. > >> Going off on a tangent, it looks like in addition to supporting > >> READ_COMMITTED queries, we could go further and support REPEATABLE_READ > >> queries (i.e. where subsequent reads to the same key in the same > >> Interactive Query are guaranteed to yield the same v
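A minimal sketch of the fallback rule proposed above, assuming the not-yet-released default.state.isolation.level config: under EOS, a configured READ_UNCOMMITTED would be coerced to READ_COMMITTED until query-time isolation levels make that combination supportable. The class and method names here are purely illustrative:

    import org.apache.kafka.common.IsolationLevel;

    final class DefaultStateIsolationLevel {

        static IsolationLevel resolve(final String processingGuarantee,
                                      final IsolationLevel configured) {
            // processing.guarantee is one of: at_least_once, exactly_once,
            // exactly_once_beta, exactly_once_v2
            final boolean eos = processingGuarantee.startsWith("exactly_once");
            if (eos && configured == IsolationLevel.READ_UNCOMMITTED) {
                // Temporary restriction discussed in this thread:
                // EOS always reads committed, for now.
                return IsolationLevel.READ_COMMITTED;
            }
            return configured;
        }
    }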
[DISCUSS] KIP-989: RocksDB Iterator Metrics
Hi everyone, KIP-989 is a small Kafka Streams KIP to add a few new metrics around the creation and use of RocksDB Iterators, to aid users in identifying "Iterator leaks" that could cause applications to leak native memory. Let me know what you think! https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics P.S. I'm not too sure about the formatting of the "New Metrics" table, any advice there would be appreciated. Regards, Nick
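For readers unfamiliar with the problem, this is the kind of "Iterator leak" the proposed metrics are meant to surface; the "counts" store name is hypothetical and the snippet only illustrates the pattern:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StoreQueryParameters;
    import org.apache.kafka.streams.state.KeyValueIterator;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class IteratorLeakExample {

        static void query(final KafkaStreams streams) {
            final ReadOnlyKeyValueStore<String, Long> store = streams.store(
                StoreQueryParameters.fromNameAndType("counts", QueryableStoreTypes.keyValueStore()));

            // Leak: the returned iterator pins native RocksDB resources until close()
            // is called, which never happens here. The "open-iterators" metric would
            // keep climbing for this instance.
            final KeyValueIterator<String, Long> leaked = store.range("a", "z");
            while (leaked.hasNext()) {
                System.out.println(leaked.next());
            }

            // Correct usage: try-with-resources guarantees the iterator is closed,
            // so the iterator-duration metrics get recorded and the native handle is freed.
            try (KeyValueIterator<String, Long> it = store.all()) {
                it.forEachRemaining(System.out::println);
            }
        }
    }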
Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics
Hi Colt, I kept the details out of the KIP, because KIPs generally document high-level design, but the way I'm doing it is like this:

        final ManagedKeyValueIterator rocksDbPrefixSeekIterator = cf.prefixScan(accessor, prefixBytes);
    --> final long startedAt = System.nanoTime();
        openIterators.add(rocksDbPrefixSeekIterator);
        rocksDbPrefixSeekIterator.onClose(() -> {
    -->     metricsRecorder.recordIteratorDuration(System.nanoTime() - startedAt);
            openIterators.remove(rocksDbPrefixSeekIterator);
        });

The lines with the arrow are the new code. This pattern is repeated throughout RocksDBStore, wherever a new RocksDbIterator is created. Regards, Nick On Thu, 5 Oct 2023 at 12:32, Colt McNealy wrote: > Thank you for the KIP, Nick! > > This would be highly useful for many reasons. Much more sane than checking > for leaked iterators by profiling memory usage while running 100's of > thousands of range scans via interactive queries (: > > One small question: > > >The iterator-duration metrics will be updated whenever an Iterator's > close() method is called > > Does the Iterator have its own "createdAt()" or equivalent field, or do we > need to keep track of the Iterator's start time upon creation? > > Cheers, > Colt McNealy > > *Founder, LittleHorse.dev* > > > On Wed, Oct 4, 2023 at 9:07 AM Nick Telford > wrote: > > > Hi everyone, > > > > KIP-989 is a small Kafka Streams KIP to add a few new metrics around the > > creation and use of RocksDB Iterators, to aid users in identifying > > "Iterator leaks" that could cause applications to leak native memory. > > > > Let me know what you think! > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics > > > > P.S. I'm not too sure about the formatting of the "New Metrics" table, > any > > advice there would be appreciated. > > > > Regards, > > Nick > > >
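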
[DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException
Hi everyone, This is a Streams KIP to add a new DeserializationHandlerResponse, "SUSPEND", that suspends the failing Task but continues to process other Tasks normally. https://cwiki.apache.org/confluence/display/KAFKA/KIP-990%3A+Capability+to+SUSPEND+Tasks+on+DeserializationException I'm not yet completely convinced that this is practical, as I suspect it might be abusing the SUSPENDED Task.State for something it was not designed for. The intent is to pause an active Task *without* re-assigning it to another instance, which causes cascading failures when the FAIL DeserializationHandlerResponse is used. Let me know what you think! Regards, Nick
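As a sketch of how the proposed response would be used, a custom handler might look like the following. Note that SUSPEND is only the proposed enum value (today's DeserializationHandlerResponse has only CONTINUE and FAIL), so this does not compile against current releases and is purely illustrative:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class SuspendOnBadRecordHandler implements DeserializationExceptionHandler {

        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            // Keep the rest of the topology running, but stop this task until an
            // operator inspects the poison pill and resumes it.
            return DeserializationHandlerResponse.SUSPEND;
        }

        @Override
        public void configure(final Map<String, ?> configs) {}
    }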
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
t is indeed possible to make READ_UNCOMMITTED work under EOS. In the interests of trying to get this KIP over the line ASAP, I settled on adding the restriction that READ_UNCOMMITTED would be unavailable under EOS, with the goal of relaxing this in a future KIP. If it turns out that this restriction is a blocker, then I'll try to find the time to explore the possibility of adding a flag. Regards, Nick On Thu, 12 Oct 2023 at 21:32, Sophie Blee-Goldman wrote: > Hey Nick! First of all thanks for taking up this awesome feature, I'm sure > every single > Kafka Streams user and dev would agree that it is sorely needed. > > I've just been catching up on the KIP and surrounding discussion, so please > forgive me > for any misunderstandings or misinterpretations of the current plan and > don't hesitate to > correct me. > > Before I jump in, I just want to say that having seen this drag on for so > long, my singular > goal in responding is to help this KIP past a perceived impasse so we can > finally move on > to voting and implementing it. Long discussions are to be expected for > major features like > this but it's completely on us as the Streams devs to make sure there is an > end in sight > for any ongoing discussion. > > With that said, it's my understanding that the KIP as currently proposed is > just not tenable > for Kafka Streams, and would prevent some EOS users from upgrading to the > version it > first appears in. Given that we can't predict or guarantee whether any of > the followup KIPs > would be completed in the same release cycle as this one, we need to make > sure that the > feature is either compatible with all current users or else feature-flagged > so that they may > opt in/out. > > Therefore, IIUC we need to have either (or both) of these as > fully-implemented config options: > 1. default.state.isolation.level > 2. enable.transactional.state.stores > > This way EOS users for whom read_committed semantics are not viable can > still upgrade, > and either use the isolation.level config to leverage the new txn state > stores without sacrificing > their application semantics, or else simply keep the transactional state > stores disabled until we > are able to fully implement the isolation level configuration at either an > application or query level. > > Frankly you are the expert here and know much more about the tradeoffs in > both semantics and > effort level of implementing one of these configs vs the other. In my > opinion, either option would > be fine and I would leave the decision of which one to include in this KIP > completely up to you. > I just don't see a way for the KIP to proceed without some variation of the > above that would allow > EOS users to opt-out of read_committed. > > (If it's all the same to you, I would recommend always including a feature > flag in large structural > changes like this. No matter how much I trust someone or myself to > implement a feature, you just > never know what kind of bugs might slip in, especially with the very first > iteration that gets released. > So personally, my choice would be to add the feature flag and leave it off > by default. If all goes well > you can do a quick KIP to enable it by default as soon as the > isolation.level config has been > completed. But feel free to just pick whichever option is easiest or > quickest for you to implement) > > Hope this helps move the discussion forward, > Sophie > > On Tue, Sep 19, 2023 at 1:57 AM Nick Telford > wrote: > > > Hi Bruno, > > > > Agreed, I can live with that for now. 
> > > > In an effort to keep the scope of this KIP from expanding, I'm leaning > > towards just providing a configurable default.state.isolation.level and > > removing IsolationLevel from the StateStoreContext. This would be > > compatible with adding support for query-time IsolationLevels in the > > future, whilst providing a way for users to select an isolation level > now. > > > > The big problem with this, however, is that if a user selects > > processing.mode > > = "exactly-once(-v2|-beta)", and default.state.isolation.level = > > "READ_UNCOMMITTED", we need to guarantee that the data isn't written to > > disk until commit() is called, but we also need to permit IQ threads to > > read from the ongoing transaction. > > > > A simple solution would be to (temporarily) forbid this combination of > > configuration, and have default.state.isolation.level automatically > switch > > to READ_COMMITTED when processing.mode is anything other than > > at-least-once. Do you think this would be acc
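To illustrate the two opt-in styles discussed in this thread, an EOS application might be configured as follows. Neither default.state.isolation.level nor enable.transactional.state.stores is a released config; both names come from this discussion:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class Kip892OptInExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "payments-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);

            // Option 1: an isolation-level config that preserves today's IQ semantics
            // for EOS users who cannot adopt read_committed yet.
            props.put("default.state.isolation.level", "READ_UNCOMMITTED");

            // Option 2 (alternative): a plain feature flag, off by default.
            // props.put("enable.transactional.state.stores", "false");
        }
    }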
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Hi Bruno, Thanks for getting back to me. 1. I think this should be possible. Are you thinking of the situation where a user may downgrade to a previous version of Kafka Streams? In that case, sadly, the RocksDBStore would get wiped by the older version of Kafka Streams anyway, because that version wouldn't understand the extra column family (that holds offsets), so the missing Position file would automatically get rebuilt when the store is rebuilt from the changelog. Are there other situations than downgrade where a transactional store could be replaced by a non-transactional one? I can't think of any. 2. Ahh yes, the Test Plan - my Kryptonite! This section definitely needs to be fleshed out. I'll work on that. How much detail do you need? 3. See my previous email discussing this. 4. Hmm, this is an interesting point. Are you suggesting that under ALOS READ_COMMITTED should not be supported? Regards, Nick On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna wrote: > Hi Nick, > > I think the KIP is converging! > > > 1. > I am wondering whether it makes sense to write the position file during > close as we do for the checkpoint file, so that in case the state store > is replaced with a non-transactional state store the non-transactional > state store finds the position file. I think, this is not strictly > needed, but would be a nice behavior instead of just deleting the > position file. > > > 2. > The test plan does not mention integration tests. Do you not need to > extend existing ones and add new ones. Also for upgrading and > downgrading you might need integration and/or system tests. > > > 3. > I think Sophie made a point. Although, IQ reading from uncommitted data > under EOS might be considered a bug by some people. Thus, your KIP would > fix a bug rather than changing the intended behavior. However, I also > see that a feature flag would help users that rely on this buggy > behavior (at least until AK 4.0). > > > 4. > This is related to the previous point. I assume that the difference > between READ_COMMITTED and READ_UNCOMMITTED for ALOS is that in the > former you enable transactions on the state store and in the latter you > disable them. If my assumption is correct, I think that is an issue. > Let's assume under ALOS Streams fails over a couple of times more or > less at the same step in processing after value 3 is added to an > aggregation but the offset of the corresponding input record was not > committed. Without transactions disabled, the aggregation value would > increase by 3 for each failover. With transactions enabled, value 3 > would only be added to the aggregation once when the offset of the input > record is committed and the transaction finally completes. So the > content of the state store would change depending on the configuration > for IQ. IMO, the content of the state store should be independent from > IQ. Given this issue, I propose to not use transactions with ALOS at > all. I was a big proponent of using transactions with ALOS, but I > realized that transactions with ALOS is not as easy as enabling > transactions on state stores. Another aspect that is problematic is that > the changelog topic which actually replicates the state store is not > transactional under ALOS. Thus, it might happen that the state store and > the changelog differ in their content. All of this is maybe solvable > somehow, but for the sake of this KIP, I would leave it for the future. > > > Best, > Bruno > > > > On 10/12/23 10:32 PM, Sophie Blee-Goldman wrote: > > Hey Nick! 
First of all thanks for taking up this awesome feature, I'm > sure > > every single > > Kafka Streams user and dev would agree that it is sorely needed. > > > > I've just been catching up on the KIP and surrounding discussion, so > please > > forgive me > > for any misunderstandings or misinterpretations of the current plan and > > don't hesitate to > > correct me. > > > > Before I jump in, I just want to say that having seen this drag on for so > > long, my singular > > goal in responding is to help this KIP past a perceived impasse so we can > > finally move on > > to voting and implementing it. Long discussions are to be expected for > > major features like > > this but it's completely on us as the Streams devs to make sure there is > an > > end in sight > > for any ongoing discussion. > > > > With that said, it's my understanding that the KIP as currently proposed > is > > just not tenable > > for Kafka Streams, and would prevent some EOS users from upgrading to the > > version it > > first appears in. Given that we can't
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Hi Bruno, 4. I'll hold off on making that change until we have a consensus as to what configuration to use to control all of this, as it'll be affected by the decision on EOS isolation levels. 5. Done. I've chosen "committedOffsets". Regards, Nick On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna wrote: > Hi Nick, > > 1. > Yeah, you are probably right that it does not make too much sense. > Thanks for the clarification! > > > 4. > Yes, sorry for the back and forth, but I think for the sake of the KIP > it is better to let the ALOS behavior as it is for now due to the > possible issues you would run into. Maybe we can find a solution in the > future. Now the question returns to whether we really need > default.state.isolation.level. Maybe the config could be the feature > flag Sophie requested. > > > 5. > There is a guideline in Kafka not to use the get prefix for getters (at > least in the public API). Thus, could you please rename > > getCommittedOffset(TopicPartition partition) -> > committedOffsetFor(TopicPartition partition) > > You can also propose an alternative to committedOffsetFor(). > > > Best, > Bruno > > > On 10/13/23 3:21 PM, Nick Telford wrote: > > Hi Bruno, > > > > Thanks for getting back to me. > > > > 1. > > I think this should be possible. Are you thinking of the situation where > a > > user may downgrade to a previous version of Kafka Streams? In that case, > > sadly, the RocksDBStore would get wiped by the older version of Kafka > > Streams anyway, because that version wouldn't understand the extra column > > family (that holds offsets), so the missing Position file would > > automatically get rebuilt when the store is rebuilt from the changelog. > > Are there other situations than downgrade where a transactional store > could > > be replaced by a non-transactional one? I can't think of any. > > > > 2. > > Ahh yes, the Test Plan - my Kryptonite! This section definitely needs to > be > > fleshed out. I'll work on that. How much detail do you need? > > > > 3. > > See my previous email discussing this. > > > > 4. > > Hmm, this is an interesting point. Are you suggesting that under ALOS > > READ_COMMITTED should not be supported? > > > > Regards, > > Nick > > > > On Fri, 13 Oct 2023 at 13:52, Bruno Cadonna wrote: > > > >> Hi Nick, > >> > >> I think the KIP is converging! > >> > >> > >> 1. > >> I am wondering whether it makes sense to write the position file during > >> close as we do for the checkpoint file, so that in case the state store > >> is replaced with a non-transactional state store the non-transactional > >> state store finds the position file. I think, this is not strictly > >> needed, but would be a nice behavior instead of just deleting the > >> position file. > >> > >> > >> 2. > >> The test plan does not mention integration tests. Do you not need to > >> extend existing ones and add new ones. Also for upgrading and > >> downgrading you might need integration and/or system tests. > >> > >> > >> 3. > >> I think Sophie made a point. Although, IQ reading from uncommitted data > >> under EOS might be considered a bug by some people. Thus, your KIP would > >> fix a bug rather than changing the intended behavior. However, I also > >> see that a feature flag would help users that rely on this buggy > >> behavior (at least until AK 4.0). > >> > >> > >> 4. > >> This is related to the previous point. 
I assume that the difference > >> between READ_COMMITTED and READ_UNCOMMITTED for ALOS is that in the > >> former you enable transactions on the state store and in the latter you > >> disable them. If my assumption is correct, I think that is an issue. > >> Let's assume under ALOS Streams fails over a couple of times more or > >> less at the same step in processing after value 3 is added to an > >> aggregation but the offset of the corresponding input record was not > >> committed. Without transactions disabled, the aggregation value would > >> increase by 3 for each failover. With transactions enabled, value 3 > >> would only be added to the aggregation once when the offset of the input > >> record is committed and the transaction finally completes. So the > >> content of the state store would change depending on the configuration > >> for IQ. IMO, the content of the state store should be ind
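For reference, a very rough sketch of the offset-management surface being discussed (the managesOffsets idea and the committedOffset* getter from point 5). The method names follow the thread's working names and are not final KIP-892 signatures:

    import org.apache.kafka.common.TopicPartition;

    public interface OffsetManagingStateStore {

        // True if the store persists its changelog offsets atomically with its data
        // (e.g. in a dedicated RocksDB column family), letting Streams skip the
        // per-store checkpoint file for it.
        boolean managesOffsets();

        // The changelog offset this store has durably committed for the given
        // partition, or null if it has not committed one yet.
        Long committedOffsetFor(TopicPartition partition);
    }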
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Hi Guozhang, The KIP as it stands introduces a new configuration, default.state.isolation.level, which is independent of processing.mode. It's intended that this new configuration be used to configure a global IQ isolation level in the short term, with a future KIP introducing the capability to change the isolation level on a per-query basis, falling back to the "default" defined by this config. That's why I called it "default", for future-proofing. However, it currently includes the caveat that READ_UNCOMMITTED is not available under EOS. I think this is the coupling you are alluding to? This isn't intended to be a restriction of the API, but is currently a technical limitation. However, after discussing with some users about use-cases that would require READ_UNCOMMITTED under EOS, I'm inclined to remove that clause and put in the necessary work to make that combination possible now. I currently see two possible approaches: 1. Disable TX StateStores internally when the IsolationLevel is READ_UNCOMMITTED and the processing.mode is EOS. This is more difficult than it sounds, as there are many assumptions being made throughout the internals about the guarantees StateStores provide. It would definitely add a lot of extra "if (read_uncommitted && eos)" branches, complicating maintenance and testing. 2. Invest the time *now* to make READ_UNCOMMITTED of EOS StateStores possible. I have some ideas on how this could be achieved, but they would need testing and could introduce some additional issues. The benefit of this approach is that it would make query-time IsolationLevels much simpler to implement in the future. Unfortunately, both will require considerable work that will further delay this KIP, which was the reason I placed the restriction in the KIP in the first place. Regards, Nick On Sat, 14 Oct 2023 at 03:30, Guozhang Wang wrote: > Hello Nick, > > First of all, thanks a lot for the great effort you've put in driving > this KIP! I really like it coming through finally, as many people in > the community have raised this. At the same time I honestly feel a bit > ashamed for not putting enough of my time supporting it and pushing it > through the finish line (you raised this KIP almost a year ago). > > I briefly passed through the DISCUSS thread so far, not sure I've 100 > percent digested all the bullet points. But with the goal of trying to > help take it through the finish line in mind, I'd want to throw > thoughts on top of my head only on the point #4 above which I felt may > be the main hurdle for the current KIP to drive to a consensus now. > > The general question I asked myself is, whether we want to couple "IQ > reading mode" with "processing mode". While technically I tend to > agree with you that, it's feels like a bug if some single user chose > "EOS" for processing mode while choosing "read uncommitted" for IQ > reading mode, at the same time, I'm thinking if it's possible that > there could be two different persons (or even two teams) that would be > using the stream API to build the app, and the IQ API to query the > running state of the app. I know this is less of a technical thing but > rather a more design stuff, but if it could be ever the case, I'm > wondering if the personale using the IQ API knows about the risks of > using read uncommitted but still chose so for the favor of > performance, no matter if the underlying stream processing mode > configured by another personale is EOS or not. 
In that regard, I'm > leaning towards a "leaving the door open, and close it later if we > found it's a bad idea" aspect with a configuration that we can > potentially deprecate than "shut the door, clean for everyone". More > specifically, allowing the processing mode / IQ read mode to be > decoupled, and if we found that there's no such cases as I speculated > above or people started complaining a lot, we can still enforce > coupling them. > > Again, just my 2c here. Thanks again for the great patience and > diligence on this KIP. > > > Guozhang > > > > On Fri, Oct 13, 2023 at 8:48 AM Nick Telford > wrote: > > > > Hi Bruno, > > > > 4. > > I'll hold off on making that change until we have a consensus as to what > > configuration to use to control all of this, as it'll be affected by the > > decision on EOS isolation levels. > > > > 5. > > Done. I've chosen "committedOffsets". > > > > Regards, > > Nick > > > > On Fri, 13 Oct 2023 at 16:23, Bruno Cadonna wrote: > > > > > Hi Nick, > > > > > > 1. > > > Ye
Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics
Hi Lucas, Hmm, I'm not sure how we could reliably identify such leaked Iterators. If we tried to include open iterators when calculating iterator-duration, we'd need some kind of registry of all the open iterator creation timestamps, wouldn't we? In general, if you have a leaky Iterator, it should manifest as a persistently climbing "open-iterators" metric, even on a busy node, because each time that Iterator is used, it will leak another one. So even in the presence of many non-leaky Iterators on a busy instance, the metric should still consistently climb. Regards, Nick On Mon, 16 Oct 2023 at 14:24, Lucas Brutschy wrote: > Hi Nick! > > thanks for the KIP! I think this could be quite useful, given the > problems that we had with leaks due to RocksDB resources not being > closed. > > I don't have any pressing issues why we can't accept it like it is, > just one minor point for discussion: would it also make sense to make > it possible to identify a few very long-running / leaked iterators? I > can imagine on a busy node, it would be hard to spot that 1% of the > iterators never close when looking only at closed iterator or the > number of iterators. But it could still be good to identify those > leaks early. One option would be to add `iterator-duration-max` and > take open iterators into account when computing the metric. > > Cheers, > Lucas > > > On Thu, Oct 5, 2023 at 3:50 PM Nick Telford > wrote: > > > > Hi Colt, > > > > I kept the details out of the KIP, because KIPs generally document > > high-level design, but the way I'm doing it is like this: > > > > final ManagedKeyValueIterator > > rocksDbPrefixSeekIterator = cf.prefixScan(accessor, prefixBytes); > > --> final long startedAt = System.nanoTime(); > > openIterators.add(rocksDbPrefixSeekIterator); > > rocksDbPrefixSeekIterator.onClose(() -> { > > --> metricsRecorder.recordIteratorDuration(System.nanoTime() - > > startedAt); > > openIterators.remove(rocksDbPrefixSeekIterator); > > }); > > > > The lines with the arrow are the new code. This pattern is repeated > > throughout RocksDBStore, wherever a new RocksDbIterator is created. > > > > Regards, > > Nick > > > > On Thu, 5 Oct 2023 at 12:32, Colt McNealy wrote: > > > > > Thank you for the KIP, Nick! > > > > > > This would be highly useful for many reasons. Much more sane than > checking > > > for leaked iterators by profiling memory usage while running 100's of > > > thousands of range scans via interactive queries (: > > > > > > One small question: > > > > > > >The iterator-duration metrics will be updated whenever an Iterator's > > > close() method is called > > > > > > Does the Iterator have its own "createdAt()" or equivalent field, or > do we > > > need to keep track of the Iterator's start time upon creation? > > > > > > Cheers, > > > Colt McNealy > > > > > > *Founder, LittleHorse.dev* > > > > > > > > > On Wed, Oct 4, 2023 at 9:07 AM Nick Telford > > > wrote: > > > > > > > Hi everyone, > > > > > > > > KIP-989 is a small Kafka Streams KIP to add a few new metrics around > the > > > > creation and use of RocksDB Iterators, to aid users in identifying > > > > "Iterator leaks" that could cause applications to leak native memory. > > > > > > > > Let me know what you think! > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics > > > > > > > > P.S. I'm not too sure about the formatting of the "New Metrics" > table, > > > any > > > > advice there would be appreciated. > > > > > > > > Regards, > > > > Nick > > > > > > > >
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Hi Lucas, Yeah, this is pretty much the direction I'm thinking of going in now. You make an interesting point about committing on-error under ALOS/READ_COMMITTED, although I haven't had a chance to think through the implications yet. Something that I ran into earlier this week is an issue with the new handling of TimeoutException. Without TX stores, TimeoutException under EOS throws a TaskCorruptedException, which wipes the stores. However, with TX stores, TimeoutException is now just bubbled up and dealt with as it is under ALOS. The problem arises when the Producer#commitTransaction call times out: Streams attempts to ignore the error and continue producing, which causes the next call to Producer#send to throw "IllegalStateException: Cannot attempt operation `send` because the previous call to `commitTransaction` timed out and must be retried". I'm not sure what we should do here: retrying the commitTransaction seems logical, but what if it times out again? Where do we draw the line and shutdown the instance? Regards, Nick On Mon, 16 Oct 2023 at 13:19, Lucas Brutschy wrote: > Hi all, > > I think I liked your suggestion of allowing EOS with READ_UNCOMMITTED, > but keep wiping the state on error, and I'd vote for this solution > when introducing `default.state.isolation.level`. This way, we'd have > the most low-risk roll-out of this feature (no behavior change without > reconfiguration), with the possibility of switching to the most sane / > battle-tested default settings in 4.0. Essentially, we'd have a > feature flag but call it `default.state.isolation.level` and don't > have to deprecate it later. > > So the possible configurations would then be this: > > 1. ALOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ > reads from DB. > 2. ALOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from > WriteBatch/DB. Flush on error (see note below). > 3. EOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ > reads from DB. Wipe state on error. > 4. EOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from > WriteBatch/DB. > > I believe the feature is important enough that we will see good > adoption even without changing the default. In 4.0, when we have seen > this being adopted and is battle-tested, we make READ_COMMITTED the > default for EOS, or even READ_COMITTED always the default, depending > on our experiences. And we could add a clever implementation of > READ_UNCOMITTED with WriteBatches later. > > The only smell here is that `default.state.isolation.level` wouldn't > be purely an IQ setting, but it would also (slightly) change the > behavior of the processing, but that seems unavoidable as long as we > haven't solve READ_UNCOMITTED IQ with WriteBatches. > > Minor: As for Bruno's point 4, I think if we are concerned about this > behavior (we don't necessarily have to be, because it doesn't violate > ALOS guarantees as far as I can see), we could make > ALOS/READ_COMMITTED more similar to ALOS/READ_UNCOMITTED by flushing > the WriteBatch on error (obviously, only if we have a chance to do > that). > > Cheers, > Lucas > > On Mon, Oct 16, 2023 at 12:19 PM Nick Telford > wrote: > > > > Hi Guozhang, > > > > The KIP as it stands introduces a new configuration, > > default.state.isolation.level, which is independent of processing.mode. 
> > It's intended that this new configuration be used to configure a global > IQ > > isolation level in the short term, with a future KIP introducing the > > capability to change the isolation level on a per-query basis, falling > back > > to the "default" defined by this config. That's why I called it > "default", > > for future-proofing. > > > > However, it currently includes the caveat that READ_UNCOMMITTED is not > > available under EOS. I think this is the coupling you are alluding to? > > > > This isn't intended to be a restriction of the API, but is currently a > > technical limitation. However, after discussing with some users about > > use-cases that would require READ_UNCOMMITTED under EOS, I'm inclined to > > remove that clause and put in the necessary work to make that combination > > possible now. > > > > I currently see two possible approaches: > > > >1. Disable TX StateStores internally when the IsolationLevel is > >READ_UNCOMMITTED and the processing.mode is EOS. This is more > difficult > >than it sounds, as there are many assumptions being made throughout > the > >internals about the guarantees StateStores provide. It would > definitely add > >a lot of extra "if (read_u
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Hi Lucas, TaskCorruptedException is how Streams signals that the Task state needs to be wiped, so we can't retain that exception without also wiping state on timeouts. Regards, Nick On Wed, 18 Oct 2023 at 14:48, Lucas Brutschy wrote: > Hi Nick, > > I think indeed the better behavior would be to retry commitTransaction > until we risk running out of time to meet `max.poll.interval.ms`. > > However, if it's handled as a `TaskCorruptedException` at the moment, > I would do the same in this KIP, and leave exception handling > improvements to future work. This KIP is already improving the > situation a lot by not wiping the state store. > > Cheers, > Lucas > > On Tue, Oct 17, 2023 at 3:51 PM Nick Telford > wrote: > > > > Hi Lucas, > > > > Yeah, this is pretty much the direction I'm thinking of going in now. You > > make an interesting point about committing on-error under > > ALOS/READ_COMMITTED, although I haven't had a chance to think through the > > implications yet. > > > > Something that I ran into earlier this week is an issue with the new > > handling of TimeoutException. Without TX stores, TimeoutException under > EOS > > throws a TaskCorruptedException, which wipes the stores. However, with TX > > stores, TimeoutException is now just bubbled up and dealt with as it is > > under ALOS. The problem arises when the Producer#commitTransaction call > > times out: Streams attempts to ignore the error and continue producing, > > which causes the next call to Producer#send to throw > > "IllegalStateException: Cannot attempt operation `send` because the > > previous call to `commitTransaction` timed out and must be retried". > > > > I'm not sure what we should do here: retrying the commitTransaction seems > > logical, but what if it times out again? Where do we draw the line and > > shutdown the instance? > > > > Regards, > > Nick > > > > On Mon, 16 Oct 2023 at 13:19, Lucas Brutschy .invalid> > > wrote: > > > > > Hi all, > > > > > > I think I liked your suggestion of allowing EOS with READ_UNCOMMITTED, > > > but keep wiping the state on error, and I'd vote for this solution > > > when introducing `default.state.isolation.level`. This way, we'd have > > > the most low-risk roll-out of this feature (no behavior change without > > > reconfiguration), with the possibility of switching to the most sane / > > > battle-tested default settings in 4.0. Essentially, we'd have a > > > feature flag but call it `default.state.isolation.level` and don't > > > have to deprecate it later. > > > > > > So the possible configurations would then be this: > > > > > > 1. ALOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ > > > reads from DB. > > > 2. ALOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from > > > WriteBatch/DB. Flush on error (see note below). > > > 3. EOS/READ_UNCOMMITTED (default) = processing uses direct-to-DB, IQ > > > reads from DB. Wipe state on error. > > > 4. EOS/READ_COMMITTED = processing uses WriteBatch, IQ reads from > > > WriteBatch/DB. > > > > > > I believe the feature is important enough that we will see good > > > adoption even without changing the default. In 4.0, when we have seen > > > this being adopted and is battle-tested, we make READ_COMMITTED the > > > default for EOS, or even READ_COMITTED always the default, depending > > > on our experiences. And we could add a clever implementation of > > > READ_UNCOMITTED with WriteBatches later. 
> > > > > > The only smell here is that `default.state.isolation.level` wouldn't > > > be purely an IQ setting, but it would also (slightly) change the > > > behavior of the processing, but that seems unavoidable as long as we > > > haven't solve READ_UNCOMITTED IQ with WriteBatches. > > > > > > Minor: As for Bruno's point 4, I think if we are concerned about this > > > behavior (we don't necessarily have to be, because it doesn't violate > > > ALOS guarantees as far as I can see), we could make > > > ALOS/READ_COMMITTED more similar to ALOS/READ_UNCOMITTED by flushing > > > the WriteBatch on error (obviously, only if we have a chance to do > > > that). > > > > > > Cheers, > > > Lucas > > > > > > On Mon, Oct 16, 2023 at 12:19 PM Nick Telford > > > wrote: > > > > > > > > Hi Guozhang, > > > > >
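A sketch of the retry policy suggested above, assuming a simple deadline derived from max.poll.interval.ms; per the producer contract, a timed-out commitTransaction() may be retried, and once the budget is exhausted the failure would be surfaced (e.g. as a TaskCorruptedException). This is illustrative only, not the actual Streams code:

    import java.time.Duration;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.errors.TimeoutException;

    final class CommitRetrySketch {

        static void commitWithRetry(final Producer<byte[], byte[]> producer,
                                    final Duration maxPollInterval) {
            // Spend at most half the poll interval retrying, so the consumer
            // is not kicked out of the group while we wait.
            final long deadline = System.currentTimeMillis() + maxPollInterval.toMillis() / 2;
            while (true) {
                try {
                    producer.commitTransaction();
                    return;
                } catch (final TimeoutException e) {
                    if (System.currentTimeMillis() >= deadline) {
                        // Out of budget: rethrow so the task is handled as corrupted.
                        throw e;
                    }
                    // Otherwise retry: a timed-out commitTransaction() may be called again.
                }
            }
        }
    }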
Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics
I don't really have a problem with adding such a metric, I'm just not entirely sure how it would work. If we used "iterator-duration-max", for example, would it not be confusing that it includes Iterators that are still open, and therefore the duration is not yet known? When graphing that over time, I suspect it would be difficult to understand. 3. FWIW, this would still be picked up by "open-iterators", since that metric is only decremented when Iterator#close is called (via the ManagedKeyValueIterator#onClose hook). I'm actually considering expanding the scope of this KIP slightly to include improved Block Cache metrics, as my own memory leak investigations have trended in that direction. Do you think the following metrics should be included in this KIP, or should I create a new KIP? - block-cache-index-usage (number of bytes occupied by index blocks) - block-cache-filter-usage (number of bytes occupied by filter blocks) Regards, Nick On Tue, 24 Oct 2023 at 07:09, Sophie Blee-Goldman wrote: > I actually think we could implement Lucas' suggestion pretty easily and > without too much additional effort. We have full control over the iterator > that is returned by the various range queries, so it would be easy to > register a gauge metric for how long it has been since the iterator was > created. Then we just deregister the metric when the iterator is closed. > > With respect to how useful this metric would be, both Nick and Lucas have > made good points: I would agree that in general, leaking iterators would > mean an ever-increasing iterator count that should be possible to spot > without this. However, a few things to consider: > > 1. it's really easy to set up an alert based on some maximum threshold of > how long an iterator should remain open for. It's relatively more tricky to > set up alerts based on the current count of open iterators and how it > changes over time. > 2. As Lucas mentioned, it only takes a few iterators to wreak havoc in > extreme cases. Sometimes more advanced applications end up with just a few > leaking iterators despite closing the majority of them. I've seen this > happen just once personally, but it was driving everyone crazy until we > figured it out. > 3. A metric for how long the iterator has been open would help to identify > hanging iterators due to some logic where the iterator is properly closed > but for whatever reason just isn't being advanced to the end, and thus not > reached the iterator#close line of the user code. This case seems difficult > to spot without the specific metric for iterator lifetime > 4. Lastly, I think you could argue that all of the above are fairly > advanced use cases, but this seems like a fairly advanced feature already, > so it doesn't seem unreasonable to try and cover all the bases. > > All that said, my philosophy is that the KIP author gets the final word on > what to pull into scope as long as the proposal isn't harming anyone > without the extra feature/changes. So it's up to you Nick -- just wanted > to add some context on how it could work, and why it would be helpful > > Thanks for the KIP! > > On Wed, Oct 18, 2023 at 7:04 AM Lucas Brutschy > wrote: > > > Hi Nick, > > > > I did not think in detail about how to implement it, just about what > > metrics would be nice to have. You are right, we'd have to > > register/deregister the iterators during open/close. This would be > > more complicated to implement than the other metrics, but I do not see > > a fundamental problem with it. 
> > > > As far as I understand, even a low number of leaked iterators can hurt > > RocksDB compaction significantly. So we may even want to detect if the > > iterators are opened by one-time / rare queries against the state > > store. > > > > But, as I said, that would be an addition and not a change of the > > current contents of the KIP, so I'd support the KIP moving forward > > even without this extension. > > > > Cheers, Lucas > > > > > > > > On Tue, Oct 17, 2023 at 3:45 PM Nick Telford > > wrote: > > > > > > Hi Lucas, > > > > > > Hmm, I'm not sure how we could reliably identify such leaked Iterators. > > If > > > we tried to include open iterators when calculating iterator-duration, > > we'd > > > need some kind of registry of all the open iterator creation > timestamps, > > > wouldn't we? > > > > > > In general, if you have a leaky Iterator, it should manifest as a > > > persistently climbing "open-iterators" metric, even on a busy node, > > because > > > ea
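A small sketch of the register-on-open / deregister-on-close idea, kept independent of the real StreamsMetrics plumbing; the "oldest-open-iterator-age-ns" gauge name and the class itself are hypothetical:

    import java.util.concurrent.ConcurrentHashMap;

    final class OpenIteratorTracker {

        // iterator identity -> creation time in nanoseconds
        private final ConcurrentHashMap<Object, Long> open = new ConcurrentHashMap<>();

        void onOpen(final Object iterator) {
            open.put(iterator, System.nanoTime());
        }

        void onClose(final Object iterator) {
            open.remove(iterator);
        }

        // Value a gauge like "oldest-open-iterator-age-ns" could report: 0 when nothing
        // is open, otherwise how long the longest-lived open iterator has been alive.
        long oldestOpenIteratorAgeNs() {
            final long now = System.nanoTime();
            return open.values().stream()
                       .mapToLong(startedAt -> now - startedAt)
                       .max()
                       .orElse(0L);
        }
    }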
Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException
Hi Sophie, Thanks for the review! 1-3. I had a feeling this was the case. I'm thinking of adding a PAUSED state with the following valid transitions: - RUNNING -> PAUSED - PAUSED -> RUNNING - PAUSED -> SUSPENDED The advantage of a dedicated State is it should make testing easier and also reduce the potential for introducing bugs into the existing Task states. While I appreciate that the engine is being revised, I think I'll still pursue this actively instead of waiting, as it addresses some problems my team is having right now. If the KIP is accepted, then I suspect that this feature would still be desirable with the new streams engine, so any new Task state would likely want to be mirrored in the new engine, and the high level design is unlikely to change. 4a. This is an excellent point I hadn't considered. Correct me if I'm wrong, but the only joins that this would impact are Stream-Stream and Stream-Table joins? Table-Table joins should be safe, because the join is commutative, so a delayed record on one side should just cause its output record to be delayed, but not lost. 4b. If we can enumerate only the node types that are impacted by this (i.e. Stream-Stream and Stream-Table joins), then perhaps we could restrict it such that it only pauses dependent Tasks if there's a Stream-Stream/Table join involved? The drawback here would be that custom stateful Processors might also be impacted, but there'd be no way to know if they're safe to not pause. 4c. Regardless, I like this idea, but I have very little knowledge about making changes to the rebalance/network protocol. It looks like this could be added via StreamsPartitionAssignor#subscriptionUserData? I might need some help designing this aspect of this KIP. Regards, Nick On Tue, 24 Oct 2023 at 07:30, Sophie Blee-Goldman wrote: > Hey Nick, > > A few high-level thoughts: > > 1. We definitely don't want to piggyback on the SUSPENDED task state, as > this is currently more like an intermediate state that a task passes > through as it's being closed/migrated elsewhere, it doesn't really mean > that a task is "suspended" and there's no logic to suspend processing on > it. What you want is probably closer in spirit to the concept of a paused > "named topology", where we basically freeze processing on a specific task > (or set of tasks). > 2. More importantly however, the SUSPENDED state was only ever needed to > support efficient eager rebalancing, and we plan to remove the eager > rebalancing protocol from Streams entirely in the near future. And > unfortunately, the named topologies feature was never fully implemented and > will probably be ripped out sometime soon as well. > 3. In short, to go this route, you'd probably need to implement a PAUSED > state from scratch. This wouldn't be impossible, but we are planning to > basically revamp the entire thread model and decouple the consumer > (potentially including the deserialization step) from the processing > threads. Much as I love the idea of this feature, it might not make a lot > of sense to spend time implementing right now when much of that work could > be scrapped in the mid-term future. We don't have a timeline for this, > however, so I don't think this should discourage you if the feature seems > worth it, just want to give you a sense of the upcoming roadmap. > 4. As for the feature itself, my only concern is that this feels like a > very advanced feature but it would be easy for new users to accidentally > abuse it and get their application in trouble. 
Specifically I'm worried > about how this could be harmful to applications for which some degree of > synchronization is required, such as a join. Correct join semantics rely > heavily on receiving records from both sides of the join and carefully > selecting the next one to process based on timestamp. Imagine if a > DeserializationException occurs upstream of a repartition feeding into one > side of a join (but not the other) and the user opts to PAUSE this task. If > the join continues as usual it could lead to missed or incorrect results > when processing is enforced with no records present on one side of the join > but usual traffic flowing through the other. Maybe we could somehow signal > to also PAUSE all downstream/dependent tasks? Should be able to add this > information to the subscription metadata and send to all clients via a > rebalance. There might be better options I'm not seeing. Or we could just > decide to trust the users not to shoot themselves in the foot -- as long as > we write a clear warning in the javadocs this might be fine > > Thanks for all the great KIPs! > > On Thu, Oct 12, 2023 at 9:51 AM Nick Telford > wrote: > > > Hi everyone, >
Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException
1. Woops! I've fixed that now. Thanks for catching that. 2. I agree, I'll remove the LogAndPause handler so it's clear this is an advanced feature. I'll also add some documentation to DeserializationExceptionResponse#SUSPEND that explains the care users should approach it with. 3a. This is interesting. My main concern is that there may be situations where skipping a single bad record is not the necessary solution, but the Task should still be resumed without restarting the application. For example, if there are several bad records in a row that should be skipped. 3b. Additionally, a Task may have multiple input topics, so we'd need some way to indicate which record to skip. These can probably be resolved by something like skipAndContinue(TaskId task, String topic, int recordsToSkip) or even skipAndContinue(TaskId task, Map recordsToSkipByTopic)? 4. Related to 2: I was thinking that users implementing their own handler may want to be able to determine which Processors (i.e. which Subtopology/task group) are being affected, so they can programmatically make a decision on whether it's safe to PAUSE. ProcessorContext, which is already a parameter to DeserializationExceptionHandler provides the TaskId of the failed Task, but doesn't provide metadata on the Processors that Task executes. Since TaskIds are non-deterministic (they can change when you modify your topology, with no influence over how they're assigned), a user cannot use TaskId alone to determine which Processors would be affected. What do you think would be the best way to provide this information to exception handlers? I was originally thinking that users could instantiate the handler themselves and provide a TopologyDescription (via KafkaStreams#describe) in the constructor, but it looks like configs of type Class cannot accept an already instantiated instance, and there's no other way to inject information like that. Perhaps we could add something to ProcessorContext that contains details on the sub-topology being executed? Regards, Nick On Thu, 26 Oct 2023 at 01:24, Sophie Blee-Goldman wrote: > 1. Makes sense to me! Can you just update the name of the > DeserializationHandlerResponse enum from SUSPEND to PAUSE so > we're consistent with the wording? > > The drawback here would be that custom stateful Processors > > might also be impacted, but there'd be no way to know if they're safe to > > not pause. > > > 2. This is a really good point -- maybe this is just a case where we have > to trust > in the user not to accidentally screw themselves over. As long as we > provide > sufficient information for them to decide when it is/isn't safe to pause a > task, > I would be ok with just documenting the dangers of indiscriminate use of > this > feature, and hope that everyone reads the warning. > > Given the above, I have one suggestion: what if we only add the PAUSE enum > in this KIP, and don't include an OOTB DeserializationExceptionHandler that > implements this? I see this as addressing two concerns: > 2a. It would make it clear that this is an advanced feature and should be > given > careful consideration, rather than just plugging in a config value. > 2b. It forces the user to implement the handler themselves, which gives > them > an opportunity to check on which task it is that's hitting the error and > then > make a conscious decision as to whether it is safe to pause or not. 
In the > end, > it's really impossible for us to know what is/is not safe to pause, so the > more > responsibility we can put on the user in this case, the better. > > 3. It sounds like the general recovery workflow would be to either resolve > the > issue somehow (presumably by fixing an issue in the deserializer?) and > restart the application -- in which case no further manual intervention is > required -- or else to determine the record is unprocessable and should be > skipped, in which case the user needs to somehow increment the offset > and then resume the task. > > It's a bit awkward to ask people to use the command line tools to manually > wind the offset forward. More importantly, there are likely many operators > who > don't have the permissions necessary to use the command line tools for > this kind of thing, and they would be pretty much out of luck in that case. > > On the flipside, it seems like if the user ever wants to resume the task > without restarting, they will need to skip over the bad record. I think we > can > make the feature considerably more ergonomic by modifying the behavior > of the #resume method so that it always skips over the bad record. This > will probably be the easiest to implement anyways, as it is effectively the > same as the CONTINUE option internally, but giv
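Since the plan above leans on users writing their own handler, here is a rough sketch of what one could look like, using the TaskId available from the ProcessorContext to decide whether pausing is safe. Note that the PAUSE response and the "pauseable.tasks" config key are assumptions from this proposal: PAUSE does not exist in any released version, so this snippet will not compile against current Kafka as-is.

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class PauseSafeTasksHandler implements DeserializationExceptionHandler {

        // Hypothetical allow-list of tasks the operator knows are safe to pause.
        private Set<String> pauseableTasks = Collections.emptySet();

        @Override
        public void configure(final Map<String, ?> configs) {
            final Object value = configs.get("pauseable.tasks");  // hypothetical config key
            if (value instanceof String) {
                pauseableTasks = new HashSet<>(Arrays.asList(((String) value).split(",")));
            }
        }

        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            // Pause only tasks explicitly marked as safe; fail fast otherwise.
            return pauseableTasks.contains(context.taskId().toString())
                ? DeserializationHandlerResponse.PAUSE  // proposed by this KIP; not yet in Kafka
                : DeserializationHandlerResponse.FAIL;
        }
    }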
[VOTE] KIP-892: Transactional StateStores
Hi everyone, I'd like to call a vote for KIP-892: Transactional StateStores[1], which makes Kafka Streams StateStores transactional under EOS. Regards, Nick 1: https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores
Re: [VOTE] KIP-892: Transactional StateStores
Hi everyone, With +3 binding votes (and +1 non-binding), the vote passes. KIP-892 Transactional StateStores is Adopted! Regards, Nick On Tue, 14 Nov 2023 at 09:56, Bruno Cadonna wrote: > Hi Nick! > > Thanks a lot for the KIP! > > Looking forward to the implementation! > > +1 (binding) > > Best, > Bruno > > On 11/14/23 2:23 AM, Sophie Blee-Goldman wrote: > > +1 (binding) > > > > Thanks a lot for this KIP! > > > > On Mon, Nov 13, 2023 at 8:39 AM Lucas Brutschy > > wrote: > > > >> Hi Nick, > >> > >> really happy with the final KIP. Thanks a lot for the hard work! > >> > >> +1 (binding) > >> > >> Lucas > >> > >> On Mon, Nov 13, 2023 at 4:20 PM Colt McNealy > wrote: > >>> > >>> +1 (non-binding). > >>> > >>> Thank you, Nick, for making all of the changes (especially around the > >>> `default.state.isolation.level` config). > >>> > >>> Colt McNealy > >>> > >>> *Founder, LittleHorse.dev* > >>> > >>> > >>> On Mon, Nov 13, 2023 at 7:15 AM Nick Telford > >> wrote: > >>> > >>>> Hi everyone, > >>>> > >>>> I'd like to call a vote for KIP-892: Transactional StateStores[1], > >> which > >>>> makes Kafka Streams StateStores transactional under EOS. > >>>> > >>>> Regards, > >>>> > >>>> Nick > >>>> > >>>> 1: > >>>> > >>>> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-892%3A+Transactional+Semantics+for+StateStores > >>>> > >> > > >
Re: Apache Kafka 3.7.0 Release
Hi Stan, I'd like to propose including KIP-892 in the 3.7 release. The KIP has been accepted and I'm just working on rebasing the implementation against trunk before I open a PR. Regards, Nick On Tue, 21 Nov 2023 at 11:27, Mayank Shekhar Narula < mayanks.nar...@gmail.com> wrote: > Hi Stan > > Can you include KIP-951 to the 3.7 release plan? All PRs are merged in the > trunk. > > On Wed, Nov 15, 2023 at 4:05 PM Stanislav Kozlovski > wrote: > > > Friendly reminder to everybody that the KIP Freeze is *exactly 7 days > away* > > - November 22. > > > > A KIP must be accepted by this date in order to be considered for this > > release. Note, any KIP that may not be implemented in time, or otherwise > > risks heavily destabilizing the release, should be deferred. > > > > Best, > > Stan > > > > On Fri, Nov 3, 2023 at 6:03 AM Sophie Blee-Goldman < > sop...@responsive.dev> > > wrote: > > > > > Looks great, thank you! +1 > > > > > > On Thu, Nov 2, 2023 at 10:21 AM David Jacot > > > > > > wrote: > > > > > > > +1 from me as well. Thanks, Stan! > > > > > > > > David > > > > > > > > On Thu, Nov 2, 2023 at 6:04 PM Ismael Juma > wrote: > > > > > > > > > Thanks Stanislav, +1 > > > > > > > > > > Ismael > > > > > > > > > > On Thu, Nov 2, 2023 at 7:01 AM Stanislav Kozlovski > > > > > wrote: > > > > > > > > > > > Hi all, > > > > > > > > > > > > Given the discussion here and the lack of any pushback, I have > > > changed > > > > > the > > > > > > dates of the release: > > > > > > - KIP Freeze - *November 22 *(moved 4 days later) > > > > > > - Feature Freeze - *December 6 *(moved 2 days earlier) > > > > > > - Code Freeze - *December 20* > > > > > > > > > > > > If anyone has any thoughts against this proposal - please let me > > > know! > > > > It > > > > > > would be good to settle on this early. These will be the dates > > we're > > > > > going > > > > > > with > > > > > > > > > > > > Best, > > > > > > Stanislav > > > > > > > > > > > > On Thu, Oct 26, 2023 at 12:15 AM Sophie Blee-Goldman < > > > > > > sop...@responsive.dev> > > > > > > wrote: > > > > > > > > > > > > > Thanks for the response and explanations -- I think the main > > > question > > > > > for > > > > > > > me > > > > > > > was whether we intended to permanently increase the KF -- FF > gap > > > from > > > > > the > > > > > > > historical 1 week to 3 weeks? Maybe this was a conscious > decision > > > > and I > > > > > > > just > > > > > > > missed the memo, hopefully someone else can chime in here. I'm > > all > > > > for > > > > > > > additional though. And looking around at some of the recent > > > releases, > > > > > it > > > > > > > seems like we haven't been consistently following the "usual" > > > > schedule > > > > > > > since > > > > > > > the 2.x releases. > > > > > > > > > > > > > > Anyways, my main concern was making sure to leave a full 2 > weeks > > > > > between > > > > > > > feature freeze and code freeze, so I'm generally happy with the > > new > > > > > > > proposal. > > > > > > > Although I would still prefer to have the KIP freeze fall on a > > > > > Wednesday > > > > > > -- > > > > > > > Ismael actually brought up the same thing during the 3.5.0 > > release > > > > > > > planning, > > > > > > > so I'll just refer to his explanation for this: > > > > > > > > > > > > > > We typically choose a Wednesday for the various freeze dates - > > > there > > > > > are > > > > > > > > often 1-2 day slips and it's better if that doesn't require > > > people > > > > > > >
[VOTE] KIP-989: RocksDB Iterator Metrics
Hi everyone, I'd like to call a vote on the Kafka Streams KIP-989: RocksDB Iterator Metrics: https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics All of the points in the discussion thread have now been addressed. Regards, Nick
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Hi everyone, Sorry for the delay in replying. I've finally now got some time to work on this. Addressing Matthias's comments: 100. Good point. As Bruno mentioned, there's already AbstractReadWriteDecorator which we could leverage to provide that protection. I'll add details on this to the KIP. 101,102. It looks like these points have already been addressed by Bruno. Let me know if anything here is still unclear or you feel needs to be detailed more in the KIP. 103. I'm in favour of anything that gets the old code removed sooner, but wouldn't deprecating an API that we expect (some) users to implement cause problems? I'm thinking about implementers of custom StateStores, as they may be confused by managesOffsets() being deprecated, especially since they would have to mark their implementation as @Deprecated in order to avoid compile warnings. If deprecating an API *while it's still expected to be implemented* is something that's generally done in the project, then I'm happy to do so here. 104. I think this is technically possible, but at the cost of considerable additional code to maintain. Would we ever have a pathway to remove this downgrade code in the future? Regarding rebalance metadata: Opening all stores on start-up to read and cache their offsets is an interesting idea, especially if we can avoid re-opening the stores once the Tasks have been assigned. Scalability shouldn't be too much of a problem, because typically users have a fairly short state.cleanup.delay, so the number of on-disk Task directories should rarely exceed the number of Tasks previously assigned to that instance. An advantage of this approach is that it would also simplify StateStore implementations, as they would only need to guarantee that committed offsets are available when the store is open. I'll investigate this approach this week for feasibility and report back. I think that covers all the outstanding feedback, unless I missed anything? Regards, Nick On Mon, 6 May 2024 at 14:06, Bruno Cadonna wrote: > Hi Matthias, > > I see what you mean. > > To sum up: > > With this KIP the .checkpoint file is written when the store closes. > That is when: > 1. a task moves away from Kafka Streams client > 2. Kafka Streams client shuts down > > A Kafka Streams client needs the information in the .checkpoint file > 1. on startup because it does not have any open stores yet. > 2. during rebalances for non-empty state directories of tasks that are > not assigned to the Kafka Streams client. > > With hard crashes, i.e., when the Streams client is not able to close > its state stores and write the .checkpoint file, the .checkpoint file > might be quite stale. That influences the next rebalance after failover > negatively. > > > My conclusion is that Kafka Streams either needs to open the state > stores at start up or we write the checkpoint file more often. > > Writing the .checkpoint file during processing more often without > controlling the flush to disk would work. However, Kafka Streams would > checkpoint offsets that are not yet persisted on disk by the state > store. That is with a hard crash the offsets in the .checkpoint file > might be larger than the offsets checkpointed in the state store. That > might not be a problem if Kafka Streams uses the .checkpoint file only > to compute the task lag. The downside is that it makes the managing of > checkpoints more complex because now we have to maintain two > checkpoints: one for restoration and one for computing the task lag. 
> I think we should explore the option where Kafka Streams opens the state > stores at start up to get the offsets. > > I also checked when Kafka Streams needs the checkpointed offsets to > compute the task lag during a rebalance. Turns out Kafka Streams needs > them before sending the join request. Now, I am wondering if opening the > state stores of unassigned tasks whose state directory exists locally is > actually such a big issue due to the expected higher latency since it > happens actually before the Kafka Streams client joins the rebalance. > > Best, > Bruno > > > > > > > > On 5/4/24 12:05 AM, Matthias J. Sax wrote: > > That's good questions... I could think of a few approaches, but I admit > > it might all be a little bit tricky to code up... > > > > However if we don't solve this problem, I think this KIP does not really > > solve the core issue we are facing? In the end, if we rely on the > > `.checkpoint` file to compute a task assignment, but the `.checkpoint` > > file can be arbitrary stale after a crash because we only write it on a > > clean close, there would be still a huge gap that this KIP does not > close? > > > > For the case in which we
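To sanity-check the feasibility of the start-up approach, here is the rough shape of the scan I have in mind: open each on-disk task directory once, read the committed offsets, cache them, and close the store again before joining the group. The committedOffsets() method and the surrounding names are stand-ins for whatever the KIP ends up specifying, not real API.

    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;

    public final class StartupOffsetScan {

        // Hypothetical minimal view of a store that manages its own changelog offsets.
        interface OffsetManagedStore extends AutoCloseable {
            Map<String, Long> committedOffsets();  // changelog partition -> committed offset
            @Override
            void close();
        }

        interface StoreOpener {
            OffsetManagedStore open(File taskDirectory);
        }

        // Scan every task directory once on start-up and cache its committed offsets.
        public static Map<String, Map<String, Long>> scan(final File stateDir, final StoreOpener opener) {
            final Map<String, Map<String, Long>> offsetsByTaskDir = new HashMap<>();
            final File[] taskDirs = stateDir.listFiles(File::isDirectory);
            if (taskDirs == null) {
                return offsetsByTaskDir;
            }
            for (final File taskDir : taskDirs) {
                try (OffsetManagedStore store = opener.open(taskDir)) {
                    offsetsByTaskDir.put(taskDir.getName(), store.committedOffsets());
                }
            }
            return offsetsByTaskDir;
        }
    }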
Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics
Woops! Thanks for the catch Lucas. Given this was just a typo, I don't think this affects the voting. Cheers, Nick On Tue, 14 May 2024 at 18:06, Lucas Brutschy wrote: > Hi Nick, > > you are still referring to oldest-open-iterator-age-ms in the > `Proposed Changes` section. > > Cheers, > Lucas > > On Thu, May 2, 2024 at 4:00 PM Lucas Brutschy > wrote: > > > > Hi Nick! > > > > I agree, the age variant is a bit nicer since the semantics are very > > clear from the name. If you'd rather go for the simple implementation, > > how about calling it `oldest-iterator-open-since-ms`? I believe this > > could be understood without docs. Either way, I think we should be > > able to open the vote for this KIP because nobody raised any major / > > blocking concerns. > > > > Looking forward to getting this voted on soon! > > > > Cheers > > Lucas > > > > On Sun, Mar 31, 2024 at 5:23 PM Nick Telford > wrote: > > > > > > Hi Matthias, > > > > > > > For the oldest iterator metric, I would propose something simple like > > > > `iterator-opened-ms` and it would just be the actual timestamp when > the > > > > iterator was opened. I don't think we need to compute the actual age, > > > > but user can to this computation themselves? > > > > > > That works for me; it's easier to implement like that :-D I'm a little > > > concerned that the name "iterator-opened-ms" may not be obvious enough > > > without reading the docs. > > > > > > > If we think reporting the age instead of just the timestamp is > better, I > > > > would propose `iterator-max-age-ms`. I should be sufficient to call > out > > > > (as it's kinda "obvious" anyway) that the metric applies to open > > > > iterator only. > > > > > > While I think it's preferable to record the timestamp, rather than the > age, > > > this does have the benefit of a more obvious metric name. > > > > > > > Nit: the KIP says it's a store-level metric, but I think it would be > > > > good to say explicitly that it's recorded with DEBUG level only? > > > > > > Yes, I've already updated the KIP with this information in the table. > > > > > > Regards, > > > > > > Nick > > > > > > On Sun, 31 Mar 2024 at 10:53, Matthias J. Sax > wrote: > > > > > > > The time window thing was just an idea. Happy to drop it. > > > > > > > > For the oldest iterator metric, I would propose something simple like > > > > `iterator-opened-ms` and it would just be the actual timestamp when > the > > > > iterator was opened. I don't think we need to compute the actual age, > > > > but user can to this computation themselves? > > > > > > > > If we think reporting the age instead of just the timestamp is > better, I > > > > would propose `iterator-max-age-ms`. I should be sufficient to call > out > > > > (as it's kinda "obvious" anyway) that the metric applies to open > > > > iterator only. > > > > > > > > And yes, I was hoping that the code inside MetereXxxStore might > already > > > > be setup in a way that custom stores would inherit the iterator > metrics > > > > automatically -- I am just not sure, and left it as an exercise for > > > > somebody to confirm :) > > > > > > > > > > > > Nit: the KIP says it's a store-level metric, but I think it would be > > > > good to say explicitly that it's recorded with DEBUG level only? > > > > > > > > > > > > > > > > -Matthias > > > > > > > > > > > > On 3/28/24 2:52 PM, Nick Telford wrote: > > > > > Quick addendum: > > > > > > > > > > My suggested metric "oldest-open-iterator-age-seconds" should be > > > > > "oldest-open-iterator-age-ms". 
Milliseconds is obviously a better > > > > > granularity for such a metric. > > > > > > > > > > Still accepting suggestions for a better name. > > > > > > > > > > On Thu, 28 Mar 2024 at 13:41, Nick Telford > > > > > wrote: > > > > > > > > > >> Hi everyone, > > > > >> > > > > >> Sorry for leaving this for so long. So much for &qu
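Whatever name we settle on, the bookkeeping behind the metric is small. A sketch of the kind of tracking a store wrapper could do (plain Java, with the Kafka metrics plumbing omitted; the class and method names are mine, not from the KIP):

    import java.util.concurrent.ConcurrentHashMap;

    public class OpenIteratorTracker {
        // Iterator instance -> timestamp (ms) at which it was opened.
        private final ConcurrentHashMap<Object, Long> openIterators = new ConcurrentHashMap<>();

        public void onOpen(final Object iterator, final long nowMs) {
            openIterators.put(iterator, nowMs);
        }

        public void onClose(final Object iterator) {
            openIterators.remove(iterator);
        }

        // Gauge for the "timestamp" variant: when was the oldest still-open iterator opened?
        public long oldestOpenIteratorTimestampMs() {
            return openIterators.values().stream().mapToLong(Long::longValue).min().orElse(-1L);
        }

        // Gauge for the "age" variant, if we decide to report age instead.
        public long oldestOpenIteratorAgeMs(final long nowMs) {
            final long oldest = oldestOpenIteratorTimestampMs();
            return oldest < 0 ? 0L : nowMs - oldest;
        }
    }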
Re: [DISCUSS] Apache Kafka 3.8.0 release
Hi Josep, Would it be possible to sneak KIP-989 into 3.8? Just as with 1028, it's currently being voted on and has already received the requisite votes. The only thing holding it back is the 72 hour voting window. Vote thread here: https://lists.apache.org/thread/nhr65h4784z49jbsyt5nx8ys81q90k6s Regards, Nick On Wed, 15 May 2024 at 17:47, Josep Prat wrote: > And my maths are wrong! I added 24 hours more to all the numbers in there. > If after 72 hours no vetoes appear, I have no objections on adding this > specific KIP as it shouldn't have a big blast radius of affectation. > > Best, > > On Wed, May 15, 2024 at 6:44 PM Josep Prat wrote: > > > Ah, I see Chris was faster writing this than me. > > > > On Wed, May 15, 2024 at 6:43 PM Josep Prat wrote: > > > >> Hi all, > >> You still have the full day of today (independently for the timezone) to > >> get KIPs approved. Tomorrow morning (CEST timezone) I'll send another > email > >> asking developers to assign future approved KIPs to another version > that is > >> not 3.8. > >> > >> So, the only problem I see with KIP-1028 is that it hasn't been open for > >> a vote for 72 hours (48 hours as of now). If there is no negative > voting on > >> the KIP I think we can let that one in, given it would only miss the > >> deadline by less than 12 hours (if my timezone maths add up). > >> > >> Best, > >> > >> On Wed, May 15, 2024 at 6:35 PM Ismael Juma wrote: > >> > >>> The KIP freeze is just about having the KIP accepted. Not sure why we > >>> would > >>> need an exception for that. > >>> > >>> Ismael > >>> > >>> On Wed, May 15, 2024 at 9:20 AM Chris Egerton > > >>> wrote: > >>> > >>> > FWIW I think that the low blast radius for KIP-1028 should allow it > to > >>> > proceed without adhering to the usual KIP and feature freeze dates. > >>> Code > >>> > freeze is probably worth still respecting, at least if changes are > >>> > required to the docker/jvm/Dockerfile. But I defer to Josep's > >>> judgement as > >>> > the release manager. > >>> > > >>> > On Wed, May 15, 2024, 06:59 Vedarth Sharma > > >>> > wrote: > >>> > > >>> > > Hey Josep! > >>> > > > >>> > > The KIP 1028 has received the required votes. Voting thread:- > >>> > > https://lists.apache.org/thread/cdq4wfv5v1gpqlxnf46ycwtcwk5wos4q > >>> > > But we are keeping the vote open for 72 hours as per the process. > >>> > > > >>> > > I would like to request you to please consider it for the 3.8.0 > >>> release. > >>> > > > >>> > > Thanks and regards, > >>> > > Vedarth > >>> > > > >>> > > > >>> > > On Wed, May 15, 2024 at 1:14 PM Josep Prat > >>> > >>> > > wrote: > >>> > > > >>> > > > Hi Kafka developers! > >>> > > > > >>> > > > Today is the KIP freeze deadline. All KIPs should be accepted by > >>> EOD > >>> > > today. > >>> > > > Tomorrow morning (CEST timezone) I'll start summarizing all KIPs > >>> that > >>> > > have > >>> > > > been approved. Please any KIP approved after tomorrow should be > >>> adopted > >>> > > in > >>> > > > a future release version, not 3.8. > >>> > > > > >>> > > > Other relevant upcoming deadlines: > >>> > > > - Feature freeze is on May 29th > >>> > > > - Code freeze is June 12th > >>> > > > > >>> > > > Best, > >>> > > > > >>> > > > On Fri, May 3, 2024 at 3:59 PM Josep Prat > >>> wrote: > >>> > > > > >>> > > > > Hi Kafka developers! 
> >>> > > > > I just wanted to remind you all of the upcoming relevant dates > >>> for > >>> > > Kafka > >>> > > > > 3.8.0: > >>> > > > > - KIP freeze is on May 15th (this is in a little less than 2 > >>> weeks) > >>> > > > > - Feature freeze is on May 29th (this is in a little more than > 25 >
Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics
Good point! I've updated it to "Improved StateStore Iterator metrics for detecting leaks" - let me know if you have a better suggestion. This shouldn't affect the voting imo, as nothing of substance has changed. Regards, Nick On Thu, 16 May 2024 at 01:39, Sophie Blee-Goldman wrote: > One quick thing -- can you update the title of this KIP to reflect the > decision to implement these metrics for all state stores implementations > rather than just RocksDB? > > > On Tue, May 14, 2024 at 1:36 PM Nick Telford > wrote: > > > Woops! Thanks for the catch Lucas. Given this was just a typo, I don't > > think this affects the voting. > > > > Cheers, > > Nick > > > > On Tue, 14 May 2024 at 18:06, Lucas Brutschy > .invalid> > > wrote: > > > > > Hi Nick, > > > > > > you are still referring to oldest-open-iterator-age-ms in the > > > `Proposed Changes` section. > > > > > > Cheers, > > > Lucas > > > > > > On Thu, May 2, 2024 at 4:00 PM Lucas Brutschy > > > wrote: > > > > > > > > Hi Nick! > > > > > > > > I agree, the age variant is a bit nicer since the semantics are very > > > > clear from the name. If you'd rather go for the simple > implementation, > > > > how about calling it `oldest-iterator-open-since-ms`? I believe this > > > > could be understood without docs. Either way, I think we should be > > > > able to open the vote for this KIP because nobody raised any major / > > > > blocking concerns. > > > > > > > > Looking forward to getting this voted on soon! > > > > > > > > Cheers > > > > Lucas > > > > > > > > On Sun, Mar 31, 2024 at 5:23 PM Nick Telford > > > > wrote: > > > > > > > > > > Hi Matthias, > > > > > > > > > > > For the oldest iterator metric, I would propose something simple > > like > > > > > > `iterator-opened-ms` and it would just be the actual timestamp > when > > > the > > > > > > iterator was opened. I don't think we need to compute the actual > > age, > > > > > > but user can to this computation themselves? > > > > > > > > > > That works for me; it's easier to implement like that :-D I'm a > > little > > > > > concerned that the name "iterator-opened-ms" may not be obvious > > enough > > > > > without reading the docs. > > > > > > > > > > > If we think reporting the age instead of just the timestamp is > > > better, I > > > > > > would propose `iterator-max-age-ms`. I should be sufficient to > call > > > out > > > > > > (as it's kinda "obvious" anyway) that the metric applies to open > > > > > > iterator only. > > > > > > > > > > While I think it's preferable to record the timestamp, rather than > > the > > > age, > > > > > this does have the benefit of a more obvious metric name. > > > > > > > > > > > Nit: the KIP says it's a store-level metric, but I think it would > > be > > > > > > good to say explicitly that it's recorded with DEBUG level only? > > > > > > > > > > Yes, I've already updated the KIP with this information in the > table. > > > > > > > > > > Regards, > > > > > > > > > > Nick > > > > > > > > > > On Sun, 31 Mar 2024 at 10:53, Matthias J. Sax > > > wrote: > > > > > > > > > > > The time window thing was just an idea. Happy to drop it. > > > > > > > > > > > > For the oldest iterator metric, I would propose something simple > > like > > > > > > `iterator-opened-ms` and it would just be the actual timestamp > when > > > the > > > > > > iterator was opened. I don't think we need to compute the actual > > age, > > > > > > but user can to this computation themselves?
> > > > > > > > > > > > If we think reporting the age instead of just the timestamp is > > > better, I > > > > > > would propose `iterator-max-age-ms`. I should be sufficient to > call > > > out > > > > > > (as it's kinda "obvious" anyway) that th
Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics
Actually, one other point: our existing state store operation metrics are measured in nanoseconds[1]. Should iterator-duration-(avg|max) also be measured in nanoseconds, for consistency, or should we keep them milliseconds, as the KIP currently states? 1: https://docs.confluent.io/platform/current/streams/monitoring.html#state-store-metrics On Thu, 16 May 2024 at 12:15, Nick Telford wrote: > Good point! I've updated it to "Improved StateStore Iterator metrics for > detecting leaks" - let me know if you have a better suggestion. > > This should affect the voting imo, as nothing of substance has changed. > > Regards, > Nick > > On Thu, 16 May 2024 at 01:39, Sophie Blee-Goldman > wrote: > >> One quick thing -- can you update the title of this KIP to reflect the >> decision to implement these metrics for all state stores implementations >> rather than just RocksDB? >> >> >> On Tue, May 14, 2024 at 1:36 PM Nick Telford >> wrote: >> >> > Woops! Thanks for the catch Lucas. Given this was just a typo, I don't >> > think this affects the voting. >> > >> > Cheers, >> > Nick >> > >> > On Tue, 14 May 2024 at 18:06, Lucas Brutschy > > .invalid> >> > wrote: >> > >> > > Hi Nick, >> > > >> > > you are still referring to oldest-open-iterator-age-ms in the >> > > `Proposed Changes` section. >> > > >> > > Cheers, >> > > Lucas >> > > >> > > On Thu, May 2, 2024 at 4:00 PM Lucas Brutschy > > >> > > wrote: >> > > > >> > > > Hi Nick! >> > > > >> > > > I agree, the age variant is a bit nicer since the semantics are very >> > > > clear from the name. If you'd rather go for the simple >> implementation, >> > > > how about calling it `oldest-iterator-open-since-ms`? I believe this >> > > > could be understood without docs. Either way, I think we should be >> > > > able to open the vote for this KIP because nobody raised any major / >> > > > blocking concerns. >> > > > >> > > > Looking forward to getting this voted on soon! >> > > > >> > > > Cheers >> > > > Lucas >> > > > >> > > > On Sun, Mar 31, 2024 at 5:23 PM Nick Telford < >> nick.telf...@gmail.com> >> > > wrote: >> > > > > >> > > > > Hi Matthias, >> > > > > >> > > > > > For the oldest iterator metric, I would propose something simple >> > like >> > > > > > `iterator-opened-ms` and it would just be the actual timestamp >> when >> > > the >> > > > > > iterator was opened. I don't think we need to compute the actual >> > age, >> > > > > > but user can to this computation themselves? >> > > > > >> > > > > That works for me; it's easier to implement like that :-D I'm a >> > little >> > > > > concerned that the name "iterator-opened-ms" may not be obvious >> > enough >> > > > > without reading the docs. >> > > > > >> > > > > > If we think reporting the age instead of just the timestamp is >> > > better, I >> > > > > > would propose `iterator-max-age-ms`. I should be sufficient to >> call >> > > out >> > > > > > (as it's kinda "obvious" anyway) that the metric applies to open >> > > > > > iterator only. >> > > > > >> > > > > While I think it's preferable to record the timestamp, rather than >> > the >> > > age, >> > > > > this does have the benefit of a more obvious metric name. >> > > > > >> > > > > > Nit: the KIP says it's a store-level metric, but I think it >> would >> > be >> > > > > > good to say explicitly that it's recorded with DEBUG level only? >> > > > > >> > > > > Yes, I've already updated the KIP with this information in the >> table. >> > > > > >> > > > > Regards, >> > > > > >> > > > > Nick >> > > > > >> > > > > On Sun, 31 Mar 2024 at 10:53, Matthias J. 
Sax >> > > wrote: >> > > > > >> > > >
Re: [VOTE] KIP-989: RocksDB Iterator Metrics
Hi everyone, With 3 binding votes and no objections, the vote passes. KIP-989 is adopted. Cheers, Nick On Wed, 15 May 2024 at 03:41, Sophie Blee-Goldman wrote: > +1 (binding) > > Thanks! > > On Tue, May 14, 2024 at 6:58 PM Matthias J. Sax wrote: > > > +1 (binding) > > > > On 5/14/24 9:19 AM, Lucas Brutschy wrote: > > > Hi Nick! > > > > > > Thanks for the KIP. > > > > > > +1 (binding) > > > > > > On Tue, May 14, 2024 at 5:16 PM Nick Telford > > wrote: > > >> > > >> Hi everyone, > > >> > > >> I'd like to call a vote on the Kafka Streams KIP-989: RocksDB Iterator > > >> Metrics: > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics > > >> > > >> All of the points in the discussion thread have now been addressed. > > >> > > >> Regards, > > >> > > >> Nick > > >
Re: [VOTE] KIP-989: RocksDB Iterator Metrics
Oh shoot, you're right. I miscounted. The vote remains open. On Thu, 16 May 2024, 20:11 Josep Prat, wrote: > Hi Nick, > I think you need one more day to reach the 72 hours. You opened the vote on > the 14th, right? > > Best, > > > > Josep Prat > Open Source Engineering Director, aivenjosep.p...@aiven.io | > +491715557497 | aiven.io > Aiven Deutschland GmbH > Alexanderufer 3-7, 10117 Berlin > Geschäftsführer: Oskari Saarenmaa & Hannu Valtonen > Amtsgericht Charlottenburg, HRB 209739 B > > On Thu, May 16, 2024, 19:40 Nick Telford wrote: > > > Hi everyone, > > > > With 3 binding votes and no objections, the vote passes. > > > > KIP-989 is adopted. > > > > Cheers, > > Nick > > > > On Wed, 15 May 2024 at 03:41, Sophie Blee-Goldman > > > wrote: > > > > > +1 (binding) > > > > > > Thanks! > > > > > > On Tue, May 14, 2024 at 6:58 PM Matthias J. Sax > > wrote: > > > > > > > +1 (binding) > > > > > > > > On 5/14/24 9:19 AM, Lucas Brutschy wrote: > > > > > Hi Nick! > > > > > > > > > > Thanks for the KIP. > > > > > > > > > > +1 (binding) > > > > > > > > > > On Tue, May 14, 2024 at 5:16 PM Nick Telford < > nick.telf...@gmail.com > > > > > > > wrote: > > > > >> > > > > >> Hi everyone, > > > > >> > > > > >> I'd like to call a vote on the Kafka Streams KIP-989: RocksDB > > Iterator > > > > >> Metrics: > > > > >> > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-989%3A+RocksDB+Iterator+Metrics > > > > >> > > > > >> All of the points in the discussion thread have now been > addressed. > > > > >> > > > > >> Regards, > > > > >> > > > > >> Nick > > > > > > > > > >
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Hi everyone, As discussed on the Zoom call, we're going to handle rebalance meta-data by: - On start-up, Streams will open each store and read its changelog offsets into an in-memory cache. This cache will be shared among all StreamThreads. - On rebalance, the cache will be consulted for Task offsets for any Task that is not active on any instance-local StreamThreads. If the Task is active on *any* instance-local StreamThread, we will report the Task lag as "up to date" (i.e. -1), because we know that the local state is currently up-to-date. We will avoid caching offsets across restarts in the legacy ".checkpoint" file, so that we can eliminate the logic for handling this class. If performance of opening/closing many state stores is poor, we can parallelise it by forking off a thread for each Task directory when reading the offsets. I'll update the KIP later today to reflect this design, but I will try to keep it high-level, so that the exact implementation can vary. Regards, Nick On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman wrote: > 103: I like the idea of immediately deprecating #managesOffsets and aiming > to make offset management mandatory in the long run. I assume we would also > log a warning for any custom stores that return "false" from this method to > encourage custom store implementations to start doing so? My only > question/concern is that if we want folks to start managing their own > offsets then we should make this transition easy for them, perhaps by > exposing some public utility APIs for things that are currently handled by > Kafka Streams such as reading/writing checkpoint files. Maybe it would be > useful to include a small example in the KIP of what it would actually mean > to "manage your own offsets" -- I know (all too well) that plugging in > custom storage implementations is not easy and most people who do this are > probably fairly advanced users, but offset management will be a totally new > ballgame to most people people and this kind of feels like throwing them > off the deep end. We should at least provide a lifejacket via some kind of > utility API and/or example > > 200. There's been a lot of back and forth on the rebalance metadata/task > lag computation question, so forgive me if I missed any part of this, but I > think we've landed at the right idea here. To summarize: the "tl;dr" > explanation is that we'll write the checkpoint file only on close and will > account for hard-crash scenarios by opening up the stores on startup and > writing a checkpoint file for any missing tasks. Does that sound about > right? > > A few clarifications: > I think we're all more or less on the same page here but just to be > absolutely clear, the task lags for each task directory found on disk will > be reported by only one of the StreamThreads, and each StreamThread will > report lags only for tasks that it already owns or are not assigned to any > other StreamThread in the client. In other words, we only need to get the > task lag for completely unassigned/unlocked tasks, which means if there is > a checkpoint file at all then it must be up-to-date, because there is no > other StreamThread actively writing to that state store (if so then only > that StreamThread would report lag for that particular task). > > This still leaves the "no checkpoint at all" case which as previously > mentioned can occur after a hard-crash. Luckily we only have to worry > about this once, after starting up again following said hard crash. 
We can > simply open up each of the state stores before ever joining the group, get > the offsets from rocksdb, and write them to a new checkpoint file. After > that, we can depend on the checkpoints written at close and won't have to > open up any stores that aren't already assigned for the reasons laid out in > the paragraph above. > > As for the specific mechanism and which thread-does-what, since there were > some questions, this is how I'm imagining the process: > >1. The general idea is that we simply go through each task directories >with state but no checkpoint file and open the StateStore, call >#committedOffset, and then write it to the checkpoint file. We can then >close these stores and let things proceed as normal. >2. This only has to happen once, during startup, but we have two >options: > 1. Do this from KafkaStreams#start, ie before we even create the > StreamThreads > 2. Do this from StreamThread#start, following a similar lock-based > approach to the one used #computeTaskLags, where each StreamThread > just > makes a pass over the task directories on disk and attempts to lock > them > one by
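For clarity, the rebalance-time lookup I described above boils down to something like the following: cached offsets for dormant tasks, and the "up to date" sentinel for anything active on a local StreamThread. Types are simplified to strings and longs for illustration, not the real internal signatures.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    public final class TaskOffsetReporting {

        static final long UP_TO_DATE = -1L;  // sentinel meaning "no lag" for locally-active tasks

        // Offsets to include in the subscription: cached values for dormant tasks,
        // the sentinel for tasks active on any instance-local StreamThread.
        public static Map<String, Long> offsetsForSubscription(
                final Map<String, Long> cachedOffsetsByTask,
                final Set<String> locallyActiveTasks) {
            final Map<String, Long> reported = new HashMap<>(cachedOffsetsByTask);
            for (final String taskId : locallyActiveTasks) {
                reported.put(taskId, UP_TO_DATE);
            }
            return reported;
        }
    }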
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Hi everyone, Sorry I haven't got around to updating the KIP yet. Now that I've wrapped up KIP-989, I'm going to be working on 1035 starting today. I'll update the KIP first, and then call a vote. Regards, Nick On Wed, 29 May 2024 at 07:25, Bruno Cadonna wrote: > Totally agree on moving forward and starting the VOTE! > > However, the KIP should be updated with the new info before starting the > VOTE. > > Best, > Bruno > > On 5/29/24 2:36 AM, Matthias J. Sax wrote: > > Sounds like a good plan. -- I think we are still wrapping up 3.8 > > release, but would also like to move forward with with one. > > > > Should we start a VOTE? > > > > For merging PRs we need to wait after code freeze, and 3.8 branch was > > but. But we could start reviewing PRs before this already. > > > > > > -Matthias > > > > On 5/17/24 3:05 AM, Nick Telford wrote: > >> Hi everyone, > >> > >> As discussed on the Zoom call, we're going to handle rebalance > >> meta-data by: > >> > >> - On start-up, Streams will open each store and read its changelog > >> offsets > >> into an in-memory cache. This cache will be shared among all > >> StreamThreads. > >> - On rebalance, the cache will be consulted for Task offsets for any > Task > >> that is not active on any instance-local StreamThreads. If the Task is > >> active on *any* instance-local StreamThread, we will report the Task > >> lag as > >> "up to date" (i.e. -1), because we know that the local state is > currently > >> up-to-date. > >> > >> We will avoid caching offsets across restarts in the legacy > ".checkpoint" > >> file, so that we can eliminate the logic for handling this class. If > >> performance of opening/closing many state stores is poor, we can > >> parallelise it by forking off a thread for each Task directory when > >> reading > >> the offsets. > >> > >> I'll update the KIP later today to reflect this design, but I will try > to > >> keep it high-level, so that the exact implementation can vary. > >> > >> Regards, > >> > >> Nick > >> > >> On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman < > sop...@responsive.dev> > >> wrote: > >> > >>> 103: I like the idea of immediately deprecating #managesOffsets and > >>> aiming > >>> to make offset management mandatory in the long run. I assume we > >>> would also > >>> log a warning for any custom stores that return "false" from this > >>> method to > >>> encourage custom store implementations to start doing so? My only > >>> question/concern is that if we want folks to start managing their own > >>> offsets then we should make this transition easy for them, perhaps by > >>> exposing some public utility APIs for things that are currently > >>> handled by > >>> Kafka Streams such as reading/writing checkpoint files. Maybe it > >>> would be > >>> useful to include a small example in the KIP of what it would > >>> actually mean > >>> to "manage your own offsets" -- I know (all too well) that plugging in > >>> custom storage implementations is not easy and most people who do > >>> this are > >>> probably fairly advanced users, but offset management will be a > >>> totally new > >>> ballgame to most people people and this kind of feels like throwing > them > >>> off the deep end. We should at least provide a lifejacket via some > >>> kind of > >>> utility API and/or example > >>> > >>> 200. There's been a lot of back and forth on the rebalance > metadata/task > >>> lag computation question, so forgive me if I missed any part of this, > >>> but I > >>> think we've landed at the right idea here. 
To summarize: the "tl;dr" > >>> explanation is that we'll write the checkpoint file only on close and > >>> will > >>> account for hard-crash scenarios by opening up the stores on startup > and > >>> writing a checkpoint file for any missing tasks. Does that sound about > >>> right? > >>> > >>> A few clarifications: > >>> I think we're all more or less on the same page here but just to be > >>> absolutely clear, the task lags for each task directory found on disk > >>
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
I've updated the KIP with the following: - Deprecation of StateStore#managesOffsets - Change StateStore#commit to throw UnsupportedOperationException when called from a Processor (via AbstractReadWriteDecorator) - Updated consumer rebalance lag computation strategy <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets#KIP1035:StateStoremanagedchangelogoffsets-ConsumerRebalanceMetadata> based on our Meet discussion - I've added a bit more detail here than we discussed, in particular around how we handle the offsets for tasks assigned to our local instance, and how we handle offsets when Tasks are closed/revoked. - Improved downgrade behaviour - Note: users that don't downgrade with upgrade.from will still get the wipe-and-restore behaviour by-default. I believe this covers all the outstanding changes that were requested. Please let me know if I've missed anything or you think further changes are needed. Regards, Nick On Wed, 29 May 2024 at 09:28, Nick Telford wrote: > Hi everyone, > > Sorry I haven't got around to updating the KIP yet. Now that I've wrapped > up KIP-989, I'm going to be working on 1035 starting today. > > I'll update the KIP first, and then call a vote. > > Regards, > Nick > > On Wed, 29 May 2024 at 07:25, Bruno Cadonna wrote: > >> Totally agree on moving forward and starting the VOTE! >> >> However, the KIP should be updated with the new info before starting the >> VOTE. >> >> Best, >> Bruno >> >> On 5/29/24 2:36 AM, Matthias J. Sax wrote: >> > Sounds like a good plan. -- I think we are still wrapping up 3.8 >> > release, but would also like to move forward with with one. >> > >> > Should we start a VOTE? >> > >> > For merging PRs we need to wait after code freeze, and 3.8 branch was >> > but. But we could start reviewing PRs before this already. >> > >> > >> > -Matthias >> > >> > On 5/17/24 3:05 AM, Nick Telford wrote: >> >> Hi everyone, >> >> >> >> As discussed on the Zoom call, we're going to handle rebalance >> >> meta-data by: >> >> >> >> - On start-up, Streams will open each store and read its changelog >> >> offsets >> >> into an in-memory cache. This cache will be shared among all >> >> StreamThreads. >> >> - On rebalance, the cache will be consulted for Task offsets for any >> Task >> >> that is not active on any instance-local StreamThreads. If the Task is >> >> active on *any* instance-local StreamThread, we will report the Task >> >> lag as >> >> "up to date" (i.e. -1), because we know that the local state is >> currently >> >> up-to-date. >> >> >> >> We will avoid caching offsets across restarts in the legacy >> ".checkpoint" >> >> file, so that we can eliminate the logic for handling this class. If >> >> performance of opening/closing many state stores is poor, we can >> >> parallelise it by forking off a thread for each Task directory when >> >> reading >> >> the offsets. >> >> >> >> I'll update the KIP later today to reflect this design, but I will try >> to >> >> keep it high-level, so that the exact implementation can vary. >> >> >> >> Regards, >> >> >> >> Nick >> >> >> >> On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman < >> sop...@responsive.dev> >> >> wrote: >> >> >> >>> 103: I like the idea of immediately deprecating #managesOffsets and >> >>> aiming >> >>> to make offset management mandatory in the long run. I assume we >> >>> would also >> >>> log a warning for any custom stores that return "false" from this >> >>> method to >> >>> encourage custom store implementations to start doing so? 
My only >> >>> question/concern is that if we want folks to start managing their own >> >>> offsets then we should make this transition easy for them, perhaps by >> >>> exposing some public utility APIs for things that are currently >> >>> handled by >> >>> Kafka Streams such as reading/writing checkpoint files. Maybe it >> >>> would be >> >>> useful to include a small example in the KIP of what it would >> >>> actually mean >> >>> to &q
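For reference, the Processor-facing guard on StateStore#commit that I listed above amounts to little more than the override below. This is a stand-in class for illustration; in the implementation the override would live in the internal AbstractReadWriteDecorator, and I'm assuming the KIP's commit(Map<TopicPartition, Long>) signature.

    import java.util.Map;
    import org.apache.kafka.common.TopicPartition;

    public class CommitGuardSketch {

        // Only the Streams engine should commit; a Processor that calls commit()
        // directly gets a clear error instead of silently interfering.
        public void commit(final Map<TopicPartition, Long> changelogOffsets) {
            throw new UnsupportedOperationException(
                "commit() is managed by Kafka Streams and must not be called from a Processor");
        }
    }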
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Hi everyone, I didn't spot this before, but it looks like the API of KeyValueStoreTestDriver will need to be updated to change the nomenclature from "flushed" to "committed": numFlushedEntryRemoved() -> numCommittedEntryRemoved() numFlushedEntryStored() -> numCommittedEntryStored() flushedEntryRemoved(K) -> committedEntryRemoved(K) flushedEntryStored(K) -> committedEntryStored(K) The old methods will obviously be marked as @Deprecated. Any objections before I add this to the KIP? Regards, Nick On Wed, 29 May 2024 at 11:20, Nick Telford wrote: > I've updated the KIP with the following: > >- Deprecation of StateStore#managesOffsets >- Change StateStore#commit to throw UnsupportedOperationException when >called from a Processor (via AbstractReadWriteDecorator) >- Updated consumer rebalance lag computation strategy > > <https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets#KIP1035:StateStoremanagedchangelogoffsets-ConsumerRebalanceMetadata> >based on our Meet discussion > - I've added a bit more detail here than we discussed, in > particular around how we handle the offsets for tasks assigned to our > local > instance, and how we handle offsets when Tasks are closed/revoked. >- Improved downgrade behaviour > - Note: users that don't downgrade with upgrade.from will still get > the wipe-and-restore behaviour by-default. > > I believe this covers all the outstanding changes that were requested. > Please let me know if I've missed anything or you think further changes are > needed. > > Regards, > Nick > > On Wed, 29 May 2024 at 09:28, Nick Telford wrote: > >> Hi everyone, >> >> Sorry I haven't got around to updating the KIP yet. Now that I've wrapped >> up KIP-989, I'm going to be working on 1035 starting today. >> >> I'll update the KIP first, and then call a vote. >> >> Regards, >> Nick >> >> On Wed, 29 May 2024 at 07:25, Bruno Cadonna wrote: >> >>> Totally agree on moving forward and starting the VOTE! >>> >>> However, the KIP should be updated with the new info before starting the >>> VOTE. >>> >>> Best, >>> Bruno >>> >>> On 5/29/24 2:36 AM, Matthias J. Sax wrote: >>> > Sounds like a good plan. -- I think we are still wrapping up 3.8 >>> > release, but would also like to move forward with with one. >>> > >>> > Should we start a VOTE? >>> > >>> > For merging PRs we need to wait after code freeze, and 3.8 branch was >>> > but. But we could start reviewing PRs before this already. >>> > >>> > >>> > -Matthias >>> > >>> > On 5/17/24 3:05 AM, Nick Telford wrote: >>> >> Hi everyone, >>> >> >>> >> As discussed on the Zoom call, we're going to handle rebalance >>> >> meta-data by: >>> >> >>> >> - On start-up, Streams will open each store and read its changelog >>> >> offsets >>> >> into an in-memory cache. This cache will be shared among all >>> >> StreamThreads. >>> >> - On rebalance, the cache will be consulted for Task offsets for any >>> Task >>> >> that is not active on any instance-local StreamThreads. If the Task is >>> >> active on *any* instance-local StreamThread, we will report the Task >>> >> lag as >>> >> "up to date" (i.e. -1), because we know that the local state is >>> currently >>> >> up-to-date. >>> >> >>> >> We will avoid caching offsets across restarts in the legacy >>> ".checkpoint" >>> >> file, so that we can eliminate the logic for handling this class. If >>> >> performance of opening/closing many state stores is poor, we can >>> >> parallelise it by forking off a thread for each Task directory when >>> >> reading >>> >> the offsets. 
>>> >> >>> >> I'll update the KIP later today to reflect this design, but I will >>> try to >>> >> keep it high-level, so that the exact implementation can vary. >>> >> >>> >> Regards, >>> >> >>> >> Nick >>> >> >>> >> On Thu, 16 May 2024 at 03:12, Sophie Blee-Goldman < >>> sop...@responsive.dev> >>> >> wrote: >>> >> >>&
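To illustrate, the rename could keep existing tests compiling by leaving the old names in place as deprecated delegates. This is just the delegation pattern, not the actual KeyValueStoreTestDriver source, and the return types are assumptions based on the current nomenclature.

    public abstract class RenamedTestDriverSketch<K, V> {

        // New "committed" nomenclature.
        public abstract int numCommittedEntryRemoved();
        public abstract int numCommittedEntryStored();
        public abstract boolean committedEntryRemoved(K key);
        public abstract boolean committedEntryStored(K key);

        // Old "flushed" nomenclature, retained as deprecated delegates.
        @Deprecated
        public int numFlushedEntryRemoved() {
            return numCommittedEntryRemoved();
        }

        @Deprecated
        public int numFlushedEntryStored() {
            return numCommittedEntryStored();
        }

        @Deprecated
        public boolean flushedEntryRemoved(final K key) {
            return committedEntryRemoved(key);
        }

        @Deprecated
        public boolean flushedEntryStored(final K key) {
            return committedEntryStored(key);
        }
    }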
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Hi Matthias, Thanks for your thorough review. 200. (#managesOffsets requirements) Done 201. (#commit atomicity recommendation vs. guarantee) There are possible StateStore implementations, including existing ones, that can't guarantee atomicity - because the underlying database/system doesn't support it. It might be hypothetically possible for atomicity to be layered on top of these systems, but doing so would likely be complex and potentially introduce performance issues. For this reason, I chose to make this a recommendation over a requirement. The only hard requirement is that writes are persisted to disk *before* offsets, so there's no data loss. 202. (Metrics descriptions) I've actually updated the metric descriptions to match those of the existing "flush" metrics. Is that ok? 203a. (Consumer Rebalance Metadata) This is definitely implementation detail, so I've removed the "and close() these stores" from the KIP, as well as the last paragraph about performance testing and using separate threads, as they are also implementation details. I'm actually working on this logic right now. Currently I have it just initializing a StandbyTask for each found directory and then closing them after we get the offsets. The reason we need to initialize an entire Task is because StateStores are constructed by the ProcessorTopology when it's initialized. I'm going to spend some time today, looking into whether we can keep these StandbyTasks open but not RUNNING, and then on-assignment either start running them (if assigned a Standby) or upgrade it to a StreamTask (if assigned an Active). I think it *should* be possible, but the devil is usually in the details! 203b. (managesOffsets deprecation) Done 204. (Downgrade) In my testing, when an older version of RocksDBStore attempts to open a database with an unknown column family (i.e the offsets cf), it throws an Exception, which is caught and rethrown as TaskCorruptedException; this triggers a wipe of local Task state and crashes the StreamThread. On restart, it restores as normal. 205. (Segment stores implementation) I'm deliberately not detailing this implementation in the KIP, because all SegmentStore APIs are internal, so this is really just an implementation detail. What I'm (currently) doing is storing offsets in each Segment. Obviously the currently live segment will be the only one with offsets being advanced, so we always return offsets from the currently live segment, but if there isn't one, then we go backwards through the existing offsets (i.e. starting with the most recent) and return the first available offset. I confess that SegmentStores are not an area I know much about, but I believe this should work. 206. (KeyValueStoreTestDriver) Hmm, good point. I thought this was part of the test-utils package along with TopologyTestDriver. I'll keep it out of the KIP. General status update: I've begun implementing this KIP in a branch separate from my earlier work. One of my primary goals is to implement it incrementally, in a way that allows each commit to be independently reviewed and merged to trunk without breaking anything. That should keep reviews much more concise. I'll start opening PRs once the KIP has been accepted *and* I'm close enough to completion that we can guarantee getting it all done by the next release. -- Cheers, Nick On Tue, 4 Jun 2024 at 20:34, Matthias J. Sax wrote: > Nick, > > Thanks a lot for updating the KIP. I made a pass over it. Overall LGTM. 
> A few nits and some more minor questions: > > > > 200: nit (Javadocs for `StateStore.managesOffsets()`): > > > This is highly > > recommended, if possible, to ensure that custom StateStores provide the > consistency guarantees that Kafka Streams > > expects when operating under the {@code exactly-once} {@code > processing.mode}. > > Given that we make it mandatory, we should rephrase this: "high > recommended" does not seems to be strong enough wording. > > > > 201: Javadocs for `StateStore.commit(final Map > changelogOffsets)`: > > > Implementations SHOULD ensure that {@code changelogOffsets} are > committed to disk atomically with the > > records they represent, if possible. > > Not sure if I can follow? Why "should ensure", but not "must ensure"? > > > > 202: New metrics: > > `commit-rate` -> Description says "The number of calls to..." -- Should > be "The number of calls per second to..."? > > `commit-latency-[]` -> Description says "The [] time taken to" -- Should > be "The [] time in nanoseconds taken to..."? (or milliseconds in case we > report in millis?) > > > > 203: Section "Consumer Rebalance Metadata" > > > We will then cache these o
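Re 205, in code the fallback I described is roughly the following. The types here are simplified stand-ins; the real logic would live in the internal segment store classes.

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public final class SegmentOffsetsSketch {

        interface Segment {
            // Committed changelog offsets stored in this segment; empty if none.
            Map<Integer, Long> committedOffsets();
        }

        // Offsets are only advanced in the live (newest) segment, so consult segments
        // newest-to-oldest and return the first offsets found.
        static Map<Integer, Long> committedOffsets(final List<Segment> segmentsNewestToOldest) {
            for (final Segment segment : segmentsNewestToOldest) {
                final Map<Integer, Long> offsets = segment.committedOffsets();
                if (!offsets.isEmpty()) {
                    return offsets;
                }
            }
            return Collections.emptyMap();
        }
    }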
[VOTE] KIP-1035: StateStore managed changelog offsets
Hi everyone, I'd like to call a vote on KIP-1035[1]. Regards, Nick 1: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets
Re: [DISCUSS] KIP-1056: Remove `default.` prefix for exception handler StreamsConfig
Hi everyone, On a semantic note, would it perhaps make more sense to rename them " uncaught." instead? These handlers are essentially the "last resort" exception handlers, because Exceptions can be caught *within* a component, e.g. a Deserializer can catch and handle an exception without the configured default.deserialization.exception.handler being invoked. I think this would better align these with JVM norms, e.g. Thread#setUncaughtExceptionHandler, which behaves the same (albeit configured through code). Regards, Nick On Thu, 13 Jun 2024 at 10:22, Muralidhar Basani wrote: > Thanks Matthias and Greg. > > Have updated the KIP based on Matthias's comments. > > >> 100: Config names are part of the public interface, and the KIP should > >> not say "none" in this section, but call out which configs are > >> deprecated and which ones are newly added. > > Updated kip. > > > >> 101: Nit. In "Propose Changes" there is the template placeholder text > >> > >>> Describe the new thing you want to do in appropriate detail. This may > be > >> fairly extensive and have large subsections of its own. Or it may be a > few > >> sentences. Use judgement based on the scope of the change. > >> > >> Similarly in "Test Plan" section > >> > > Updated kip. > > > >> 102: The "Deprecation" section should explain the behavior if both, old > >> and new configs, are set. > > Updated kip. I think a ConfigException has to be thrown here if both > configs are set. > > Thanks, > Murali > > On Thu, Jun 13, 2024 at 2:28 AM Matthias J. Sax wrote: > > > Thanks Greg, this is a valid idea. > > > > However, there was no demand in the past to allow for error handler > > callbacks in a fine grained manner, and I am frankly also not sure if it > > would make sense or would be required. > > > > Thus, I am not concerned about this case, and believe this KIP does make > > sense. > > > > Happy to change my mind if somebody has a different opinion. We could > > re-purpose this KIP to add per-topic error handler, too. > > > > > > > > -Matthias > > > > On 6/12/24 1:11 PM, Greg Harris wrote: > > > Hi Murali, > > > > > > Thanks for the KIP! > > > > > > I'm not familiar with Streams so I'll pose a general question, open for > > > anyone to answer: > > > > > > The configs that are being changed don't currently accept in-place > > > overwrites in the code, so the "default.*" prefix doesn't make sense. > > Could > > > there be a KIP to accept in-place overwrites in the future, such that > the > > > "default.*" prefix would make sense? > > > If so, this KIP would make that other KIP harder to implement, as we > > would > > > have already recommended everyone to move off of the "default.* prefix. > > Or > > > to put it another way, this KIP closes doors rather than opening them. > > > > > > Or at least that's how it looks to a Streams outsider. I'm happy to > defer > > > to the experts in this case :) > > > > > > Thanks, > > > Greg > > > > > > On Mon, Jun 10, 2024 at 3:47 PM Matthias J. Sax > > wrote: > > > > > >> Thanks for the KIP Murali, > > >> > > >> Overall LGTM. A few comments. > > >> > > >> > > >> > > >> 100: Config names are part of the public interface, and the KIP should > > >> not say "none" in this section, but call out which configs are > > >> deprecated and which ones are newly added. > > >> > > >> > > >> 101: Nit. In "Propose Changes" there is the template placeholder text > > >> > > >>> Describe the new thing you want to do in appropriate detail. This may > > be > > >> fairly extensive and have large subsections of its own. 
Or it may be a > > few > > >> sentences. Use judgement based on the scope of the change. > > >> > > >> Similarly in "Test Plan" section > > >> > > >> Please remove both :) > > >> > > >> > > >> 102: The "Deprecation" section should explain the behavior if both, > old > > >> and new configs, are set. > > >> > > >> > > >> Thanks a lot! > > >> > > >> > > >> -Matthias > > >> > > >> > > >> On 6/9/24 9:30 PM, Muralidhar Basani wrote: > > >>> Hello all, > > >>> > > >>> With this KIP > > >>> < > > >> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1056%3A+Remove+%60default.%60+prefix+for+exception+handler+StreamsConfig > > >>> , > > >>> I would like to mention that a couple of exception handler configs in > > >>> StreamsConfig are defined as default configs, despite having no > > >> alternative > > >>> values. Hence would propose to deprecate them and introduce new > configs > > >>> without the 'default.' prefix. > > >>> > > >>> This KIP is briefly discussed here in the jira KAFKA-16853 > > >>> <https://issues.apache.org/jira/browse/KAFKA-16863> too. > > >>> > > >>> I would appreciate any feedback or suggestions you might have. > > >>> > > >>> Thanks, > > >>> Murali > > >>> > > >> > > > > > >
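A minimal sketch of the deprecation behaviour discussed above (point 102), assuming the new config simply drops the `default.` prefix from the existing name; the names and resolution logic here are illustrative only, and the KIP itself defines the actual configs and behaviour:

```java
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;

public final class ExceptionHandlerConfigResolver {

    // Hypothetical names for illustration; the KIP defines the real ones.
    static final String NEW_CONFIG = "deserialization.exception.handler";
    static final String DEPRECATED_CONFIG = "default.deserialization.exception.handler";

    /**
     * Resolves the handler config, preferring the new name and falling back to the
     * deprecated one; throws if both are set, as suggested in the thread above.
     */
    public static Object resolveHandlerConfig(final Map<String, Object> configs) {
        final Object newValue = configs.get(NEW_CONFIG);
        final Object deprecatedValue = configs.get(DEPRECATED_CONFIG);
        if (newValue != null && deprecatedValue != null) {
            throw new ConfigException(
                "Both '" + NEW_CONFIG + "' and the deprecated '" + DEPRECATED_CONFIG
                    + "' are set; please configure only '" + NEW_CONFIG + "'.");
        }
        return newValue != null ? newValue : deprecatedValue;
    }
}
```

Failing fast with a ConfigException avoids silently preferring one of two conflicting handler classes during the deprecation period.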
Re: [VOTE] KIP-1035: StateStore managed changelog offsets
Hi everyone, By my count, it passes with 4 binding +1s and no -1s. Thanks for your votes, review and discussion. I'll update the KIP and start opening PRs! Regards, Nick On Wed, 19 Jun 2024 at 09:16, Lucas Brutschy wrote: > Thanks, Nick! > > +1 (binding) > > On Wed, Jun 19, 2024 at 10:14 AM Sophie Blee-Goldman > wrote: > > > > +1 (binding) thanks Nick! > > > > On Thu, Jun 13, 2024 at 12:39 AM Bruno Cadonna > wrote: > > > > > Thanks Nick! > > > > > > Great KIP! > > > > > > +1 (binding) > > > > > > Best, > > > Bruno > > > > > > On 6/13/24 2:31 AM, Matthias J. Sax wrote: > > > > Thanks Nick. > > > > > > > > +1 (binding) > > > > > > > > > > > > Looking forward to get this all merged! > > > > > > > > > > > > -Matthias > > > > > > > > On 6/12/24 9:36 AM, Nick Telford wrote: > > > >> Hi everyone, > > > >> > > > >> I'd like to call a vote on KIP-1035[1]. > > > >> > > > >> Regards, > > > >> Nick > > > >> > > > >> 1: > > > >> > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets > > > >> > > > >
Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol
Hi everyone, Looks really promising, and I can see this resolving several issues I've noticed. I particularly like the choice to use a String for Subtopology ID, as it will (eventually) lead to a better solution to KIP-816. I noticed a few typos in the KIP that I thought I'd mention: NT1. In several places you refer to "task changelog end offsets", while in others, you call it "task end offsets". Which is it? NT2. Under "Group Configurations", you included "group.streams.max.warmup.replicas", but I think you meant "group.streams.num.warmup.replicas"? NT3. Not a typo, but a suggestion: it makes sense to set the default for "group.streams.num.warmup.replicas" to 2, for compatibility with the existing defaults, but why set the default for "group.streams.max.warmup.replicas" to only 4? That seems extremely restrictive. These "max" configs are typically used to prevent a subset of users causing problems on the shared broker cluster - what's the reason to set such a restrictive value for max warmup replicas? If I had 10,000 warmup replicas, would it cause a noticeable problem on the brokers? NT4. It's implied that clients send the changelog offsets for *all* dormant stateful Tasks, but the current behaviour is that clients will only send the changelog offsets for the stateful Tasks that they are able to lock on-disk. Since this is a change in behaviour, perhaps this should be called out explicitly? Regards, Nick On Thu, 15 Aug 2024 at 10:55, Lucas Brutschy wrote: > Hi Andrew, > > thanks for the comment. > > AS12: I clarified the command-line interface. It's supposed to be used > with --reset-offsets and --delete-offsets. I removed --topic. > > AS13: Yes, it's --delete. I clarified the command-line interface. > > Cheers, > Lucas > > On Tue, Aug 13, 2024 at 4:14 PM Andrew Schofield > wrote: > > > > Hi Lucas, > > Thanks for the KIP update. > > > > I think that `kafka-streams-groups.sh` looks like a good equivalent to > > the tools for the other types of groups. > > > > AS12: In kafka-streams-groups.sh, the description for the > > --input-topics option seems insufficient. Why is an input topic specified > > with this option different than a topic specified with --topic? Why is > > It --input-topics rather than --input-topic? Which action of this tool > > does this option apply to? > > > > AS13: Similarly, for --internal-topics, which action of the tool does it > > apply to? I suppose it’s --delete, but it’s not clear to me. > > > > Thanks, > > Andrew > > > > > On 11 Aug 2024, at 12:10, Lucas Brutschy > wrote: > > > > > > Hi Andrew/Lianet, > > > > > > I have added an administrative command-line tool (replacing > > > `kafka-streams-application-reset`) and extensions of the Admin API for > > > listing, deleting, describing groups and listing, altering and > > > deleting offsets for streams groups. No new RPCs have to be added, > > > however, we duplicate some of the API in the admin client that exist > > > for consumer groups. It seems to me cleaner to duplicate some > > > code/interface here, instead of using "consumer group" APIs for > > > streams groups, or renaming existing APIs that use "consumerGroup" in > > > the name to something more generic (which wouldn't cover share > > > groups). > > > > > > I think for now, all comments are addressed. > > > > > > Cheers, > > > Lucas > > > > > > On Tue, Aug 6, 2024 at 3:19 PM Lucas Brutschy > wrote: > > >> > > >> Hi Lianet and Andrew, > > >> > > >> LM1/LM2: You are right. The idea is to omit fields exactly in the same > > >> situations as in KIP-848. 
In the KIP, I stuck with how the behavior > > >> was defined in KIP-848 (e.g. KIP-848 defined that that instance ID > > >> will be omitted if it did not change since the last heartbeat). But > > >> you are correct that the implementation handles these details slightly > > >> differently. I updated the KIP to match more closely the behavior of > > >> the KIP-848 implementation. > > >> > > >> LM9: Yes, there are several options to do this. The idea is to have > > >> only one client initialize the topology, not all clients. It seems > > >> easier to understand on the protocol level (otherwise we'd have N > > >> topology initializations racing with a hard-to-determine winner). We > > >> also expect the payload of the request to grow in the futu
Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol
Hi Lucas, NT4. The reason I mentioned this was because, while implementing 1035, I stumbled across a problem: initially I had changed it so that threads always reported the lag for *all* dormant Tasks on-disk, even if it meant multiple threads reporting lag for the same Tasks. I found that this didn't work, apparently because the assignor assumes that multiple threads on the same instance always report disjoint sets. >From reading through 1071, it sounded like this assumption is no longer being made by the assignor, and that the processId field would allow the assignor to understand when multiple clients reporting lag for the same Tasks are on the same instance. This would enable us to do away with the locking when reporting lag, and just have threads report the lag for every Task on-disk, even if other threads are reporting lag for the same Tasks. But it sounds like this is not correct, and that the new assignor will make the same assumptions as the old one? Regards, Nick On Fri, 16 Aug 2024 at 10:17, Lucas Brutschy wrote: > Hi Nick! > > Thanks for getting involved in the discussion. > > NT1. We are always referring to offsets in the changelog topics here. > I tried to make it more consistent. But in the schemas and API, I find > "task changelog end offset" a bit lengthy, so we use "task offset" and > "task end offset" for short. We could change it, if people think this > is confusing. > > NT2. You are right. The confusing part is that the current streams > config is called `max.warmup.replicas`, but in the new protocol, we > are bounding the group-level parameter using > `group.streams.max.warmup.replicas`. If we wanted to keep > `group.streams.max.warmup.replicas` for the config name on the > group-level, we'd have to bound it using > `group.streams.max.max.warmup.replicas`. I prefer not doing this, but > open to suggestions. > > NT3. You are right, we do not need to make it this restrictive. I > think the main problem with having 10,000 warm-up replicas would be > that it slows down the assignment inside the broker - once we are > closer to production-ready implementation, we may have better numbers > of this and may revisit these defaults. I'll set the max to 100 for > now, but it would be good to hear what values people typically use in > their production workloads. > > NT4. We will actually only report the offsets if we manage to acquire > the lock. I tried to make this more precise. I suppose also with > KIP-1035, we'd require the lock to read the offset? > > Cheers, > Lucas > > On Thu, Aug 15, 2024 at 8:40 PM Nick Telford > wrote: > > > > Hi everyone, > > > > Looks really promising, and I can see this resolving several issues I've > > noticed. I particularly like the choice to use a String for Subtopology > ID, > > as it will (eventually) lead to a better solution to KIP-816. > > > > I noticed a few typos in the KIP that I thought I'd mention: > > > > NT1. > > In several places you refer to "task changelog end offsets", while in > > others, you call it "task end offsets". Which is it? > > > > NT2. > > Under "Group Configurations", you included > > "group.streams.max.warmup.replicas", but I think you meant > > "group.streams.num.warmup.replicas"? > > > > NT3. > > Not a typo, but a suggestion: it makes sense to set the default for > > "group.streams.num.warmup.replicas" to 2, for compatibility with the > > existing defaults, but why set the default for > > "group.streams.max.warmup.replicas" to only 4? That seems extremely > > restrictive. 
These "max" configs are typically used to prevent a subset > of > > users causing problems on the shared broker cluster - what's the reason > to > > set such a restrictive value for max warmup replicas? If I had 10,000 > > warmup replicas, would it cause a noticeable problem on the brokers? > > > > NT4. > > It's implied that clients send the changelog offsets for *all* dormant > > stateful Tasks, but the current behaviour is that clients will only send > > the changelog offsets for the stateful Tasks that they are able to lock > > on-disk. Since this is a change in behaviour, perhaps this should be > called > > out explicitly? > > > > Regards, > > Nick > > > > On Thu, 15 Aug 2024 at 10:55, Lucas Brutschy .invalid> > > wrote: > > > > > Hi Andrew, > > > > > > thanks for the comment. > > > > > > AS12: I clarified the command-line interface. It's supposed to be used > > > with --res
Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol
Hi Lucas, NT4. Given that the new assignment procedure guarantees that a Task has been closed before it is assigned to a different client, I don't think there should be a problem with concurrent access? I don't think we should worry too much about 1035 here, as it's orthogonal to 1071. I don't think that 1035 *requires* the locking, and indeed once 1071 is the only assignment mechanism, we should be able to do away with the locking completely (I think). Anyway, given your point about it not being possible to guarantee disjoint sets, does it make sense to require clients to continue to supply the lags for only a subset of the dormant Tasks on-disk? Wouldn't it be simpler to just have them supply everything, since the assignor has to handle overlapping sets anyway? Cheers, Nick On Fri, 16 Aug 2024 at 13:51, Lucas Brutschy wrote: > Hi Nick, > > NT4. I think it will be hard anyway to ensure that the assignor always > gets disjoint sets (there is no synchronized rebalance point anymore, > so locks wouldn't prevent two clients reporting the same dormant > task). So I think we'll have to lift this restriction. I was thinking > more that locking is required to prevent concurrent access. In > particular, I was expecting that the lock will avoid two threads > opening the same RocksDB in KIP-1035. Wouldn't this cause problems? > > Cheers, > Lucas > > On Fri, Aug 16, 2024 at 11:34 AM Nick Telford > wrote: > > > > Hi Lucas, > > > > NT4. > > The reason I mentioned this was because, while implementing 1035, I > > stumbled across a problem: initially I had changed it so that threads > > always reported the lag for *all* dormant Tasks on-disk, even if it meant > > multiple threads reporting lag for the same Tasks. I found that this > didn't > > work, apparently because the assignor assumes that multiple threads on > the > > same instance always report disjoint sets. > > > > From reading through 1071, it sounded like this assumption is no longer > > being made by the assignor, and that the processId field would allow the > > assignor to understand when multiple clients reporting lag for the same > > Tasks are on the same instance. This would enable us to do away with the > > locking when reporting lag, and just have threads report the lag for > every > > Task on-disk, even if other threads are reporting lag for the same Tasks. > > > > But it sounds like this is not correct, and that the new assignor will > make > > the same assumptions as the old one? > > > > Regards, > > Nick > > > > On Fri, 16 Aug 2024 at 10:17, Lucas Brutschy .invalid> > > wrote: > > > > > Hi Nick! > > > > > > Thanks for getting involved in the discussion. > > > > > > NT1. We are always referring to offsets in the changelog topics here. > > > I tried to make it more consistent. But in the schemas and API, I find > > > "task changelog end offset" a bit lengthy, so we use "task offset" and > > > "task end offset" for short. We could change it, if people think this > > > is confusing. > > > > > > NT2. You are right. The confusing part is that the current streams > > > config is called `max.warmup.replicas`, but in the new protocol, we > > > are bounding the group-level parameter using > > > `group.streams.max.warmup.replicas`. If we wanted to keep > > > `group.streams.max.warmup.replicas` for the config name on the > > > group-level, we'd have to bound it using > > > `group.streams.max.max.warmup.replicas`. I prefer not doing this, but > > > open to suggestions. > > > > > > NT3. You are right, we do not need to make it this restrictive. 
I > > > think the main problem with having 10,000 warm-up replicas would be > > > that it slows down the assignment inside the broker - once we are > > > closer to production-ready implementation, we may have better numbers > > > of this and may revisit these defaults. I'll set the max to 100 for > > > now, but it would be good to hear what values people typically use in > > > their production workloads. > > > > > > NT4. We will actually only report the offsets if we manage to acquire > > > the lock. I tried to make this more precise. I suppose also with > > > KIP-1035, we'd require the lock to read the offset? > > > > > > Cheers, > > > Lucas > > > > > > On Thu, Aug 15, 2024 at 8:40 PM Nick Telford > > > wrote: > > > > > > > > Hi everyone, > > &
Re: Kafka trunk test & build stability
Hi everyone, Regarding building a "dependency graph"... Gradle already has this information, albeit fairly coarse-grained. You might be able to get some considerable improvement by configuring the Gradle Remote Build Cache. It looks like it's currently disabled explicitly: https://github.com/apache/kafka/blob/trunk/settings.gradle#L46 The trick is to have trunk builds write to the cache, and PR builds only read from it. This way, any PR based on trunk should be able to cache not only the compilation, but also the tests from dependent modules that haven't changed (e.g. for a PR that only touches the connect/streams modules). This would probably be preferable to having to hand-maintain some rules/dependency graph in the CI configuration, and it's quite straight-forward to configure. Bonus points if the Remote Build Cache is readable publicly, enabling contributors to benefit from it locally. Regards, Nick On Tue, 2 Jan 2024 at 13:00, Lucas Brutschy wrote: > Thanks for all the work that has already been done on this in the past > days! > > Have we considered running our test suite with > -XX:+HeapDumpOnOutOfMemoryError and uploading the heap dumps as > Jenkins build artifacts? This could speed up debugging. Even if we > store them only for a day and do it only for trunk, I think it could > be worth it. The heap dumps shouldn't contain any secrets, and I > checked with the ASF infra team, and they are not concerned about the > additional disk usage. > > Cheers, > Lucas > > On Wed, Dec 27, 2023 at 2:25 PM Divij Vaidya > wrote: > > > > I have started to perform an analysis of the OOM at > > https://issues.apache.org/jira/browse/KAFKA-16052. Please feel free to > > contribute to the investigation. > > > > -- > > Divij Vaidya > > > > > > > > On Wed, Dec 27, 2023 at 1:23 AM Justine Olshan > > > wrote: > > > > > I am still seeing quite a few OOM errors in the builds and I was > curious if > > > folks had any ideas on how to identify the cause and fix the issue. I > was > > > looking in gradle enterprise and found some info about memory usage, > but > > > nothing detailed enough to help figure the issue out. > > > > > > OOMs sometimes fail the build immediately and in other cases I see it > get > > > stuck for 8 hours. (See > > > > > > > https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2508/pipeline/12 > > > ) > > > > > > I appreciate all the work folks are doing here and I will continue to > try > > > to help as best as I can. > > > > > > Justine > > > > > > On Tue, Dec 26, 2023 at 1:04 PM David Arthur > > > wrote: > > > > > > > S2. We’ve looked into this before, and it wasn’t possible at the time > > > with > > > > JUnit. We commonly set a timeout on each test class (especially > > > integration > > > > tests). It is probably worth looking at this again and seeing if > > > something > > > > has changed with JUnit (or our usage of it) that would allow a global > > > > timeout. > > > > > > > > > > > > S3. Dedicated infra sounds nice, if we can get it. It would at least > > > remove > > > > some variability between the builds, and hopefully eliminate the > > > > infra/setup class of failures. > > > > > > > > > > > > S4. Running tests for what has changed sounds nice, but I think it is > > > risky > > > > to implement broadly. As Sophie mentioned, there are probably some > lines > > > we > > > > could draw where we feel confident that only running a subset of > tests is > > > > safe. As a start, we could probably work towards skipping CI for > non-code > > > > PRs. 
> > > > > > > > > > > > --- > > > > > > > > > > > > As an aside, I experimented with build caching and running affected > > > tests a > > > > few months ago. I used the opportunity to play with Github Actions, > and I > > > > quite liked it. Here’s the workflow I used: > > > > > https://github.com/mumrah/kafka/blob/trunk/.github/workflows/push.yml. I > > > > was trying to see if we could use a build cache to reduce the > compilation > > > > time on PRs. A nightly/periodic job would build trunk and populate a > > > Gradle > > > > build cache. PR builds would read from that cache which would enable > them > > > > to only com
Re: Kafka trunk test & build stability
Addendum: I've opened a PR with what I believe are the changes necessary to enable Remote Build Caching, if you choose to go that route: https://github.com/apache/kafka/pull/15109 On Tue, 2 Jan 2024 at 14:31, Nick Telford wrote: > Hi everyone, > > Regarding building a "dependency graph"... Gradle already has this > information, albeit fairly coarse-grained. You might be able to get some > considerable improvement by configuring the Gradle Remote Build Cache. It > looks like it's currently disabled explicitly: > https://github.com/apache/kafka/blob/trunk/settings.gradle#L46 > > The trick is to have trunk builds write to the cache, and PR builds only > read from it. This way, any PR based on trunk should be able to cache not > only the compilation, but also the tests from dependent modules that > haven't changed (e.g. for a PR that only touches the connect/streams > modules). > > This would probably be preferable to having to hand-maintain some > rules/dependency graph in the CI configuration, and it's quite > straight-forward to configure. > > Bonus points if the Remote Build Cache is readable publicly, enabling > contributors to benefit from it locally. > > Regards, > Nick > > On Tue, 2 Jan 2024 at 13:00, Lucas Brutschy > wrote: > >> Thanks for all the work that has already been done on this in the past >> days! >> >> Have we considered running our test suite with >> -XX:+HeapDumpOnOutOfMemoryError and uploading the heap dumps as >> Jenkins build artifacts? This could speed up debugging. Even if we >> store them only for a day and do it only for trunk, I think it could >> be worth it. The heap dumps shouldn't contain any secrets, and I >> checked with the ASF infra team, and they are not concerned about the >> additional disk usage. >> >> Cheers, >> Lucas >> >> On Wed, Dec 27, 2023 at 2:25 PM Divij Vaidya >> wrote: >> > >> > I have started to perform an analysis of the OOM at >> > https://issues.apache.org/jira/browse/KAFKA-16052. Please feel free to >> > contribute to the investigation. >> > >> > -- >> > Divij Vaidya >> > >> > >> > >> > On Wed, Dec 27, 2023 at 1:23 AM Justine Olshan >> >> > wrote: >> > >> > > I am still seeing quite a few OOM errors in the builds and I was >> curious if >> > > folks had any ideas on how to identify the cause and fix the issue. I >> was >> > > looking in gradle enterprise and found some info about memory usage, >> but >> > > nothing detailed enough to help figure the issue out. >> > > >> > > OOMs sometimes fail the build immediately and in other cases I see it >> get >> > > stuck for 8 hours. (See >> > > >> > > >> https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka/detail/trunk/2508/pipeline/12 >> > > ) >> > > >> > > I appreciate all the work folks are doing here and I will continue to >> try >> > > to help as best as I can. >> > > >> > > Justine >> > > >> > > On Tue, Dec 26, 2023 at 1:04 PM David Arthur >> > > wrote: >> > > >> > > > S2. We’ve looked into this before, and it wasn’t possible at the >> time >> > > with >> > > > JUnit. We commonly set a timeout on each test class (especially >> > > integration >> > > > tests). It is probably worth looking at this again and seeing if >> > > something >> > > > has changed with JUnit (or our usage of it) that would allow a >> global >> > > > timeout. >> > > > >> > > > >> > > > S3. Dedicated infra sounds nice, if we can get it. It would at least >> > > remove >> > > > some variability between the builds, and hopefully eliminate the >> > > > infra/setup class of failures. 
>> > > > >> > > > >> > > > S4. Running tests for what has changed sounds nice, but I think it >> is >> > > risky >> > > > to implement broadly. As Sophie mentioned, there are probably some >> lines >> > > we >> > > > could draw where we feel confident that only running a subset of >> tests is >> > > > safe. As a start, we could probably work towards skipping CI for >> non-code >> > > > PRs. >> > > > >> > > > >> > > > --- >> > > > &g
Re: [DISCUSS] KIP-990: Capability to SUSPEND Tasks on DeserializationException
Hi folks, Sorry I haven't got back to you until now. It's become clear that I hadn't anticipated a significant number of technical challenges that this KIP presents. I think expecting users to understand the ramifications on aggregations, joins and windowing ultimately kills it: it only becomes a problem under *specific* combinations of operations, and those problems would manifest in ways that might be difficult for users to detect, let alone diagnose. I think it's best to abandon this KIP, at least for now. If anyone else sees a use for and path forwards for it, feel free to pick it up. Since I'm abandoning the KIP, I won't update the Motivation section. But I will provide a bit of background here on why I originally suggested it, in case you're interested: In my organisation, we don't have schemas for *any* of our data in Kafka. Consequently, one of the biggest causes of downtime in our applications are "bad" records being written by Producers. We integrate with a lot of third-party APIs, and have Producers that just push that data straight to Kafka with very little validation. I've lost count of the number of times my application has been crashed by a deserialization exception because we received a record that looks like '{"error": "Bad gatetway"}' or similar, instead of the actual payload we expect. The difficulty is we can't just use CONTINUE to discard these messages, because we also sometimes get deserialization exceptions caused by an upstream schema change that is incompatible with the expectations of our app. In these cases, we don't want to discard records (which are technically valid), but instead need to adjust our application to be compatible with the new schema, before processing them. Crucially, we use a monolithic app, with more than 45 sub-topologies, so crashing the entire app just because of one bad record causes downtime on potentially unrelated sub-topologies. This was the motivation for this KIP, which would have enabled users to make a decision on what to do about a bad message, *without taking down the entire application*. Obviously, the *correct* solution to this problem is to introduce schemas on our topics and have our Producers correctly validate records before writing them to the cluster. This is ultimately the solution I am going to pursue in lieu of this KIP. I still think this KIP could have been useful for dealing with an incompatible upstream schema change; by pausing only the sub-topologies that are affected by the schema change, while leaving others to continue to run while the user deploys a fix. However, in practice I think few users have monolithic apps like ours, and most instead de-couple unrelated topics via different apps, which reduces the impact of incompatible upstream schema changes. Thanks for your reviews and feedback, I've learned a lot, as always; this time, mostly about how, when authoring a KIP, I should always ask myself: "yes, but what about timestamp ordering?" :-D Nick On Thu, 14 Mar 2024 at 03:27, Sophie Blee-Goldman wrote: > > > > Well, the KIP mentions the ability to either re-try the record (eg, > > after applying some external fix that would allow Kafka Streams to now > > deserialize the record now) or to skip it by advancing the offset. > > > That's fair -- you're definitely right that what's described in the KIP > document > right now would not be practical. 
I just wanted to clarify that this > doesn't > mean the feature as a whole is impractical, but certainly we'd want to > update the proposal to remove the line about resetting offsets via external > tool and come up with a more concrete approach, and perhaps describe > it in more detail. > > That's probably not worth getting into until/unless we decide whether to > go forward with this feature in the first place. I'll let Nick reflect on > the > motivation and your other comments and then decide whether he still > wants to pursue it. > > To Nick: if you want to go through with this KIP and can expand on the > motivation so that we understand it better, I'd be happy to help work > out the details. For now I'll just wait for your decision > > On Wed, Mar 13, 2024 at 10:24 AM Matthias J. Sax wrote: > > > Yes, about the "drop records" case. It's a very common scenario to have > > a repartition step before a windowed aggregation or a join with > > grace-period. > > > > > > About "add feature vs guard users": it's always a tricky question and > > tradeoff. For this particular KIP, I personally think we should opt to > > not add the feature but guard the users, as I don't see too much value > > compared to the complexity and "traps"
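For context on the motivation, the per-record decision described above is what the existing handler interface already expresses; what it cannot do is suspend only the affected sub-topology. A rough sketch of such a handler, using the long-standing (pre-KIP-1033) signature and a deliberately crude heuristic for junk payloads:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;

/**
 * Illustrative only: skips records whose payload is clearly not the expected format
 * (e.g. a third-party API error body), but fails on anything else so that a genuine
 * upstream schema change still stops processing. It cannot pause a single sub-topology,
 * which was the gap KIP-990 aimed to fill.
 */
public class JunkToleratingDeserializationHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        final byte[] value = record.value();
        final String payload = value == null ? "" : new String(value, StandardCharsets.UTF_8);
        // Heuristic for error payloads such as {"error": "Bad gateway"}
        if (payload.contains("\"error\"")) {
            return DeserializationHandlerResponse.CONTINUE;
        }
        return DeserializationHandlerResponse.FAIL;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // no configuration needed for this sketch
    }
}
```

Such a handler would be registered via the default.deserialization.exception.handler Streams config.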
Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics
Hi everyone, Sorry for leaving this for so long. So much for "3 weeks until KIP freeze"! On Sophie's comments: 1. Would Matthias's suggestion of a separate metric tracking the age of the oldest open iterator (within the tag set) satisfy this? That way we can keep iterator-duration-(avg|max) for closed iterators, which can be useful for performance debugging for iterators that don't leak. I'm not sure what we'd call this metric, maybe: "oldest-open-iterator-age-seconds"? Seems like a mouthful. 2. You're right, it makes more sense to provide iterator-duration-(avg|max). Honestly, I can't remember why I had "total" before, or why I was computing a rate-of-change over it. 3, 4, 5, 6. Agreed, I'll make all those changes as suggested. 7. Combined with Matthias's point about RocksDB, I'm convinced that this is the wrong KIP for these. I'll introduce the additional Rocks metrics in another KIP. On Matthias's comments: A. Not sure about the time window. I'm pretty sure all existing avg/max metrics are since the application was started? Any other suggestions here would be appreciated. B. Agreed. See point 1 above. C. Good point. My focus was very much on Rocks memory leaks when I wrote the first draft. I can generalise it. My only concern is that it might make it more difficult to detect Rocks iterator leaks caused *within* our high-level iterator, e.g. RocksJNI, RocksDB, RocksDBStore, etc. But we could always provide a RocksDB-specific metric for this, as you suggested. D. Hmm, we do already have MeteredKeyValueIterator, which automatically wraps the iterator from inner-stores of MeteredKeyValueStore. If we implemented these metrics there, then custom stores would automatically gain the functionality, right? This seems like a pretty logical place to implement these metrics, since MeteredKeyValueStore is all about adding metrics to state stores. > I imagine the best way to implement this would be to do so at the > high-level iterator rather than implementing it separately for each > specific iterator implementation for every store type. Sophie, does MeteredKeyValueIterator fit with your recommendation? Thanks for your thoughts everyone, I'll update the KIP now. Nick On Thu, 14 Mar 2024 at 03:37, Sophie Blee-Goldman wrote: > About your last two points: I completely agree that we should try to > make this independent of RocksDB, and should probably adopt a > general philosophy of being store-implementation agnostic unless > there is good reason to focus on a particular store type: eg if it was > only possible to implement for certain stores, or only made sense in > the context of a certain store type but not necessarily stores in general. > > While leaking memory due to unclosed iterators on RocksDB stores is > certainly the most common issue, I think Matthias sufficiently > demonstrated that the problem of leaking iterators is not actually > unique to RocksDB, and we should consider including in-memory > stores at the very least. I also think that at this point, we may as well > just implement the metrics for *all* store types, whether rocksdb or > in-memory or custom. Not just because it probably applies to all > store types (leaking iterators are rarely a good thing!) but because > I imagine the best way to implement this would be to do so at the > high-level iterator rather than implementing it separately for each > specific iterator implementation for every store type. 
> > That said, I haven't thought all that carefully about the implementation > yet -- it just strikes me as easiest to do at the top level of the store > hierarchy rather than at the bottom. My gut instinct may very well be > wrong, but that's what it's saying > > On Thu, Mar 7, 2024 at 10:43 AM Matthias J. Sax wrote: > > > Seems I am late to this party. Can we pick this up again aiming for 3.8 > > release? I think it would be a great addition. Few comments: > > > > > > - I think it does make sense to report `iterator-duration-avg` and > > `iterator-duration-max` for all *closed* iterators -- it just seems to > > be a useful metric (wondering if this would be _overall_ or bounded to > > some time window?) > > > > - About the duration iterators are currently open, I believe the only > > useful way is to report the "oldest iterator", ie, the smallest iterator > > open-time, of all currently open-iterator? We all agree that in general, > > leaking iterator would bump the count metric, and if there is a few ones > > which are not closed and open for a long time, it seem sufficient to > > detect the single oldest one for alerting purpose? > > > > - What I don't like about the KIP is it focus on RocksDB. I don't thi
Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics
Quick addendum: My suggested metric "oldest-open-iterator-age-seconds" should be "oldest-open-iterator-age-ms". Milliseconds is obviously a better granularity for such a metric. Still accepting suggestions for a better name. On Thu, 28 Mar 2024 at 13:41, Nick Telford wrote: > Hi everyone, > > Sorry for leaving this for so long. So much for "3 weeks until KIP freeze"! > > On Sophie's comments: > 1. Would Matthias's suggestion of a separate metric tracking the age of > the oldest open iterator (within the tag set) satisfy this? That way we can > keep iterator-duration-(avg|max) for closed iterators, which can be useful > for performance debugging for iterators that don't leak. I'm not sure what > we'd call this metric, maybe: "oldest-open-iterator-age-seconds"? Seems > like a mouthful. > > 2. You're right, it makes more sense to provide > iterator-duration-(avg|max). Honestly, I can't remember why I had "total" > before, or why I was computing a rate-of-change over it. > > 3, 4, 5, 6. Agreed, I'll make all those changes as suggested. > > 7. Combined with Matthias's point about RocksDB, I'm convinced that this > is the wrong KIP for these. I'll introduce the additional Rocks metrics in > another KIP. > > On Matthias's comments: > A. Not sure about the time window. I'm pretty sure all existing avg/max > metrics are since the application was started? Any other suggestions here > would be appreciated. > > B. Agreed. See point 1 above. > > C. Good point. My focus was very much on Rocks memory leaks when I wrote > the first draft. I can generalise it. My only concern is that it might make > it more difficult to detect Rocks iterator leaks caused *within* our > high-level iterator, e.g. RocksJNI, RocksDB, RocksDBStore, etc. But we > could always provide a RocksDB-specific metric for this, as you suggested. > > D. Hmm, we do already have MeteredKeyValueIterator, which automatically > wraps the iterator from inner-stores of MeteredKeyValueStore. If we > implemented these metrics there, then custom stores would automatically > gain the functionality, right? This seems like a pretty logical place to > implement these metrics, since MeteredKeyValueStore is all about adding > metrics to state stores. > > > I imagine the best way to implement this would be to do so at the > > high-level iterator rather than implementing it separately for each > > specific iterator implementation for every store type. > > Sophie, does MeteredKeyValueIterator fit with your recommendation? > > Thanks for your thoughts everyone, I'll update the KIP now. > > Nick > > On Thu, 14 Mar 2024 at 03:37, Sophie Blee-Goldman > wrote: > >> About your last two points: I completely agree that we should try to >> make this independent of RocksDB, and should probably adopt a >> general philosophy of being store-implementation agnostic unless >> there is good reason to focus on a particular store type: eg if it was >> only possible to implement for certain stores, or only made sense in >> the context of a certain store type but not necessarily stores in general. >> >> While leaking memory due to unclosed iterators on RocksDB stores is >> certainly the most common issue, I think Matthias sufficiently >> demonstrated that the problem of leaking iterators is not actually >> unique to RocksDB, and we should consider including in-memory >> stores at the very least. I also think that at this point, we may as well >> just implement the metrics for *all* store types, whether rocksdb or >> in-memory or custom. 
Not just because it probably applies to all >> store types (leaking iterators are rarely a good thing!) but because >> I imagine the best way to implement this would be to do so at the >> high-level iterator rather than implementing it separately for each >> specific iterator implementation for every store type. >> >> That said, I haven't thought all that carefully about the implementation >> yet -- it just strikes me as easiest to do at the top level of the store >> hierarchy rather than at the bottom. My gut instinct may very well be >> wrong, but that's what it's saying >> >> On Thu, Mar 7, 2024 at 10:43 AM Matthias J. Sax wrote: >> >> > Seems I am late to this party. Can we pick this up again aiming for 3.8 >> > release? I think it would be a great addition. Few comments: >> > >> > >> > - I think it does make sense to report `iterator-duration-avg` and >> > `iterator-duration-max` for all *closed* iterators -- it just seems to >&g
Re: [DISCUSS] KIP-816: Topology changes without local state reset
Hi everyone, I'm going to resurrect this KIP, because I would like the community to benefit from our solution. In the end, we internally solved this problem using Option B: automatically moving state directories to the correct location whenever they're no longer aligned with the Topology. We implemented this for ourselves externally to Kafka Streams, by using Topology#describe() to analyse the Topology, and then moving state directories before calling KafkaStreams#start(). I've updated/re-written the KIP to focus on this solution, albeit properly integrated into Kafka Streams. Let me know what you think, Nick On Tue, 15 Feb 2022 at 16:23, Nick Telford wrote: > In the KIP, for Option A I suggested a new path of: > > /state/dir/stores// > > I made the mistake of thinking that the rocksdb/ segment goes *after* the > store name in the current scheme, e.g. > > /state/dir//[/rocksdb] > > This is a mistake. I'd always intended for a combination of the store name > and partition number to be encoded in the new path (instead of the store > name and task ID, that we have now). The exact encoding doesn't really > bother me too much, so if you have any conventions you think we should > follow here (hyphenated vs. underscored vs. directory separator, etc.) > please let me know. > > I should be able to find some time hopefully next week to start working on > this, which should shed some more light on issues that might arise. > > In the meantime I'll correct the KIP to include the rocksdb segment. > > Thanks everyone for your input so far! > > Nick > > On Mon, 14 Feb 2022 at 22:02, Guozhang Wang wrote: > >> Thanks for the clarification John! >> >> Nick, sorry that I was not super clear in my latest email. I meant exactly >> what John said. >> >> Just to clarify, I do think that this KIP is relatively orthogonal to the >> named topology work; as long as we still keep the topo name encoded it >> should be fine since two named topologies can indeed have the same store >> name, but that would not need to be considered as part of this KIP. >> >> >> Guozhang >> >> On Mon, Feb 14, 2022 at 9:02 AM John Roesler wrote: >> >> > Hi Nick, >> > >> > When Guozgang and I were chatting, we realized that it’s not completely >> > sufficient just to move the state store directories, because their names >> > are not unique. In particular, more than one partition of the store may >> be >> > assigned to the same instance. Right now, this is handled because the >> task >> > is encoded the partition number. >> > >> > For example, if we have a store "mystore" in subtopology 1 and we have >> two >> > out of four partitions (0 and 3) assigned to the local node, the disk >> will >> > have these paths: >> > >> > {app_id}/1_0/rocksdb/mystore >> > {app_id}/1_3/rocksdb/mystore >> > >> > Clearly, we can't just elevate both "mystore" directories to reside >> under >> > {appid}, because >> > they have the same name. When I think of option (A), here's what I >> picture: >> > >> > {app_id}/rocksdb/mystore-0 >> > {app_id}/rocksdb/mystore-3 >> > >> > In the future, one thing we're considering to do is actually store all >> the >> > positions in the same rocksDB database, which is a pretty convenient >> step >> > away from option (A) (another reason to prefer it to option (B) ). 
>> > >> > I just took a look at how named topologies are handled, and they're >> > actually >> > a separate path segment, not part of the task id, like this: >> > >> > {app_id}/__{topo_name}__/1_0/rocksdb/mystore >> > {app_id}/__{topo_name}__/1_3/rocksdb/mystore >> > >> > Which is pretty convenient because it means there are no >> > implications for your proposal. If you implement the above >> > code, then we'll just wind up with: >> > >> > {app_id}/__{topo_name}__/rocksdb/mystore-0 >> > {app_id}/__{topo_name}__/rocksdb/mystore-3 >> > >> > Does that make sense? >> > >> > Thanks, >> > -John >> > >> > >> > On Mon, Feb 14, 2022, at 03:57, Nick Telford wrote: >> > > Hi Guozhang, >> > > >> > > Sorry I haven't had the time to respond to your earlier email, but I >> just >> > > wanted to clarify something with respect to your most recent email. >> > > >> > > My original plan in option A is to remov
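For readers curious what the external workaround looks like in practice, below is a heavily simplified sketch of the approach described above: derive each store's owning subtopology from Topology#describe(), then move any store directory that sits under a task directory with the wrong subtopology ID, before KafkaStreams#start() is called. Named topologies, locking, cleanup and error handling are all glossed over:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription;

public final class StateDirectoryMigrator {

    /** Maps each state store name to the subtopology that owns it in the current topology. */
    static Map<String, Integer> storeToSubtopology(final Topology topology) {
        final Map<String, Integer> owners = new HashMap<>();
        for (final TopologyDescription.Subtopology sub : topology.describe().subtopologies()) {
            for (final TopologyDescription.Node node : sub.nodes()) {
                if (node instanceof TopologyDescription.Processor) {
                    for (final String store : ((TopologyDescription.Processor) node).stores()) {
                        owners.put(store, sub.id());
                    }
                }
            }
        }
        return owners;
    }

    /**
     * Moves store directories under stateDir/{appId} out of task directories whose
     * subtopology no longer owns them, e.g. 1_0/rocksdb/mystore -> 2_0/rocksdb/mystore.
     */
    static void migrate(final Path appStateDir, final Map<String, Integer> owners) throws IOException {
        try (Stream<Path> taskDirs = Files.list(appStateDir)) {
            for (final Path taskDir : (Iterable<Path>) taskDirs::iterator) {
                final String[] parts = taskDir.getFileName().toString().split("_");
                if (parts.length != 2) continue;                      // not a task directory
                final Path rocksDir = taskDir.resolve("rocksdb");
                if (!Files.isDirectory(rocksDir)) continue;
                try (Stream<Path> stores = Files.list(rocksDir)) {
                    for (final Path store : (Iterable<Path>) stores::iterator) {
                        final Integer owner = owners.get(store.getFileName().toString());
                        if (owner == null || owner.toString().equals(parts[0])) continue;
                        final Path target = appStateDir
                            .resolve(owner + "_" + parts[1]).resolve("rocksdb")
                            .resolve(store.getFileName());
                        Files.createDirectories(target.getParent());
                        Files.move(store, target);                    // relocate the store
                    }
                }
            }
        }
    }
}
```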
Re: [DISCUSS] KIP-989: RocksDB Iterator Metrics
Hi Matthias, > For the oldest iterator metric, I would propose something simple like > `iterator-opened-ms` and it would just be the actual timestamp when the > iterator was opened. I don't think we need to compute the actual age, > but user can to this computation themselves? That works for me; it's easier to implement like that :-D I'm a little concerned that the name "iterator-opened-ms" may not be obvious enough without reading the docs. > If we think reporting the age instead of just the timestamp is better, I > would propose `iterator-max-age-ms`. I should be sufficient to call out > (as it's kinda "obvious" anyway) that the metric applies to open > iterator only. While I think it's preferable to record the timestamp, rather than the age, this does have the benefit of a more obvious metric name. > Nit: the KIP says it's a store-level metric, but I think it would be > good to say explicitly that it's recorded with DEBUG level only? Yes, I've already updated the KIP with this information in the table. Regards, Nick On Sun, 31 Mar 2024 at 10:53, Matthias J. Sax wrote: > The time window thing was just an idea. Happy to drop it. > > For the oldest iterator metric, I would propose something simple like > `iterator-opened-ms` and it would just be the actual timestamp when the > iterator was opened. I don't think we need to compute the actual age, > but user can to this computation themselves? > > If we think reporting the age instead of just the timestamp is better, I > would propose `iterator-max-age-ms`. I should be sufficient to call out > (as it's kinda "obvious" anyway) that the metric applies to open > iterator only. > > And yes, I was hoping that the code inside MetereXxxStore might already > be setup in a way that custom stores would inherit the iterator metrics > automatically -- I am just not sure, and left it as an exercise for > somebody to confirm :) > > > Nit: the KIP says it's a store-level metric, but I think it would be > good to say explicitly that it's recorded with DEBUG level only? > > > > -Matthias > > > On 3/28/24 2:52 PM, Nick Telford wrote: > > Quick addendum: > > > > My suggested metric "oldest-open-iterator-age-seconds" should be > > "oldest-open-iterator-age-ms". Milliseconds is obviously a better > > granularity for such a metric. > > > > Still accepting suggestions for a better name. > > > > On Thu, 28 Mar 2024 at 13:41, Nick Telford > wrote: > > > >> Hi everyone, > >> > >> Sorry for leaving this for so long. So much for "3 weeks until KIP > freeze"! > >> > >> On Sophie's comments: > >> 1. Would Matthias's suggestion of a separate metric tracking the age of > >> the oldest open iterator (within the tag set) satisfy this? That way we > can > >> keep iterator-duration-(avg|max) for closed iterators, which can be > useful > >> for performance debugging for iterators that don't leak. I'm not sure > what > >> we'd call this metric, maybe: "oldest-open-iterator-age-seconds"? Seems > >> like a mouthful. > >> > >> 2. You're right, it makes more sense to provide > >> iterator-duration-(avg|max). Honestly, I can't remember why I had > "total" > >> before, or why I was computing a rate-of-change over it. > >> > >> 3, 4, 5, 6. Agreed, I'll make all those changes as suggested. > >> > >> 7. Combined with Matthias's point about RocksDB, I'm convinced that this > >> is the wrong KIP for these. I'll introduce the additional Rocks metrics > in > >> another KIP. > >> > >> On Matthias's comments: > >> A. Not sure about the time window. 
I'm pretty sure all existing avg/max > >> metrics are since the application was started? Any other suggestions > here > >> would be appreciated. > >> > >> B. Agreed. See point 1 above. > >> > >> C. Good point. My focus was very much on Rocks memory leaks when I wrote > >> the first draft. I can generalise it. My only concern is that it might > make > >> it more difficult to detect Rocks iterator leaks caused *within* our > >> high-level iterator, e.g. RocksJNI, RocksDB, RocksDBStore, etc. But we > >> could always provide a RocksDB-specific metric for this, as you > suggested. > >> > >> D. Hmm, we do already have MeteredKeyValueIterator, which automatically > >> wraps the iterator from inner-stores of MeteredKeyValueStore.
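To make the MeteredKeyValueIterator idea concrete, here is a rough sketch of a metered iterator wrapper that could back both metrics: it registers itself in a per-store set of open iterators (from which an `iterator-opened-ms`-style gauge can report the oldest open timestamp) and feeds its lifetime to an avg/max sensor on close. The Sensor/StreamsMetrics wiring is omitted and the shape is only illustrative of the discussion, not the final implementation:

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongConsumer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueIterator;

public class MeteredIterator<K, V> implements KeyValueIterator<K, V> {

    private final KeyValueIterator<K, V> inner;
    private final long openedAtMs = System.currentTimeMillis();
    private final Set<MeteredIterator<?, ?>> openIterators;   // shared per store
    private final LongConsumer durationSensor;                // stand-in for a Kafka Sensor

    public MeteredIterator(final KeyValueIterator<K, V> inner,
                           final Set<MeteredIterator<?, ?>> openIterators,
                           final LongConsumer durationSensor) {
        this.inner = inner;
        this.openIterators = openIterators;
        this.durationSensor = durationSensor;
        openIterators.add(this);
    }

    /** Value for an "iterator-opened-ms"-style gauge: the oldest open iterator's timestamp. */
    public static Optional<Long> oldestOpenTimestamp(final Set<MeteredIterator<?, ?>> openIterators) {
        return openIterators.stream().map(it -> it.openedAtMs).min(Long::compare);
    }

    /** Typically created once per store and shared by all of its iterators. */
    public static Set<MeteredIterator<?, ?>> newOpenIteratorSet() {
        return ConcurrentHashMap.newKeySet();
    }

    @Override public boolean hasNext() { return inner.hasNext(); }
    @Override public KeyValue<K, V> next() { return inner.next(); }
    @Override public K peekNextKey() { return inner.peekNextKey(); }

    @Override
    public void close() {
        try {
            inner.close();
        } finally {
            openIterators.remove(this);                              // no longer "open"
            durationSensor.accept(System.currentTimeMillis() - openedAtMs);
        }
    }
}
```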
[DISCUSS] KIP-1035: StateStore managed changelog offsets
Hi everyone, Based on some offline discussion, I've split out the "Atomic Checkpointing" section from KIP-892: Transactional Semantics for StateStores, into its own KIP KIP-1035: StateStore managed changelog offsets https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets While KIP-892 was adopted *with* the changes outlined in KIP-1035, these changes were always the most contentious part, and continued to spur discussion even after KIP-892 was adopted. All the changes introduced in KIP-1035 have been removed from KIP-892, and a hard dependency on KIP-1035 has been added to KIP-892 in their place. I'm hopeful that with some more focus on this set of changes, we can deliver something that we're all happy with. Regards, Nick
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Hi Bruno, Thanks for the review! 1, 4, 5. Done 3. You're right. I've removed the offending paragraph. I had originally adapted this from the guarantees outlined in KIP-892. But it's difficult to provide these guarantees without the KIP-892 transaction buffers. Instead, we'll add the guarantees back into the JavaDoc when KIP-892 lands. 2. Good point! This is the only part of the KIP that was (significantly) changed when I extracted it from KIP-892. My prototype currently maintains this "cache" of changelog offsets in .checkpoint, but doing so becomes very messy. My intent with this change was to try to better encapsulate this offset "caching", especially for StateStores that can cheaply provide the offsets stored directly in them without needing to duplicate them in this cache. It's clear some more work is needed here to better encapsulate this. My immediate thought is: what if we construct *but don't initialize* the StateManager and StateStores for every Task directory on-disk? That should still be quite cheap to do, and would enable us to query the offsets for all on-disk stores, even if they're not open. If the StateManager (aka. ProcessorStateManager/GlobalStateManager) proves too expensive to hold open for closed stores, we could always have a "StubStateManager" in its place, that enables the querying of offsets, but nothing else? IDK, what do you think? Regards, Nick On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna wrote: > Hi Nick, > > Thanks for breaking out the KIP from KIP-892! > > Here a couple of comments/questions: > > 1. > In Kafka Streams, we have a design guideline which says to not use the > "get"-prefix for getters on the public API. Could you please change > getCommittedOffsets() to committedOffsets()? > > > 2. > It is not clear to me how TaskManager#getTaskOffsetSums() should read > offsets of tasks the stream thread does not own but that have a state > directory on the Streams client by calling > StateStore#getCommittedOffsets(). If the thread does not own a task it > does also not create any state stores for the task, which means there is > no state store on which to call getCommittedOffsets(). > I would have rather expected that a checkpoint file is written for all > state stores on close -- not only for the RocksDBStore -- and that this > checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks > that have a state directory on the client but are not currently assigned > to any stream thread of the Streams client. > > > 3. > In the javadocs for commit() you write > > "... all writes since the last commit(Map), or since init(StateStore) > *MUST* be available to readers, even after a restart." > > This is only true for a clean close before the restart, isn't it? > If the task fails with a dirty close, Kafka Streams cannot guarantee > that the in-memory structures of the state store (e.g. memtable in the > case of RocksDB) are flushed so that the records and the committed > offsets are persisted. > > > 4. > The wrapper that provides the legacy checkpointing behavior is actually > an implementation detail. I would remove it from the KIP, but still > state that the legacy checkpointing behavior will be supported when the > state store does not manage the checkpoints. > > > 5. > Regarding the metrics, could you please add the tags, and the recording > level (DEBUG or INFO) as done in KIP-607 or KIP-444. 
> > > Best, > Bruno > > On 4/7/24 5:35 PM, Nick Telford wrote: > > Hi everyone, > > > > Based on some offline discussion, I've split out the "Atomic > Checkpointing" > > section from KIP-892: Transactional Semantics for StateStores, into its > own > > KIP > > > > KIP-1035: StateStore managed changelog offsets > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-1035%3A+StateStore+managed+changelog+offsets > > > > While KIP-892 was adopted *with* the changes outlined in KIP-1035, these > > changes were always the most contentious part, and continued to spur > > discussion even after KIP-892 was adopted. > > > > All the changes introduced in KIP-1035 have been removed from KIP-892, > and > > a hard dependency on KIP-1035 has been added to KIP-892 in their place. > > > > I'm hopeful that with some more focus on this set of changes, we can > > deliver something that we're all happy with. > > > > Regards, > > Nick > > >
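For readers following the thread, a rough sketch of the store-level methods under discussion, after the rename from getCommittedOffsets() to committedOffsets(); the default implementation and javadoc wording here are illustrative, and the KIP page remains the source of truth:

```java
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

/**
 * Illustrative excerpt of the two methods discussed in this thread: commit(Map) to
 * make writes and their changelog offsets durable together, and committedOffsets()
 * to read those offsets back, e.g. when computing task offset sums.
 */
public interface StateStoreOffsetsSketch {

    /**
     * Commits all writes made since the last commit, together with the changelog
     * offsets up to which the store has been updated.
     */
    void commit(Map<TopicPartition, Long> changelogOffsets);

    /**
     * The changelog offsets made durable by the last successful commit. The default
     * (empty) preserves the legacy behaviour of stores that rely on the .checkpoint
     * file written by Streams.
     */
    default Map<TopicPartition, Long> committedOffsets() {
        return Collections.emptyMap();
    }
}
```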
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Hi Bruno, Immediately after I sent my response, I looked at the codebase and came to the same conclusion. If it's possible at all, it will need to be done by creating temporary StateManagers and StateStores during rebalance. I think it is possible, and probably not too expensive, but the devil will be in the detail. I'll try to find some time to explore the idea to see if it's possible and report back, because we'll need to determine this before we can vote on the KIP. Regards, Nick On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna wrote: > Hi Nick, > > Thanks for reacting on my comments so quickly! > > > 2. > Some thoughts on your proposal. > State managers (and state stores) are parts of tasks. If the task is not > assigned locally, we do not create those tasks. To get the offsets with > your approach, we would need to either create kind of inactive tasks > besides active and standby tasks or store and manage state managers of > non-assigned tasks differently than the state managers of assigned > tasks. Additionally, the cleanup thread that removes unassigned task > directories needs to concurrently delete those inactive tasks or > task-less state managers of unassigned tasks. This seems all quite messy > to me. > Could we create those state managers (or state stores) for locally > existing but unassigned tasks on demand when > TaskManager#getTaskOffsetSums() is executed? Or have a different > encapsulation for the unused task directories? > > > Best, > Bruno > > > > On 4/10/24 11:31 AM, Nick Telford wrote: > > Hi Bruno, > > > > Thanks for the review! > > > > 1, 4, 5. > > Done > > > > 3. > > You're right. I've removed the offending paragraph. I had originally > > adapted this from the guarantees outlined in KIP-892. But it's difficult > to > > provide these guarantees without the KIP-892 transaction buffers. > Instead, > > we'll add the guarantees back into the JavaDoc when KIP-892 lands. > > > > 2. > > Good point! This is the only part of the KIP that was (significantly) > > changed when I extracted it from KIP-892. My prototype currently > maintains > > this "cache" of changelog offsets in .checkpoint, but doing so becomes > very > > messy. My intent with this change was to try to better encapsulate this > > offset "caching", especially for StateStores that can cheaply provide the > > offsets stored directly in them without needing to duplicate them in this > > cache. > > > > It's clear some more work is needed here to better encapsulate this. My > > immediate thought is: what if we construct *but don't initialize* the > > StateManager and StateStores for every Task directory on-disk? That > should > > still be quite cheap to do, and would enable us to query the offsets for > > all on-disk stores, even if they're not open. If the StateManager (aka. > > ProcessorStateManager/GlobalStateManager) proves too expensive to hold > open > > for closed stores, we could always have a "StubStateManager" in its > place, > > that enables the querying of offsets, but nothing else? > > > > IDK, what do you think? > > > > Regards, > > > > Nick > > > > On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna wrote: > > > >> Hi Nick, > >> > >> Thanks for breaking out the KIP from KIP-892! > >> > >> Here a couple of comments/questions: > >> > >> 1. > >> In Kafka Streams, we have a design guideline which says to not use the > >> "get"-prefix for getters on the public API. Could you please change > >> getCommittedOffsets() to committedOffsets()? > >> > >> > >> 2. 
> >> It is not clear to me how TaskManager#getTaskOffsetSums() should read > >> offsets of tasks the stream thread does not own but that have a state > >> directory on the Streams client by calling > >> StateStore#getCommittedOffsets(). If the thread does not own a task it > >> does also not create any state stores for the task, which means there is > >> no state store on which to call getCommittedOffsets(). > >> I would have rather expected that a checkpoint file is written for all > >> state stores on close -- not only for the RocksDBStore -- and that this > >> checkpoint file is read in TaskManager#getTaskOffsetSums() for the tasks > >> that have a state directory on the client but are not currently assigned > >> to any stream thread of the Streams client. > >> > >> > >> 3. > >> In the javado
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
On further thought, it's clear that this can't work for one simple reason: StateStores don't know their associated TaskId (and hence, their StateDirectory) until the init() call. Therefore, committedOffset() can't be called before init(), unless we also added a StateStoreContext argument to committedOffset(), which I think might be trying to shoehorn too much into committedOffset(). I still don't like the idea of the Streams engine maintaining the cache of changelog offsets independently of stores, mostly because of the maintenance burden of the code duplication, but it looks like we'll have to live with it. Unless you have any better ideas? Regards, Nick On Wed, 10 Apr 2024 at 14:12, Nick Telford wrote: > Hi Bruno, > > Immediately after I sent my response, I looked at the codebase and came to > the same conclusion. If it's possible at all, it will need to be done by > creating temporary StateManagers and StateStores during rebalance. I think > it is possible, and probably not too expensive, but the devil will be in > the detail. > > I'll try to find some time to explore the idea to see if it's possible and > report back, because we'll need to determine this before we can vote on the > KIP. > > Regards, > Nick > > On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna wrote: > >> Hi Nick, >> >> Thanks for reacting on my comments so quickly! >> >> >> 2. >> Some thoughts on your proposal. >> State managers (and state stores) are parts of tasks. If the task is not >> assigned locally, we do not create those tasks. To get the offsets with >> your approach, we would need to either create kind of inactive tasks >> besides active and standby tasks or store and manage state managers of >> non-assigned tasks differently than the state managers of assigned >> tasks. Additionally, the cleanup thread that removes unassigned task >> directories needs to concurrently delete those inactive tasks or >> task-less state managers of unassigned tasks. This seems all quite messy >> to me. >> Could we create those state managers (or state stores) for locally >> existing but unassigned tasks on demand when >> TaskManager#getTaskOffsetSums() is executed? Or have a different >> encapsulation for the unused task directories? >> >> >> Best, >> Bruno >> >> >> >> On 4/10/24 11:31 AM, Nick Telford wrote: >> > Hi Bruno, >> > >> > Thanks for the review! >> > >> > 1, 4, 5. >> > Done >> > >> > 3. >> > You're right. I've removed the offending paragraph. I had originally >> > adapted this from the guarantees outlined in KIP-892. But it's >> difficult to >> > provide these guarantees without the KIP-892 transaction buffers. >> Instead, >> > we'll add the guarantees back into the JavaDoc when KIP-892 lands. >> > >> > 2. >> > Good point! This is the only part of the KIP that was (significantly) >> > changed when I extracted it from KIP-892. My prototype currently >> maintains >> > this "cache" of changelog offsets in .checkpoint, but doing so becomes >> very >> > messy. My intent with this change was to try to better encapsulate this >> > offset "caching", especially for StateStores that can cheaply provide >> the >> > offsets stored directly in them without needing to duplicate them in >> this >> > cache. >> > >> > It's clear some more work is needed here to better encapsulate this. My >> > immediate thought is: what if we construct *but don't initialize* the >> > StateManager and StateStores for every Task directory on-disk? 
That >> should >> > still be quite cheap to do, and would enable us to query the offsets for >> > all on-disk stores, even if they're not open. If the StateManager (aka. >> > ProcessorStateManager/GlobalStateManager) proves too expensive to hold >> open >> > for closed stores, we could always have a "StubStateManager" in its >> place, >> > that enables the querying of offsets, but nothing else? >> > >> > IDK, what do you think? >> > >> > Regards, >> > >> > Nick >> > >> > On Tue, 9 Apr 2024 at 15:00, Bruno Cadonna wrote: >> > >> >> Hi Nick, >> >> >> >> Thanks for breaking out the KIP from KIP-892! >> >> >> >> Here a couple of comments/questions: >> >> >> >> 1. >> >> In Kafka Streams, we have a design guid
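To make the API shapes being debated above a little more concrete, here is a rough, purely illustrative sketch of the two committedOffset() variants under discussion. This is not the KIP-1035 interface: the method names, signatures and defaults are assumptions based only on the thread above, and the rest of the StateStore interface is elided.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStoreContext;

// Sketch only: a fragment of what the discussed additions *might* look like.
interface StateStore {

    // Option A: no-argument form. Can only be answered after init(), because a
    // store does not know its TaskId (and hence its StateDirectory) before then.
    default Long committedOffset(final TopicPartition changelogPartition) {
        return null; // null = unknown; Streams would fall back to the .checkpoint file
    }

    // Option B: form that also takes a StateStoreContext, so it could be queried
    // before init() -- the variant Nick worries shoehorns too much into the method.
    default Long committedOffset(final StateStoreContext context,
                                 final TopicPartition changelogPartition) {
        return committedOffset(changelogPartition);
    }
}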
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Damien, Thanks for the KIP! Dead-letter queues are something that I think a lot of users would like. I think there are a few points with this KIP that concern me: 1. It looks like you can only define a single, global DLQ for the entire Kafka Streams application? What about applications that would like to define different DLQs for different data flows? This is especially important when dealing with multiple source topics that have different record schemas. 2. Your DLQ payload value can either be the record value that failed, or an error string (such as "error during punctuate"). This is likely to cause problems when users try to process the records from the DLQ, as they can't guarantee the format of every record value will be the same. This is very loosely related to point 1. above. 3. You provide a ProcessorContext to both exception handlers, but state they cannot be used to forward records. In that case, I believe you should use ProcessingContext instead, which statically guarantees that it can't be used to forward records. 4. You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan if KIP-1033 is not adopted, or if KIP-1034 lands before 1033? Regards, Nick On Fri, 12 Apr 2024 at 11:38, Damien Gasparina wrote: > In a general way, if the user does not configure the right ACL, that > would be a security issue, but that's true for any topic. > > This KIP allows users to configure a Dead Letter Queue without writing > custom Java code in Kafka Streams, not at the topic level. > A lot of applications are already implementing this pattern, but the > required code to do it is quite painful and error prone, for example > most apps I have seen created a new KafkaProducer to send records to > their DLQ. > > As it would be disabled by default for backward compatibility, I doubt > it would generate any security concern. > If a user explicitly configures a Deal Letter Queue, it would be up to > him to configure the relevant ACLs to ensure that the right principal > can access it. > It is already the case for all internal, input and output Kafka > Streams topics (e.g. repartition, changelog topics) that also could > contain confidential data, so I do not think we should implement a > different behavior for this one. > > In this KIP, we configured the default DLQ record to have the initial > record key/value as we assume that it is the expected and wanted > behavior for most applications. > If a user does not want to have the key/value in the DLQ record for > any reason, they could still implement exception handlers to build > their own DLQ record. > > Regarding ACL, maybe something smarter could be done in Kafka Streams, > but this is out of scope for this KIP. > > On Fri, 12 Apr 2024 at 11:58, Claude Warren wrote: > > > > My concern is that someone would create a dead letter queue on a > sensitive > > topic and not get the ACL correct from the start. Thus causing potential > > confidential data leak. Is there anything in the proposal that would > > prevent that from happening? If so I did not recognize it as such. > > > > On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina > > wrote: > > > > > Hi Claude, > > > > > > In this KIP, the Dead Letter Queue is materialized by a standard and > > > independant topic, thus normal ACL applies to it like any other topic. > > > This should not introduce any security issues, obviously, the right > > > ACL would need to be provided to write to the DLQ if configured. 
> > > > > > Cheers, > > > Damien > > > > > > On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr > > > wrote: > > > > > > > > I am new to the Kafka codebase so please excuse any ignorance on my > part. > > > > > > > > When a dead letter queue is established is there a process to ensure > that > > > > it at least is defined with the same ACL as the original queue? > Without > > > > such a guarantee at the start it seems that managing dead letter > queues > > > > will be fraught with security issues. > > > > > > > > > > > > On Wed, Apr 10, 2024 at 10:34 AM Damien Gasparina < > d.gaspar...@gmail.com > > > > > > > > wrote: > > > > > > > > > Hi everyone, > > > > > > > > > > To continue on our effort to improve Kafka Streams error handling, > we > > > > > propose a new KIP to add out of the box support for Dead Letter > Queue. > > > > > The goal of this KIP is to provide a default implementation that > > > > > should be suitable for m
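For context on the pattern this KIP is meant to replace: below is a minimal sketch of the hand-rolled approach Damien describes above, where an application spins up its own KafkaProducer to forward failed records to a DLQ topic. The class, topic name and error header are placeholders, not anything proposed by the KIP; it only uses the existing producer/consumer client APIs.

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hand-rolled DLQ forwarding: the "painful and error prone" approach KIP-1034
// aims to make unnecessary.
public class ManualDlqForwarder implements AutoCloseable {
    private final KafkaProducer<byte[], byte[]> producer;
    private final String dlqTopic; // placeholder; one global DLQ topic per app

    public ManualDlqForwarder(final Map<String, Object> producerConfig, final String dlqTopic) {
        this.producer = new KafkaProducer<>(producerConfig);
        this.dlqTopic = dlqTopic;
    }

    // Forward the failed record as-is, keeping its key, value and headers.
    public void forward(final ConsumerRecord<byte[], byte[]> failed, final Exception cause) {
        final ProducerRecord<byte[], byte[]> dlqRecord =
            new ProducerRecord<>(dlqTopic, null, failed.key(), failed.value(), failed.headers());
        // Attach minimal error context as an extra header (header name is arbitrary).
        dlqRecord.headers().add("dlq.error.message",
            String.valueOf(cause.getMessage()).getBytes(StandardCharsets.UTF_8));
        producer.send(dlqRecord);
    }

    @Override
    public void close() {
        producer.close();
    }
}

Every application ends up re-implementing (and operating) something like this, which is the duplication the built-in handlers in this KIP would remove.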
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Oh, and one more thing: 5. Whenever you take a record out of the stream, and then potentially re-introduce it at a later date, you introduce the potential for record ordering issues. For example, that record could have been destined for a Window that has been closed by the time it's re-processed. I'd like to see a section that considers these consequences, and perhaps make those risks clear to users. For the record, this is exactly what sunk KIP-990, which was an alternative approach to error handling that introduced the same issues. Cheers, Nick On Fri, 12 Apr 2024 at 11:54, Nick Telford wrote: > Hi Damien, > > Thanks for the KIP! Dead-letter queues are something that I think a lot of > users would like. > > I think there are a few points with this KIP that concern me: > > 1. > It looks like you can only define a single, global DLQ for the entire > Kafka Streams application? What about applications that would like to > define different DLQs for different data flows? This is especially > important when dealing with multiple source topics that have different > record schemas. > > 2. > Your DLQ payload value can either be the record value that failed, or an > error string (such as "error during punctuate"). This is likely to cause > problems when users try to process the records from the DLQ, as they can't > guarantee the format of every record value will be the same. This is very > loosely related to point 1. above. > > 3. > You provide a ProcessorContext to both exception handlers, but state they > cannot be used to forward records. In that case, I believe you should use > ProcessingContext instead, which statically guarantees that it can't be > used to forward records. > > 4. > You mention the KIP-1033 ProcessingExceptionHandler, but what's the plan > if KIP-1033 is not adopted, or if KIP-1034 lands before 1033? > > Regards, > > Nick > > On Fri, 12 Apr 2024 at 11:38, Damien Gasparina > wrote: > >> In a general way, if the user does not configure the right ACL, that >> would be a security issue, but that's true for any topic. >> >> This KIP allows users to configure a Dead Letter Queue without writing >> custom Java code in Kafka Streams, not at the topic level. >> A lot of applications are already implementing this pattern, but the >> required code to do it is quite painful and error prone, for example >> most apps I have seen created a new KafkaProducer to send records to >> their DLQ. >> >> As it would be disabled by default for backward compatibility, I doubt >> it would generate any security concern. >> If a user explicitly configures a Deal Letter Queue, it would be up to >> him to configure the relevant ACLs to ensure that the right principal >> can access it. >> It is already the case for all internal, input and output Kafka >> Streams topics (e.g. repartition, changelog topics) that also could >> contain confidential data, so I do not think we should implement a >> different behavior for this one. >> >> In this KIP, we configured the default DLQ record to have the initial >> record key/value as we assume that it is the expected and wanted >> behavior for most applications. >> If a user does not want to have the key/value in the DLQ record for >> any reason, they could still implement exception handlers to build >> their own DLQ record. >> >> Regarding ACL, maybe something smarter could be done in Kafka Streams, >> but this is out of scope for this KIP. 
>> >> On Fri, 12 Apr 2024 at 11:58, Claude Warren wrote: >> > >> > My concern is that someone would create a dead letter queue on a >> sensitive >> > topic and not get the ACL correct from the start. Thus causing >> potential >> > confidential data leak. Is there anything in the proposal that would >> > prevent that from happening? If so I did not recognize it as such. >> > >> > On Fri, Apr 12, 2024 at 9:45 AM Damien Gasparina > > >> > wrote: >> > >> > > Hi Claude, >> > > >> > > In this KIP, the Dead Letter Queue is materialized by a standard and >> > > independant topic, thus normal ACL applies to it like any other topic. >> > > This should not introduce any security issues, obviously, the right >> > > ACL would need to be provided to write to the DLQ if configured. >> > > >> > > Cheers, >> > > Damien >> > > >> > > On Fri, 12 Apr 2024 at 08:59, Claude Warren, Jr >> > > wrote: >> > > > >> > > > I am new to the Kafka codebase
Re: [DISCUSS] KIP-1034: Dead letter queue in Kafka Streams
Hi Damien and Sebastien, 1. I think you can just add a `String topic` argument to the existing `withDeadLetterQueueRecord(ProducerRecord deadLetterQueueRecord)` method, and then the implementation of the exception handler could choose the topic to send records to using whatever logic the user desires. You could perhaps provide a built-in implementation that leverages your new config to send all records to an untyped DLQ topic? 1a. BTW you have a typo: in your DeserializationExceptionHandler, the type of your `deadLetterQueueRecord` argument is `ProducerRecord`, when it should probably be `ConsumerRecord`. 2. Agreed. I think it's a good idea to provide an implementation that sends to a single DLQ by default, but it's important to enable users to customize this with their own exception handlers. 2a. I'm not convinced that "errors" (e.g. failed punctuate) should be sent to a DLQ topic like it's a bad record. To me, a DLQ should only contain records that failed to process. I'm not even sure how a user would re-process/action one of these other errors; it seems like the purview of error logging to me? 4. My point here was that I think it would be useful for the KIP to contain an explanation of the behavior both with KIP-1033 and without it. i.e. clarify if/how records that throw an exception in a processor are handled. At the moment, I'm assuming that without KIP-1033, processing exceptions would not cause records to be sent to the DLQ, but with KIP-1033, they would. If this assumption is correct, I think it should be made explicit in the KIP. 5. Understood. You may want to make this explicit in the documentation for users, so they understand the consequences of re-processing data sent to their DLQ. The main reason I raised this point is it's something that's tripped me up in numerous KIPs that that committers frequently remind me of; so I wanted to get ahead of it for once! :D And one new point: 6. The DLQ record schema appears to discard all custom headers set on the source record. Is there a way these can be included? In particular, I'm concerned with "schema pointer" headers (like those set by Schema Registry), that may need to be propagated, especially if the records are fed back into the source topics for re-processing by the user. Regards, Nick On Fri, 12 Apr 2024 at 13:20, Damien Gasparina wrote: > Hi Nick, > > Thanks a lot for your review and your useful comments! > > 1. It is a good point, as you mentioned, I think it would make sense > in some use cases to have potentially multiple DLQ topics, so we > should provide an API to let users do it. > Thinking out-loud here, maybe it is a better approach to create a new > Record class containing the topic name, e.g. DeadLetterQueueRecord and > changing the signature to > withDeadLetterQueueRecords(Iteratable > deadLetterQueueRecords) instead of > withDeadLetterQueueRecord(ProducerRecord > deadLetterQueueRecord). What do you think? DeadLetterQueueRecord would > be something like "class DeadLetterQueueRecord extends > org.apache.kafka.streams.processor.api;.ProducerRecords { String > topic; /* + getter/setter + */ } " > > 2. I think the root question here is: should we have one DLQ topic or > multiple DLQ topics by default. This question highly depends on the > context, but implementing a default implementation to handle multiple > DLQ topics would be opinionated, e.g. how to manage errors in a > punctuate? 
> I think it makes sense to have the default implementation writing all > faulty records to a single DLQ, that's at least the approach I used in > past applications: one DLQ per Kafka Streams application. Of course > the message format could change in the DLQ e.g. due to the source > topic, but those DLQ records will be very likely troubleshooted, and > maybe replay, manually anyway. > If a user needs to have multiple DLQ topics or want to enforce a > specific schema, it's still possible, but they would need to implement > custom Exception Handlers. > Coming back to 1. I do agree that it would make sense to have the user > set the DLQ topic name in the handlers for more flexibility. > > 3. Good point, sorry it was a typo, the ProcessingContext makes much > more sense here indeed. > > 4. I do assume that we could implement KIP-1033 (Processing exception > handler) independently from KIP-1034. I do hope that KIP-1033 would be > adopted and implemented before KIP-1034, but if that's not the case, > we could implement KIP-1034 indepantly and update KIP-1033 to include > the DLQ record afterward (in the same KIP or in a new one if not > possible). > > 5. I think we should be clear that this KIP only covers the DLQ record > produced. > Everything related to replay messages or recovery plan should be > considered
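As a strawman for the DeadLetterQueueRecord idea Damien floats above, here is one possible shape, written with composition rather than inheritance and using the existing Record class (which already carries key, value, timestamp and headers, so the "schema pointer" headers from point 6 would travel with it). Names and structure are illustrative only; the KIP may settle on something different.

import org.apache.kafka.streams.processor.api.Record;

// Sketch only: pairs a failed record with the DLQ topic it should be written to.
public class DeadLetterQueueRecord<K, V> {
    private final String topic;        // destination DLQ topic, chosen by the handler
    private final Record<K, V> record; // failed record, including its headers

    public DeadLetterQueueRecord(final String topic, final Record<K, V> record) {
        this.topic = topic;
        this.record = record;
    }

    public String topic() {
        return topic;
    }

    public Record<K, V> record() {
        return record;
    }
}

An exception handler could then return an Iterable of these, along the lines of the withDeadLetterQueueRecords(...) signature sketched above, with each element free to target a different DLQ topic.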
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
Hi Sophie, Interesting idea! Although what would that mean for the StateStore interface? Obviously we can't require that the constructor take the TaskId. Is it enough to add the parameter to the StoreSupplier? Would doing this be in-scope for this KIP, or are we over-complicating it? Nick On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman wrote: > Somewhat minor point overall, but it actually drives me crazy that you > can't get access to the taskId of a StateStore until #init is called. This > has caused me a huge headache personally (since the same is true for > processors and I was trying to do something that's probably too hacky to > actually complain about here lol) > > Can we just change the StateStoreSupplier to receive and pass along the > taskId when creating a new store? Presumably by adding a new version of the > #get method that takes in a taskId parameter? We can have it default to > invoking the old one for compatibility reasons and it should be completely > safe to tack on. > > Would also prefer the same for a ProcessorSupplier, but that's definitely > outside the scope of this KIP > > On Fri, Apr 12, 2024 at 3:31 AM Nick Telford > wrote: > > > On further thought, it's clear that this can't work for one simple > reason: > > StateStores don't know their associated TaskId (and hence, their > > StateDirectory) until the init() call. Therefore, committedOffset() can't > > be called before init(), unless we also added a StateStoreContext > argument > > to committedOffset(), which I think might be trying to shoehorn too much > > into committedOffset(). > > > > I still don't like the idea of the Streams engine maintaining the cache > of > > changelog offsets independently of stores, mostly because of the > > maintenance burden of the code duplication, but it looks like we'll have > to > > live with it. > > > > Unless you have any better ideas? > > > > Regards, > > Nick > > > > On Wed, 10 Apr 2024 at 14:12, Nick Telford > wrote: > > > > > Hi Bruno, > > > > > > Immediately after I sent my response, I looked at the codebase and came > > to > > > the same conclusion. If it's possible at all, it will need to be done > by > > > creating temporary StateManagers and StateStores during rebalance. I > > think > > > it is possible, and probably not too expensive, but the devil will be > in > > > the detail. > > > > > > I'll try to find some time to explore the idea to see if it's possible > > and > > > report back, because we'll need to determine this before we can vote on > > the > > > KIP. > > > > > > Regards, > > > Nick > > > > > > On Wed, 10 Apr 2024 at 11:36, Bruno Cadonna > wrote: > > > > > >> Hi Nick, > > >> > > >> Thanks for reacting on my comments so quickly! > > >> > > >> > > >> 2. > > >> Some thoughts on your proposal. > > >> State managers (and state stores) are parts of tasks. If the task is > not > > >> assigned locally, we do not create those tasks. To get the offsets > with > > >> your approach, we would need to either create kind of inactive tasks > > >> besides active and standby tasks or store and manage state managers of > > >> non-assigned tasks differently than the state managers of assigned > > >> tasks. Additionally, the cleanup thread that removes unassigned task > > >> directories needs to concurrently delete those inactive tasks or > > >> task-less state managers of unassigned tasks. This seems all quite > messy > > >> to me. 
> > >> Could we create those state managers (or state stores) for locally > > >> existing but unassigned tasks on demand when > > >> TaskManager#getTaskOffsetSums() is executed? Or have a different > > >> encapsulation for the unused task directories? > > >> > > >> > > >> Best, > > >> Bruno > > >> > > >> > > >> > > >> On 4/10/24 11:31 AM, Nick Telford wrote: > > >> > Hi Bruno, > > >> > > > >> > Thanks for the review! > > >> > > > >> > 1, 4, 5. > > >> > Done > > >> > > > >> > 3. > > >> > You're right. I've removed the offending paragraph. I had originally > > >> > adapted this from the guarantees outlined in KIP-8
Re: [DISCUSS] KIP-1035: StateStore managed changelog offsets
That does make sense. The one thing I can't figure out is how per-Task StateStore instances are constructed. It looks like we construct one StateStore instance for the whole Topology (in InternalTopologyBuilder), and pass that into ProcessorStateManager (via StateManagerUtil) for each Task, which then initializes it. This can't be the case though, otherwise multiple partitions of the same sub-topology (aka Tasks) would share the same StateStore instance, which they don't. What am I missing? On Tue, 16 Apr 2024 at 16:22, Sophie Blee-Goldman wrote: > I don't think we need to *require* a constructor accept the TaskId, but we > would definitely make sure that the RocksDB state store changes its > constructor to one that accepts the TaskID (which we can do without > deprecation since its an internal API), and custom state stores can just > decide for themselves whether they want to opt-in/use the TaskId param > or not. I mean custom state stores would have to opt-in anyways by > implementing the new StoreSupplier#get(TaskId) API and the only > reason to do that would be to have created a constructor that accepts > a TaskId > > Just to be super clear about the proposal, this is what I had in mind. > It's actually fairly simple and wouldn't add much to the scope of the > KIP (I think -- if it turns out to be more complicated than I'm assuming, > we should definitely do whatever has the smallest LOE to get this done > > Anyways, the (only) public API changes would be to add this new > method to the StoreSupplier API: > > default T get(final TaskId taskId) { > return get(); > } > > We can decide whether or not to deprecate the old #get but it's not > really necessary and might cause a lot of turmoil, so I'd personally > say we just leave both APIs in place. > > And that's it for public API changes! Internally, we would just adapt > each of the rocksdb StoreSupplier classes to implement this new > API. So for example with the RocksDBKeyValueBytesStoreSupplier, > we just add > > @Override > public KeyValueStore get(final TaskId taskId) { > return returnTimestampedStore ? > new RocksDBTimestampedStore(name, metricsScope(), taskId) : > new RocksDBStore(name, metricsScope(), taskId); > } > > And of course add the TaskId parameter to each of the actual > state store constructors returned here. > > Does that make sense? It's entirely possible I'm missing something > important here, but I think this would be a pretty small addition that > would solve the problem you mentioned earlier while also being > useful to anyone who uses custom state stores. > > On Mon, Apr 15, 2024 at 10:21 AM Nick Telford > wrote: > > > Hi Sophie, > > > > Interesting idea! Although what would that mean for the StateStore > > interface? Obviously we can't require that the constructor take the > TaskId. > > Is it enough to add the parameter to the StoreSupplier? > > > > Would doing this be in-scope for this KIP, or are we over-complicating > it? > > > > Nick > > > > On Fri, 12 Apr 2024 at 21:30, Sophie Blee-Goldman > > > wrote: > > > > > Somewhat minor point overall, but it actually drives me crazy that you > > > can't get access to the taskId of a StateStore until #init is called. > > This > > > has caused me a huge headache personally (since the same is true for > > > processors and I was trying to do something that's probably too hacky > to > > > actually complain about here lol) > > > > > > Can we just change the StateStoreSupplier to receive and pass along the > > > taskId when creating a new store? 
Presumably by adding a new version of > > the > > > #get method that takes in a taskId parameter? We can have it default to > > > invoking the old one for compatibility reasons and it should be > > completely > > > safe to tack on. > > > > > > Would also prefer the same for a ProcessorSupplier, but that's > definitely > > > outside the scope of this KIP > > > > > > On Fri, Apr 12, 2024 at 3:31 AM Nick Telford > > > wrote: > > > > > > > On further thought, it's clear that this can't work for one simple > > > reason: > > > > StateStores don't know their associated TaskId (and hence, their > > > > StateDirectory) until the init() call. Therefore, committedOffset() > > can't > > > > be called before init(), unless we also added a StateStoreContext > > > argument > > > > to committedOffset(), which I think might be trying to
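To illustrate how a custom store would opt in to the StoreSupplier#get(TaskId) default sketched above: the supplier below compiles against today's API (the extra method simply isn't an override yet) and just delegates to an in-memory store; a real task-aware store would pass the TaskId into its own constructor, as described for RocksDBStore. Everything named "MyTaskAware..." is a made-up example.

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreSupplier;
import org.apache.kafka.streams.state.Stores;

public class MyTaskAwareStoreSupplier implements StoreSupplier<KeyValueStore<Bytes, byte[]>> {
    private final String name;

    public MyTaskAwareStoreSupplier(final String name) {
        this.name = name;
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public String metricsScope() {
        return "my-task-aware-store";
    }

    // Existing API: still required, used when no TaskId is available.
    @Override
    public KeyValueStore<Bytes, byte[]> get() {
        return Stores.inMemoryKeyValueStore(name).get(); // placeholder store
    }

    // Would become @Override once the proposed default get(TaskId) exists.
    // A real implementation would hand the taskId to its store's constructor.
    public KeyValueStore<Bytes, byte[]> get(final TaskId taskId) {
        System.out.println("creating store " + name + " for task " + taskId);
        return get();
    }
}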
Re: [DISCUSS] KIP-892: Transactional Semantics for StateStores
Hi Walker, Feel free to ask away, either on the mailing list or the Confluent Community Slack, where I hang out :-) The implementation is *mostly* complete, although it needs some polishing. It's worth noting that KIP-1035 is a hard prerequisite for this. Regards, Nick

Re: [DISCUSS] KIP-1071: Streams Rebalance Protocol
Hi Lucas, NT4. Sounds good, although should it take the maximum offsets? Wouldn't it be more correct to take the *most recent* offsets? (i.e. the offsets from the more recently received heartbeat) My thinking is that it might be possible (albeit exceptionally rare) for the on-disk offsets to revert to a previous number, and taking the max would incorrectly assume the older offsets are correct. Regards, Nick On Mon, 19 Aug 2024 at 15:00, Lucas Brutschy wrote: > Hi Nick, > > NT4: As discussed, we will still require locking in the new protocol > to avoid concurrent read/write access on the checkpoint file, at least > as long as KIP-1035 hasn't landed. However, as you correctly pointed > out, the assignor will have to accept offsets for overlapping sets of > dormant tasks. I updated the KIP to make this explicit. If the > corresponding offset information for one task conflicts between > clients (which can happen), the conflict is resolved by taking the > maximum of the offsets. > > Cheers, > Lucas > > On Fri, Aug 16, 2024 at 7:14 PM Guozhang Wang > wrote: > > > > Hello Lucas, > > > > Thanks for the great KIP. I've read it through and it looks good to > > me. As we've discussed, much of my thoughts would be outside the scope > > of this very well scoped and defined KIP, so I will omit them for now. > > > > The only one I had related to this KIP is about topology updating. I > > understand the motivation of the proposal is that basically since each > > time group forming a (new) generation may potentially accept not all > > of the members joining because of the timing of the RPCs, the group's > > topology ID may be not reflecting the "actual" most recent topologies > > if some zombie members holding an old topology form a group generation > > quickly enough, which would effectively mean that zombie members > > actually blocking other real members from getting tasks assigned. On > > the other hand, like you've mentioned already in the doc, requesting > > some sort of ID ordering by pushing the burden on the user's side > > would also be too much for users, increasing the risk of human errors > > in operations. > > > > I'm wondering if instead of trying to be smart programmingly, we just > > let the protocol to act dumbly (details below). The main reasons I had > > in mind are: > > > > 1) Upon topology changes, some tasks may no longer exist in the new > > topology, so still letting them execute on the clients which do not > > yet have the new topology would waste resources. > > > > 2) As we discussed, trying to act smart introduces more complexities > > in the coordinator that tries to balance different assignment goals > > between stickiness, balance, and now topology mis-matches between > > clients. > > > > 3) Scenarios that mismatching topologies be observed within a group > generation: > >a. Zombie / old clients that do not have the new topology, and will > > never have. > >b. During a rolling bounce upgrade, where not-yet-bounced clients > > would not yet have the new topology. > >c. Let's assume we would not ever have scenarios where users want > > to intentionally have a subset of clients within a group running a > > partial / subset of the full sub-topologies, since such cases can well > > be covered by a custom assignor that takes into those considerations > > by never assigning some tasks to some clients etc. That means, the > > only scenarios we would need to consider are a) and b). 
> > > > For b), I think it's actually okay to temporarily block the progress > > of the group until everyone is bounced with the updated topology; as > > for a), originally I thought having one or a few clients blocking the > > whole group would be a big problem, but now that I think more, I felt > > from the operations point of view, just letting the app being blocked > > with a informational log entry to quickly ping-down the zombie clients > > may actually be acceptable. All in all, it makes the code simpler > > programmingly by not trying to abstract away issue scenario a) from > > the users (or operators) but letting them know asap. > > > > -- > > > > Other than that, everything else looks good to me. > > > > > > Guozhang > > > > > > On Fri, Aug 16, 2024 at 7:38 AM Nick Telford > wrote: > > > > > > Hi Lucas, > > > > > > NT4. > > > Given that the new assignment procedure guarantees that a Task has been > > > closed before it is assigned to a different
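Purely to illustrate the two conflict-resolution strategies being weighed here (maximum offset vs. most recently reported offset), a tiny sketch; the types and names are made up and this is not KIP-1071 code.

// Offset reported by one client for a dormant task, plus when its heartbeat arrived.
final class ReportedOffset {
    final long offset;
    final long heartbeatTimeMs;

    ReportedOffset(final long offset, final long heartbeatTimeMs) {
        this.offset = offset;
        this.heartbeatTimeMs = heartbeatTimeMs;
    }
}

final class OffsetConflictResolution {
    // What the KIP currently specifies: take the numerically largest offset.
    static long takeMaximum(final ReportedOffset a, final ReportedOffset b) {
        return Math.max(a.offset, b.offset);
    }

    // The alternative suggested above: trust the most recently received heartbeat,
    // which also copes with the (rare) case of on-disk offsets moving backwards.
    static long takeMostRecent(final ReportedOffset a, final ReportedOffset b) {
        return a.heartbeatTimeMs >= b.heartbeatTimeMs ? a.offset : b.offset;
    }
}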
Re: [DISCUSS] KIP-857: Streaming recursion in Kafka Streams
Hi Sophie, The reason I chose to add a new overload of "to", instead of creating a new method, is simply because I felt that "to" was about sending records "to" somewhere, and that "somewhere" just happens to currently be exclusively topics. By re-using "to", we can send records *to other KStreams*, including a KStream from an earlier point in the current KStreams' pipeline, which would facilitate recursion. Sending records to a completely different KStream would be essentially a merge. However, I'm happy to reduce the scope of this method to focus exclusively on recursion: we'd simply need to add a check in to the method that ensures the target is an ancestor node of the current KStream node. Which brings me to your first query... My argument is simply that a 0-ary method isn't enough to facilitate recursive streaming, because you need to be able to communicate which point in the process graph you want to feed your records back in to. Consider my example from the KIP, but re-written with a 0-ary "recursively" method: updates .join(parents, (count, parent) -> { KeyValue(parent, count) }) .recursively() Where does the join output get fed to? 1. The "updates" (source) node? 2. The "join" node itself? It would probably be most intuitive if it simply caused the last step to be recursive, but that won't always be what you want. Consider if we add some more steps in to the above: updates .map((parent, count) -> KeyValue(parent, count + 1)) // doesn't make sense in this algorithm, but let's pretend it does .join(parents, (count, parent) -> { KeyValue(parent, count) }) .recursively() If "recursively" just feeds records back into the "join", it misses out on potentially important steps in our recursive algorithm. It also gets even worse if the step you're making recursive doesn't contain your terminal condition: foo .filter((key, value) -> value <= 0) // <-- terminal condition .mapValues((value) -> value - 1) .recursively() If "recursively" feeds records back to the "mapValues" stage in our pipeline, and not in to "filter" or "foo", then the terminal condition in "filter" won't be evaluated for any values with a starting value greater than 0, *causing an infinite loop*. There's an argument to be had to always feed the values back to the first ancestor "source node", in the process-graph, but that might not be particularly intuitive, and is likely going to limit some of the recursive algorithms that some may want to implement. For example, in the previous example, there's no guarantee that "foo" is a source node; it could be the result of a "mapValues", for example. Ultimately, the solution here is to make this method take a parameter, explicitly specifying the KStream that records are fed back in to, making the above two examples: updates .map((parent, count) -> KeyValue(parent, count + 1)) .join(parents, (count, parent) -> { KeyValue(parent, count) }) .recursively(updates) and: foo .filter((key, value) -> value <= 0) .mapValues((value) -> value - 1) .recursively(foo) We could *also* support a 0-ary version of the method that defaults to recursively executing the previous node, but I'm worried that users may not fully understand the consequences of this, inadvertently creating infinite loops that are difficult to debug. Finally, I'm not convinced that "recursively" is the best name for the method. Perhaps "recursivelyVia" or "recursivelyTo"? Ideas welcome! 
If we want to prevent this method being "abused" to merge different streams together, it should be trivial to ensure that the provided argument is an ancestor of the current node, by recursively traversing up the process graph. I hope this clarifies your questions. It's clear that the KIP needs more work to better elaborate on these points. I haven't had a chance to revise it yet, due to more pressing issues with EOS stability that I've been looking into. Regards, Nick On Tue, 23 Aug 2022 at 23:50, Sophie Blee-Goldman wrote: > Hey Nick, > > Sounds like an interesting KIP, and I agree the current way of achieving > this in Streams > seems wildly overcomplicated. So I'm definitely +1 on adding a smooth API > that abstracts > away a lot of the complexity and unnecessary topic management. > > That said, I've found much of the discussion so far on the API itself to be > very confusing -- for example, I don't understand this point: > > I actually considered a "recursion" API, something > > like you suggested, however it won't work, because to do the recursion > you
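The ancestor check mentioned above ("recursively traversing up the process graph") could look roughly like the sketch below. The Node interface is a stand-in, not the actual Streams internal graph classes, which are not public API.

import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.Set;

// Stand-in for a node in the logical process graph.
interface Node {
    Collection<Node> parents();
}

final class RecursionChecks {
    // Returns true if 'candidate' is reachable by walking parent links upwards from
    // 'current', i.e. it is an ancestor and a legal target for recursively(candidate).
    static boolean isAncestor(final Node candidate, final Node current) {
        final Set<Node> visited = new HashSet<>();
        final Deque<Node> toVisit = new ArrayDeque<>(current.parents());
        while (!toVisit.isEmpty()) {
            final Node node = toVisit.pop();
            if (!visited.add(node)) {
                continue; // already seen via another path
            }
            if (node.equals(candidate)) {
                return true;
            }
            toVisit.addAll(node.parents());
        }
        return false;
    }
}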
Re: [DISCUSS] KIP-857: Streaming recursion in Kafka Streams
Hi everyone, I've re-written the KIP, with a new design that I think resolves the issues you highlighted, and also simplifies usage. https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams Note: I'm still working out the "automatic repartitioning" in my head, as I don't think it's quite right. It may turn out that the additional overload (with the Produced argument) is not necessary. Thanks for all your feedback so far. Let me know what you think! Regards, Nick On Thu, 25 Aug 2022 at 17:46, Nick Telford wrote: > Hi Sophie, > > The reason I chose to add a new overload of "to", instead of creating a > new method, is simply because I felt that "to" was about sending records > "to" somewhere, and that "somewhere" just happens to currently be > exclusively topics. By re-using "to", we can send records *to other > KStreams*, including a KStream from an earlier point in the current > KStreams' pipeline, which would facilitate recursion. Sending records to a > completely different KStream would be essentially a merge. > > However, I'm happy to reduce the scope of this method to focus exclusively > on recursion: we'd simply need to add a check in to the method that ensures > the target is an ancestor node of the current KStream node. > > Which brings me to your first query... > > My argument is simply that a 0-ary method isn't enough to facilitate > recursive streaming, because you need to be able to communicate which point > in the process graph you want to feed your records back in to. > > Consider my example from the KIP, but re-written with a 0-ary > "recursively" method: > > updates > .join(parents, (count, parent) -> { KeyValue(parent, count) }) > .recursively() > > Where does the join output get fed to? > >1. The "updates" (source) node? >2. The "join" node itself? > > It would probably be most intuitive if it simply caused the last step to > be recursive, but that won't always be what you want. Consider if we add > some more steps in to the above: > > updates > .map((parent, count) -> KeyValue(parent, count + 1)) // doesn't make > sense in this algorithm, but let's pretend it does > .join(parents, (count, parent) -> { KeyValue(parent, count) }) > .recursively() > > If "recursively" just feeds records back into the "join", it misses out on > potentially important steps in our recursive algorithm. It also gets even > worse if the step you're making recursive doesn't contain your terminal > condition: > > foo > .filter((key, value) -> value <= 0) // <-- terminal condition > .mapValues((value) -> value - 1) > .recursively() > > If "recursively" feeds records back to the "mapValues" stage in our > pipeline, and not in to "filter" or "foo", then the terminal condition in > "filter" won't be evaluated for any values with a starting value greater > than 0, *causing an infinite loop*. > > There's an argument to be had to always feed the values back to the first > ancestor "source node", in the process-graph, but that might not be > particularly intuitive, and is likely going to limit some of the recursive > algorithms that some may want to implement. For example, in the previous > example, there's no guarantee that "foo" is a source node; it could be the > result of a "mapValues", for example. 
> > Ultimately, the solution here is to make this method take a parameter, > explicitly specifying the KStream that records are fed back in to, making > the above two examples: > > updates > .map((parent, count) -> KeyValue(parent, count + 1)) > .join(parents, (count, parent) -> { KeyValue(parent, count) }) > .recursively(updates) > > and: > > foo > .filter((key, value) -> value <= 0) > .mapValues((value) -> value - 1) > .recursively(foo) > > We could *also* support a 0-ary version of the method that defaults to > recursively executing the previous node, but I'm worried that users may not > fully understand the consequences of this, inadvertently creating infinite > loops that are difficult to debug. > > Finally, I'm not convinced that "recursively" is the best name for the > method. Perhaps "recursivelyVia" or "recursivelyTo"? Ideas welcome! > > If we want to prevent this method being "abused" to merge different > streams together, it should be trivial to ensure that the provided argument > is
Re: [DISCUSS] KIP-857: Streaming recursion in Kafka Streams
Hi Guozhang, I mentioned this in the "Rejected Alternatives" section. Repartitioning gives us several significant advantages over using an explicit topic and "to": - Repartition topics are automatically created and managed by the Streams runtime; explicit topics have to be created and managed by the user. - Repartitioning topics have no retention criteria and instead purge records once consumed, this prevents data loss. Explicit topics need retention criteria, which have to be set large enough to avoid data loss, often wasting considerable resources. - The "recursively" method requires significantly less code than recursion via an explicit topic, and is significantly easier to understand. Ultimately, I don't think repartitioning inside the unary operator adds much complexity to the implementation. Certainly no more than other DSL operations. Regards, Nick On Tue, 6 Sept 2022 at 17:28, Guozhang Wang wrote: > Hello Nick, > > Thanks for the re-written KIP! I read through it again, and so far have > just one quick question on top of my head regarding repartitioning: it > seems to me that when there's an intermediate topic inside the recursion > step, then using this new API would basically give us the same behavior as > using the existing `to` APIs. Of course, with the new API the user can make > it more explicit that it is supposed to be recursive, but efficiency wise > it provides no further optimizations. Is my understanding correct? If yes, > I'm wondering if it's worthy the complexity to allow repartitioning inside > the unary operator, or should we just restrict the recursion inside a > single sub-topology. > > > Guozhang > > On Tue, Sep 6, 2022 at 9:05 AM Nick Telford > wrote: > > > Hi everyone, > > > > I've re-written the KIP, with a new design that I think resolves the > issues > > you highlighted, and also simplifies usage. > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams > > > > Note: I'm still working out the "automatic repartitioning" in my head, > as I > > don't think it's quite right. It may turn out that the additional > overload > > (with the Produced argument) is not necessary. > > > > Thanks for all your feedback so far. Let me know what you think! > > > > Regards, > > > > Nick > > > > On Thu, 25 Aug 2022 at 17:46, Nick Telford > wrote: > > > > > Hi Sophie, > > > > > > The reason I chose to add a new overload of "to", instead of creating a > > > new method, is simply because I felt that "to" was about sending > records > > > "to" somewhere, and that "somewhere" just happens to currently be > > > exclusively topics. By re-using "to", we can send records *to other > > > KStreams*, including a KStream from an earlier point in the current > > > KStreams' pipeline, which would facilitate recursion. Sending records > to > > a > > > completely different KStream would be essentially a merge. > > > > > > However, I'm happy to reduce the scope of this method to focus > > exclusively > > > on recursion: we'd simply need to add a check in to the method that > > ensures > > > the target is an ancestor node of the current KStream node. > > > > > > Which brings me to your first query... > > > > > > My argument is simply that a 0-ary method isn't enough to facilitate > > > recursive streaming, because you need to be able to communicate which > > point > > > in the process graph you want to feed your records back in to. 
> > > > > > Consider my example from the KIP, but re-written with a 0-ary > > > "recursively" method: > > > > > > updates > > > .join(parents, (count, parent) -> { KeyValue(parent, count) }) > > > .recursively() > > > > > > Where does the join output get fed to? > > > > > >1. The "updates" (source) node? > > >2. The "join" node itself? > > > > > > It would probably be most intuitive if it simply caused the last step > to > > > be recursive, but that won't always be what you want. Consider if we > add > > > some more steps in to the above: > > > > > > updates > > > .map((parent, count) -> KeyValue(parent, count + 1)) // doesn't > make > > > sense in this algorithm, but let's pretend it does > > >
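To make the comparison concrete, here is a simplified sketch of recursion via an explicit, user-managed topic (what is possible today) next to the proposed recursively() form. Serdes and configuration are elided, the topic names are placeholders, and recursively() is the API proposed by this KIP, so it does not exist yet.

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class RecursionComparison {
    static void buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        // Today: recursion via an explicit feedback topic that the user must create,
        // size and manage (retention must be large enough to avoid data loss).
        final KStream<String, Long> input = builder.stream("updates");
        final KStream<String, Long> feedback = builder.stream("recursion-feedback");
        final KStream<String, Long> all = input.merge(feedback);
        final KStream<String, Long> next = all
            .filter((key, value) -> value > 0)   // terminal condition: stop at zero
            .mapValues(value -> value - 1);
        next.to("recursion-feedback");

        // With this KIP (sketch): no explicit topic; Streams would manage the
        // feedback/repartition topic itself.
        //
        //   final KStream<String, Long> stream = builder.<String, Long>stream("updates");
        //   stream.filter((key, value) -> value > 0)
        //         .mapValues(value -> value - 1)
        //         .recursively(stream);
    }
}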
Re: [DISCUSS] KIP-857: Streaming recursion in Kafka Streams
The more I think about this, the more I think that automatic repartitioning is not required in the "recursively" method itself. I've removed references to this from the KIP, which further simplifies everything. I don't see any need to restrict users from repartitioning, either before, after or inside the "recursively" method. I can't see a scenario where the recursion would cause problems with it. Nick On Tue, 6 Sept 2022 at 18:08, Nick Telford wrote: > Hi Guozhang, > > I mentioned this in the "Rejected Alternatives" section. Repartitioning > gives us several significant advantages over using an explicit topic and > "to": > >- Repartition topics are automatically created and managed by the >Streams runtime; explicit topics have to be created and managed by the > user. >- Repartitioning topics have no retention criteria and instead purge >records once consumed, this prevents data loss. Explicit topics need >retention criteria, which have to be set large enough to avoid data loss, >often wasting considerable resources. >- The "recursively" method requires significantly less code than >recursion via an explicit topic, and is significantly easier to understand. > > Ultimately, I don't think repartitioning inside the unary operator adds > much complexity to the implementation. Certainly no more than other DSL > operations. > > Regards, > Nick > > On Tue, 6 Sept 2022 at 17:28, Guozhang Wang wrote: > >> Hello Nick, >> >> Thanks for the re-written KIP! I read through it again, and so far have >> just one quick question on top of my head regarding repartitioning: it >> seems to me that when there's an intermediate topic inside the recursion >> step, then using this new API would basically give us the same behavior as >> using the existing `to` APIs. Of course, with the new API the user can >> make >> it more explicit that it is supposed to be recursive, but efficiency wise >> it provides no further optimizations. Is my understanding correct? If yes, >> I'm wondering if it's worthy the complexity to allow repartitioning inside >> the unary operator, or should we just restrict the recursion inside a >> single sub-topology. >> >> >> Guozhang >> >> On Tue, Sep 6, 2022 at 9:05 AM Nick Telford >> wrote: >> >> > Hi everyone, >> > >> > I've re-written the KIP, with a new design that I think resolves the >> issues >> > you highlighted, and also simplifies usage. >> > >> > >> > >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-857%3A+Streaming+recursion+in+Kafka+Streams >> > >> > Note: I'm still working out the "automatic repartitioning" in my head, >> as I >> > don't think it's quite right. It may turn out that the additional >> overload >> > (with the Produced argument) is not necessary. >> > >> > Thanks for all your feedback so far. Let me know what you think! >> > >> > Regards, >> > >> > Nick >> > >> > On Thu, 25 Aug 2022 at 17:46, Nick Telford >> wrote: >> > >> > > Hi Sophie, >> > > >> > > The reason I chose to add a new overload of "to", instead of creating >> a >> > > new method, is simply because I felt that "to" was about sending >> records >> > > "to" somewhere, and that "somewhere" just happens to currently be >> > > exclusively topics. By re-using "to", we can send records *to other >> > > KStreams*, including a KStream from an earlier point in the current >> > > KStreams' pipeline, which would facilitate recursion. Sending records >> to >> > a >> > > completely different KStream would be essentially a merge. 
>> > > >> > > However, I'm happy to reduce the scope of this method to focus >> > exclusively >> > > on recursion: we'd simply need to add a check in to the method that >> > ensures >> > > the target is an ancestor node of the current KStream node. >> > > >> > > Which brings me to your first query... >> > > >> > > My argument is simply that a 0-ary method isn't enough to facilitate >> > > recursive streaming, because you need to be able to communicate which >> > point >> > > in the process graph you want to feed your records back in to. >> > > >> > > Consider my example from the
Re: [DISCUSS] KIP-869: Improve Streams State Restoration Visibility
Hi Guozhang, KIP looks great, I have one suggestion: in addition to "restore-records-total", it would also be useful to track the number of records *remaining*, that is, the records that have not yet been restored. This is actually the metric I was attempting to implement in the StateRestoreListener that bumped me up against KAFKA-10575 :-) With both a "restore-records-total" and a "restore-remaining-total" (or similar) metric, it's possible to derive useful information like the estimated time remaining for restoration (by dividing the remaining total by the restoration rate). Regards, Nick On Mon, 19 Sept 2022 at 19:57, Guozhang Wang wrote: > Hello Bruno, > > Thanks for your comments! > > 1. Regarding the metrics group name: originally I put > "stream-state-metrics" as it's related to state store restorations, but > after a second thought I think I agree with you that this is quite > confusing and not right. About the metrics groups, right now I have two > ideas: > > a) Still use the metric group name "stream-thread-metrics", but > differentiate with the processing threads on the thread id. The pros is > that we do not introduce a new group, the cons is that users who want to > separate processing from restoration/updating in the future needs to do > that on the thread id labels. > b) Introduce a new group name, for example "stream-state-updater-metrics" > and still have a thread-id label. We would be introducing a new group which > still have a thread-id, which may sound a bit weird (maybe if we do that we > could change the existing "stream-thread-metrics" into > "stream-processor-metrics"). > > Right now I'm leaning towards b) and maybe in the future rename > "thread-metrics" to "processor-metrics", LMK what do you think. > > 2. Regarding the metric names: today we may also pause a standby tasks, and > hence I'm trying to differentiate updating standbys from paused standbys. > Right now I'm thinking we can change "restoring-standby-tasks" to > "updating-standby-tasks", and change all other metrics' "restore" (except > the "restoring-active-tasks") to "state-update", a.k.a > "state-update-ratio", "state-update-records-total", > "updating-standby-tasks" etc. > > 3. Regarding the function name: yeah I think that's a valid concern, I can > change it to "onRestoreSuspended" since "Aborted" may confuse people that > previously called "onBatchRestored" are undone as part of the abortion. > > > Guozhang > > > > On Mon, Sep 19, 2022 at 10:47 AM Bruno Cadonna wrote: > > > Hi Guozhang, > > > > Thanks for the KIP! I think this KIP is a really nice addition to better > > understand what is going on in a Kafka Streams application. > > > > 1. > > The metric names "paused-active-tasks" and "paused-standby-tasks" might > > be a bit confusing since at least active tasks can be paused also > > outside of restoration. > > > > 2. > > Why is the type of the metrics "stream-state-metrics"? I would have > > expected "stream-thread-metrics" as the type. > > > > 3. > > Isn't the value of the metric "restoring-standby-tasks" simply the > > number of standby tasks since standby tasks are basically always > > updating (aka restoring)? > > > > 4. > > "idle-ratio", "restore-ratio", and "checkpoint-ratio" seem metrics > > tailored to the upcoming state updater. They do not make much sense with > > a stream thread. Would it be better to introduce a new metrics level > > specifically for the state updater? > > > > 5. 
> > Personally, I do not like to use the word "restoration" together with > > standbys since restoration somehow implies that there is an offset for > > which the active task is considered restored and active processing can > > start. In other words, restoration is finite. Standby tasks rather > > update continuously their states. They can be up-to-date or lagging. I > > see that you could say "restored" instead of "up-to-date" and "not > > restored" instead of "lagging", but IMO it does not describe well the > > situation. That is a rather minor point. I just wanted to mention it. > > > > 6. > > The name "onRestorePaused()" might be confusing since in Kafka Streams > > users can also pause tasks. What about "onRestoreAborted()" or > > "onRestoreSuspended"? > > > > Best, > > Bruno > > > > > > On 16.09.22 19:33, Guozhang Wang wrote: > > > Hello everyone, > > > > > > I'd like to start a discussion for the following KIP, aiming to improve > > > Kafka Stream's restoration visibility via new metrics and callback > > methods: > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-869%3A+Improve+Streams+State+Restoration+Visibility > > > > > > > > > Thanks! > > > -- Guozhang > > > > > > > > -- > -- Guozhang >
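For anyone wanting this before the KIP lands, the "records remaining" number can be approximated client-side with a StateRestoreListener, roughly as below. This is a sketch: it ignores the interrupted-restoration edge cases that KAFKA-10575 is about, and the metrics proposed in the KIP are obviously preferable once available.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

public class RemainingRecordsListener implements StateRestoreListener {
    private final Map<TopicPartition, Long> endOffsets = new ConcurrentHashMap<>();
    private final Map<TopicPartition, Long> remaining = new ConcurrentHashMap<>();

    @Override
    public void onRestoreStart(final TopicPartition partition, final String storeName,
                               final long startingOffset, final long endingOffset) {
        endOffsets.put(partition, endingOffset);
        remaining.put(partition, endingOffset - startingOffset);
    }

    @Override
    public void onBatchRestored(final TopicPartition partition, final String storeName,
                                final long batchEndOffset, final long numRestored) {
        final long end = endOffsets.getOrDefault(partition, batchEndOffset);
        remaining.put(partition, Math.max(0L, end - batchEndOffset));
    }

    @Override
    public void onRestoreEnd(final TopicPartition partition, final String storeName,
                             final long totalRestored) {
        remaining.put(partition, 0L);
    }

    // Combined with a restore rate (records/second), the remaining count gives the
    // estimated time left: remaining / rate.
    public double estimatedSecondsRemaining(final TopicPartition partition, final double recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            return Double.POSITIVE_INFINITY;
        }
        return remaining.getOrDefault(partition, 0L) / recordsPerSecond;
    }
}

It would be registered with KafkaStreams#setGlobalStateRestoreListener(...) before calling start().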
Re: [DISCUSS] KIP-844: Transactional State Stores
Hi everyone, I realise this has already been voted on and accepted, but it occurred to me today that the KIP doesn't define the migration/upgrade path for existing non-transactional StateStores that *become* transactional, i.e. by adding the transactional boolean to the StateStore constructor. What would be the result, when such a change is made to a Topology, without explicitly wiping the application state? a) An error. b) Local state is wiped. c) Existing RocksDB database is used as committed writes and new RocksDB database is created for uncommitted writes. d) Something else? Regards, Nick On Thu, 1 Sept 2022 at 12:16, Alexander Sorokoumov wrote: > Hey Guozhang, > > Sounds good. I annotated all added StateStore methods (commit, recover, > transactional) with @Evolving. > > Best, > Alex > > > > On Wed, Aug 31, 2022 at 7:32 PM Guozhang Wang wrote: > > > Hello Alex, > > > > Thanks for the detailed replies, I think that makes sense, and in the > long > > run we would need some public indicators from StateStore to determine if > > checkpoints can really be used to indicate clean snapshots. > > > > As for the @Evolving label, I think we can still keep it but for a > > different reason, since as we add more state management functionalities > in > > the near future we may need to revisit the public APIs again and hence > > keeping it as @Evolving would allow us to modify if necessary, in an > easier > > path than deprecate -> delete after several minor releases. > > > > Besides that, I have no further comments about the KIP. > > > > > > Guozhang > > > > On Fri, Aug 26, 2022 at 1:51 AM Alexander Sorokoumov > > wrote: > > > > > Hey Guozhang, > > > > > > > > > I think that we will have to keep StateStore#transactional() because > > > post-commit checkpointing of non-txn state stores will break the > > guarantees > > > we want in ProcessorStateManager#initializeStoreOffsetsFromCheckpoint > for > > > correct recovery. Let's consider checkpoint-recovery behavior under EOS > > > that we want to support: > > > > > > 1. Non-txn state stores should checkpoint on graceful shutdown and > > restore > > > from that checkpoint. > > > > > > 2. Non-txn state stores should delete local data during recovery after > a > > > crash failure. > > > > > > 3. Txn state stores should checkpoint on commit and on graceful > shutdown. > > > These stores should roll back uncommitted changes instead of deleting > all > > > local data. > > > > > > > > > #1 and #2 are already supported; this proposal adds #3. Essentially, we > > > have two parties at play here - the post-commit checkpointing in > > > StreamTask#postCommit and recovery in ProcessorStateManager# > > > initializeStoreOffsetsFromCheckpoint. Together, these methods must > allow > > > all three workflows and prevent invalid behavior, e.g., non-txn stores > > > should not checkpoint post-commit to avoid keeping uncommitted data on > > > recovery. > > > > > > > > > In the current state of the prototype, we checkpoint only txn state > > stores > > > post-commit under EOS using StateStore#transactional(). If we remove > > > StateStore#transactional() and always checkpoint post-commit, > > > ProcessorStateManager#initializeStoreOffsetsFromCheckpoint will have to > > > determine whether to delete local data. Non-txn implementation of > > > StateStore#recover can't detect if it has uncommitted writes. Since its > > > default implementation must always return either true or false, > signaling > > > whether it is restored into a valid committed-only state. 
If > > > StateStore#recover always returns true, we preserve uncommitted writes > > and > > > violate correctness. Otherwise, ProcessorStateManager# > > > initializeStoreOffsetsFromCheckpoint would always delete local data > even > > > after > > > a graceful shutdown. > > > > > > > > > With StateStore#transactional we avoid checkpointing non-txn state > stores > > > and prevent that problem during recovery. > > > > > > > > > Best, > > > > > > Alex > > > > > > On Fri, Aug 19, 2022 at 1:05 AM Guozhang Wang > > wrote: > > > > > > > Hello Alex, > > > > > > > > Thanks for the replies! > > > > > > > > > As long as we allow custom user implementations of that int
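To summarise behaviours #1-#3 described above in one place, here is a compressed, purely illustrative decision function; the enum and class names are made up and this is not the actual ProcessorStateManager logic.

// Sketch only: restates the checkpoint/recovery behaviours discussed above.
enum RecoveryAction { REUSE_LOCAL_STATE, ROLL_BACK_UNCOMMITTED, WIPE_AND_RESTORE }

final class EosRecoveryDecision {
    static RecoveryAction decide(final boolean storeIsTransactional, final boolean cleanShutdown) {
        if (storeIsTransactional) {
            // #3: txn stores checkpoint on commit, so uncommitted writes are rolled
            // back (via StateStore#recover) instead of wiping local state.
            return RecoveryAction.ROLL_BACK_UNCOMMITTED;
        }
        if (cleanShutdown) {
            // #1: non-txn store with a checkpoint written on graceful shutdown.
            return RecoveryAction.REUSE_LOCAL_STATE;
        }
        // #2: non-txn store after a crash failure -- local state may contain
        // uncommitted writes, so it is deleted and restored from the changelog.
        return RecoveryAction.WIPE_AND_RESTORE;
    }
}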