Hi Luke,
Thanks for your comments.

1) I expect that fetch-from-follower (FFF) will not be supported for share
groups. If you think about it, FFF gives the freedom to fetch records from
a nearby broker, but it does not also give the ability to commit offsets
to a nearby broker. For a share group, the share-partition leader is
intimately involved in which records are fetched and acknowledged.
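For contrast, this is how a classic consumer opts into FFF today via
KIP-392. This is only a sketch (the broker address and rack name are
invented), and it is exactly the knob that a share group could not honour:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    // The broker matches this against broker.rack using its configured
    // replica.selector.class to route the fetch to a nearby follower.
    props.put(ConsumerConfig.CLIENT_RACK_CONFIG, "rack-a");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

The share-partition leader has to see every fetch so that it can track the
in-flight record state, so routing fetches to a follower would not work.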
2) I have two big areas to fill in with the KIP: the RPCs and the storage.
I'm working on the RPCs now. The storage will be next.

3) Yes, we need new metrics. I'll put a placeholder in the next update to
the KIP. I think it will be easy to enumerate them once the proposal has
stabilised.

Thanks,
Andrew
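P.S. Satish, on your 103 and resetting the SPSO: the admin API is the
intended route. I haven't written up the RPCs yet, so the shape below is
only my sketch of AdminClient.alterShareGroupOffsets, modelled on
alterConsumerGroupOffsets, and the group, topic and offset are invented
for illustration (error handling elided):

    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
    try (Admin admin = Admin.create(config)) {
        // Hypothetical signature: wind the SPSO of partition 0 back to offset 0.
        admin.alterShareGroupOffsets("my-share-group",
            Map.of(new TopicPartition("payments", 0), new OffsetAndMetadata(0L)))
            .all().get();
    }

kafka-share-groups.sh will offer the same operation from the command line.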
> On 29 May 2023, at 10:04, Luke Chen <show...@gmail.com> wrote:
>
> Hi Andrew,
>
> Thanks for the KIP.
> Some high level questions:
> 1. How do we handle the "fetch from follower" case?
> It looks like in the current design, each call needs to go to the
> "shared partition leader", where the shared state is stored. Is my
> understanding correct?
>
> 2. Where is the state info stored?
> It looks like we only store it in the memory of the "shared partition
> leader". What happens after the leader crashes and leadership moves to
> another ISR replica?
>
> 3. New metrics needed
> Since we're introducing a new kind of consumer group, I think there
> should be new metrics added for client and broker to monitor them.
>
> Thank you.
> Luke
>
> On Mon, May 29, 2023 at 1:01 PM Satish Duggana <satish.dugg...@gmail.com>
> wrote:
>
>> Minor correction on 103: latest instead of earliest for the SPSO
>> default value.
>>
>> 103 It talks about SPSO values, latest being the default, and the user
>> can reset it to a target offset timestamp. What is the maximum value
>> for the SPEO? It would be good to clarify what the maximum values for
>> the SPSO and SPEO could be. Could they be the HW, the LogStableOffset
>> or some other value?
>>
>> Thanks,
>> Satish.
>>
>> On Mon, 29 May 2023 at 10:06, Satish Duggana <satish.dugg...@gmail.com>
>> wrote:
>>>
>>> Hi Andrew,
>>> Thanks for the nice KIP on a very interesting feature about
>>> introducing some of the traditional MessageQueue semantics to Kafka.
>>> It is good to see that we are extending the existing consumer group
>>> concepts and related mechanisms for shared subscriptions instead of
>>> bringing in any large architectural/protocol changes.
>>>
>>> This KIP talks about introducing a durable subscription feature for
>>> topics with multiple consumers consuming messages in parallel from a
>>> single topic partition.
>>>
>>> 101 Are you planning to extend this functionality for queueing
>>> semantics like the JMS point-to-point style in future?
>>>
>>> 102 When a message is rejected by the target consumer, how do users
>>> know which records/offsets were dropped as failed records, whether
>>> due to a rejection ack or due to timeouts etc., before DLQs are
>>> introduced?
>>>
>>> 103 It talks about SPSO values, earliest being the default, and the
>>> user can reset it to a target offset timestamp. What is the maximum
>>> value for the SPEO? It would be good to clarify what the maximum
>>> values for the SPSO and SPEO could be. Could they be the HW, the
>>> LogStableOffset or some other value?
>>>
>>> 104 The KIP mentions "share.delivery.count.limit" as the maximum
>>> number of delivery attempts for a record delivered to a share group.
>>> But the actual delivery count may exceed this number: the leader may
>>> fail to update the delivery count, or the leader or consumer may fail
>>> and further delivery attempts may be made later. It may really be the
>>> minimum number of delivery attempts rather than the maximum.
>>>
>>> Thanks,
>>> Satish.
>>>
>>>
>>> On Wed, 24 May 2023 at 21:26, Andrew Schofield
>>> <andrew_schofield_j...@outlook.com> wrote:
>>>>
>>>> Hi Stanislav,
>>>> Thanks for your email. You bring up some interesting points.
>>>>
>>>> 1) Tiered storage
>>>> I think the situation here for fetching historical data is
>>>> equivalent to what happens if a user resets the committed offset for
>>>> a consumer group back to an earlier point in time. So, I will
>>>> mention this in the next update to the KIP document, but I think
>>>> there's nothing especially different here.
>>>>
>>>> 2) SSO initialized to the latest offset
>>>> The KIP does mention that it is possible for an administrator to set
>>>> the SSO using either AdminClient.alterShareGroupOffsets or
>>>> kafka-share-groups.sh. It is entirely intentional that there is no
>>>> KafkaConsumer config for initializing the SSO. I know that's how it
>>>> can be done for consumer groups, but it suffers from the situation
>>>> where different consumers have different opinions about the initial
>>>> value (earliest vs latest) and then the first one in wins. Also,
>>>> KIP-842 digs into some problems with how consumer group offset reset
>>>> works (https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms)
>>>> so I've tried to sidestep those problems too.
>>>>
>>>> Another possibility is to follow KIP-848, which proposes that
>>>> AdminClient.incrementalAlterConfigs is enhanced to support a new
>>>> resource type called GROUP. Supporting a dynamic group config in
>>>> this manner would give a single point of control. There is a sketch
>>>> of this at the end of this note.
>>>>
>>>> 3) Durable storage
>>>> The KIP does not yet describe how durable storage works. I have a
>>>> few ideas that I want to flesh out before updating the KIP.
>>>>
>>>> I will rule out using a compacted topic though. The problem is that
>>>> each record on a compacted topic is a key:value pair, and it's not
>>>> obvious what to use as the key. If it's the share group name, the
>>>> entire in-flight record state needs to be recorded in one hit, which
>>>> is extremely inefficient.
>>>>
>>>> 4) Batch acknowledgement
>>>> You are correct that compression makes delivery and acknowledgement
>>>> of individual messages within a compressed batch more complicated.
>>>> Again, I will defer a proper answer here until I've dug more deeply.
>>>>
>>>> 5) Member management
>>>> Member management will be similar to consumer groups. I anticipate
>>>> that it will build entirely on the new consumer group protocol in
>>>> KIP-848. There seems little benefit in supporting the legacy
>>>> protocol when this KIP is targeting versions of Kafka which will all
>>>> have KIP-848.
>>>>
>>>> The two cases you mention:
>>>> i) If a bad consumer doesn't even heartbeat, it will be ejected from
>>>> the group. This does not involve a rebalance.
>>>> ii) If a bad consumer heartbeats but always times out message
>>>> processing, it will slow the advancement of the SSO/SEO. There is
>>>> the possibility that such a consumer would invalidate completely
>>>> valid messages. In order to do this, it would have to acquire the
>>>> same set of messages repeatedly, to the exclusion of other
>>>> consumers, and thus bump the delivery count to the limit. This is
>>>> unlikely but not impossible.
>>>>
>>>> 6) Processing semantics
>>>> Delivery is at-least-once.
>>>>
>>>> 7) Acronyms
>>>> I hadn't thought about the implications of "Kafka SEO". I think I'll
>>>> change it to "Share Partition Start Offset" (SPSO) and "Share
>>>> Partition End Offset" (SPEO).
>>>>
>>>> There is a lot of work ahead for this KIP. I intend to work on the
>>>> protocol changes next.
>>>>
>>>> Thanks for getting involved in the discussion.
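>>>> One more thought on 2): to make the KIP-848 idea concrete, a dynamic
>>>> group config could give that single point of control. This is only a
>>>> sketch: the GROUP resource type is still just a KIP-848 proposal,
>>>> and the config name below is invented for illustration.
>>>>
>>>>     import java.util.List;
>>>>     import java.util.Map;
>>>>     import java.util.Properties;
>>>>     import org.apache.kafka.clients.admin.Admin;
>>>>     import org.apache.kafka.clients.admin.AdminClientConfig;
>>>>     import org.apache.kafka.clients.admin.AlterConfigOp;
>>>>     import org.apache.kafka.clients.admin.ConfigEntry;
>>>>     import org.apache.kafka.common.config.ConfigResource;
>>>>
>>>>     Properties config = new Properties();
>>>>     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
>>>>     try (Admin admin = Admin.create(config)) {
>>>>         // GROUP is the resource type KIP-848 proposes; it does not exist yet.
>>>>         ConfigResource group =
>>>>             new ConfigResource(ConfigResource.Type.GROUP, "my-share-group");
>>>>         // "share.auto.offset.reset" is an invented config name for illustration.
>>>>         AlterConfigOp op = new AlterConfigOp(
>>>>             new ConfigEntry("share.auto.offset.reset", "earliest"),
>>>>             AlterConfigOp.OpType.SET);
>>>>         admin.incrementalAlterConfigs(Map.of(group, List.of(op))).all().get();
>>>>     }
>>>>
>>>> Set once by an administrator, it would apply to every consumer in
>>>> the group, which avoids the first-one-in-wins problem entirely.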
>>>> Andrew
>>>>
>>>> From: Stanislav Kozlovski <stanis...@confluent.io.INVALID>
>>>> Sent: 22 May 2023 11:20
>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>> Subject: Re: [DISCUSS] KIP-932: Queues for Kafka
>>>>
>>>> Hey Andrew!
>>>>
>>>> Kudos on the proposal. It is very well written - a joy to read. It
>>>> is definitely an interesting solution to the queueing problem - I
>>>> would not have guessed we could solve it like this. Thank you for
>>>> working on this.
>>>>
>>>> Happy to get the discussion started - I have a few comments/questions
>>>> on first read:
>>>>
>>>> 1. Tiered Storage
>>>>
>>>> I notice no mention of Tiered Storage (KIP-405). Does that
>>>> complicate the design, especially when fetching historical data? It
>>>> would be good to have at least one sentence mentioning it, even if
>>>> it doesn't impact it. Right now I'm unsure whether it was considered.
>>>>
>>>> 2. SSO initialized to the latest offset
>>>>
>>>>> "By default, the SSO for each share-partition is initialized to the
>>>>> latest offset for the corresponding topic-partitions."
>>>>
>>>> Have we considered allowing this to be configurable to
>>>> latest/earliest? This would be consistent with the auto.offset.reset
>>>> config of vanilla consumer groups. Thinking from a user's
>>>> perspective, it sounds valid to want to start from the beginning of
>>>> a topic when starting a share group. Historical processing comes to
>>>> mind.
>>>>
>>>> 3. Durable Storage
>>>>
>>>> The KIP mentions that "The cluster records this information
>>>> durably", which implies that it saves it somewhere. Does the
>>>> ShareCoordinator have its own topic? Would it be compacted?
>>>>
>>>> In particular, I am interested in what such a topic's retention
>>>> would be like. The vanilla consumer offsets topic has some special
>>>> retention semantics (KIP-211) where we start counting the retention
>>>> after the consumer group becomes empty (inactive) - the default
>>>> being 7 days. We need to make sure the retention here isn't too
>>>> short either, as the offsets topic originally had 24 hours of
>>>> retention and that proved problematic.
>>>>
>>>> In general, some extra detail about the persistence would be greatly
>>>> appreciated!
>>>>
>>>> 4. Batch Acknowledgement
>>>>
>>>>> "In the situation where some records in a batch have been released
>>>>> or rejected separately, subsequent fetches of those records are
>>>>> more likely to have gaps."
>>>>
>>>> Can we expand a bit more on this edge case? I am interested in
>>>> learning what gets returned on subsequent fetch requests. In
>>>> particular, how does this work with compression? As far as I
>>>> remember, we can compress the whole batch there, which might make
>>>> individual record filtering tricky.
>>>>
>>>> 5. Member Management
>>>>
>>>> How is consumer group member management handled? I didn't see any
>>>> specific mention - is it the same as a vanilla group? In particular,
>>>> how will bad consumers be handled?
>>>>
>>>> I guess I see two cases:
>>>> 1. a bad consumer that doesn't even heartbeat
>>>> 2. a bad consumer that heartbeats well but for some reason every
>>>> message processing attempt times out, e.g. imagine it was network
>>>> partitioned from some third-party system that is a critical part of
>>>> its message processing loop
>>>>
>>>> One evident problem I can foresee in production systems is one (or a
>>>> few) slow consumer applications bringing the SSO/SEO advancement
>>>> down to a crawl.
>>>> Imagine an example where the same consumer app always hits the
>>>> timeout limit - what would the behavior be in such a case? Do we
>>>> keep that consumer app indefinitely (if so, do we run the risk of
>>>> having it invalidate completely valid messages)? Are there any
>>>> equivalents to the consumer group rebalances which fence off such
>>>> bad consumers?
>>>>
>>>> 6. Processing Semantics (exactly once)
>>>>
>>>>> The delivery counts are only maintained approximately and the
>>>>> Acquired state is not persisted.
>>>>
>>>> Does this introduce the risk of zombie consumers on
>>>> share-partition-leader failure? That is, restarting and giving
>>>> another consumer the acquired state for the same record.
>>>>
>>>> I notice that the KIP says:
>>>>> Finally, this KIP does not include support for acknowledging
>>>>> delivery using transactions for exactly-once semantics.
>>>> at the very end. It would be helpful to address this earlier, as one
>>>> of the key points. And it would be good to be clearer on what the
>>>> processing semantics are. They seem to be *at-least-once* to me.
>>>>
>>>> 7. nit: Acronyms
>>>>
>>>> I feel like SSO and SEO may be bad acronyms. Googling "Kafka SEO" is
>>>> bound to return weird results. What do we think about the tradeoff
>>>> of using more-unique acronyms (like SGEO, SSGO) at the expense of
>>>> one extra letter?
>>>>
>>>> Again - thanks for working on this! I think it's a great initiative.
>>>> I'm excited to see us perfect this proposal and enable a brand new
>>>> use case in Kafka!
>>>>
>>>> Best,
>>>> Stanislav
>>>>
>>>>
>>>> On Mon, May 15, 2023 at 2:55 PM Andrew Schofield
>>>> <andrew_schofi...@live.com> wrote:
>>>>
>>>>> Hi,
>>>>> I would like to start a discussion thread on KIP-932: Queues for
>>>>> Kafka. This KIP proposes an alternative to consumer groups to
>>>>> enable cooperative consumption by consumers without partition
>>>>> assignment. You end up with queue semantics on top of regular Kafka
>>>>> topics, with per-message acknowledgement and automatic handling of
>>>>> messages which repeatedly fail to be processed.
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>>>>>
>>>>> Please take a look and let me know what you think.
>>>>>
>>>>> Thanks.
>>>>> Andrew
>>>>
>>>>
>>>> --
>>>> Best,
>>>> Stanislav