Hi Luke,
Thanks for your comments.

1) I expect that fetch-from-follower will not be supported for share groups. If 
you think about it,
FFF gives freedom to fetch records from a nearby broker, but it does not also 
give the
ability to commit offsets to a nearby broker. For a share-group, the 
share-partition leader is
intimately involved in which records are fetched and acknowledged.

2) I have two big areas to fill in with the KIP - the RPCs and the storage. I’m 
working on the
RPCs now. The storage will be next.

3) Yes, we need new metrics. I’ll put a placeholder in the next update to the 
KIP. I think it will
be easy to enumerate them once the proposal has stabilised.

Thanks,
Andrew


> On 29 May 2023, at 10:04, Luke Chen <show...@gmail.com> wrote:
>
> Hi Andrew,
>
> Thanks for the KIP.
> Some high level questions:
> 1. How do we handle "fetch from follower" case?
> It looks like in current design, each call needs to go to "shared partition
> leader", where the shared state stored. Is my understanding correct?
>
> 2. Where does the state info stored?
> It looks like we only store them in the memory of "shared partition
> leader". What happened after the leader crashed and move to other ISR
> replica?
>
> 3. New metrics needed
> Since we're introducing a new kind of consumer group, I think there should
> be new metrics added for client and broker to monitor them.
>
> Thank you.
> Luke
>
> On Mon, May 29, 2023 at 1:01 PM Satish Duggana <satish.dugg...@gmail.com>
> wrote:
>
>> Minor correction on 103, latest instead of earliest for SPSO default value.
>>
>> 103 It talks about SPSO values, latest being the default and user
>> can reset it to a target offset timestamp. What is the maximum value
>> for SPEO? It is good to clarify what could be the maximum value for
>> SPSO and SPEO. It can be HW or LogStableOffset or some other value?
>>
>> Thanks,
>> Satish.
>>
>> On Mon, 29 May 2023 at 10:06, Satish Duggana <satish.dugg...@gmail.com>
>> wrote:
>>>
>>> Hi Andrew,
>>> Thanks for the nice KIP on a very interesting feature about
>>> introducing some of the traditional MessageQueue semantics to Kafka.
>>> It is good to see that we are extending the existing consumer groups
>>> concepts and related mechanisms for shared subscriptions instead of
>>> bringing any large architectural/protocol changes.
>>>
>>> This KIP talks about introducing a durable subscription feature for
>>> topics with multiple consumers consuming messages parallely from a
>>> single topic partition.
>>>
>>> 101 Are you planning to extend this functionality for queueing
>>> semantics like JMS point to point style in future?
>>>
>>> 102 When a message is rejected by the target consumer, how do users
>>> know what records/offsets are dropped because of the failed records
>>> due to rejection ack or due to timeouts etc before DLQs are
>>> introduced?
>>>
>>> 103 It talks about SPSO values, earliest being the default and user
>>> can reset it to a target offset timestamp. What is the maximum value
>>> for SPEO? It is good to clarify what could be the maximum value for
>>> SPSO and SPEO. It can be HW or LogStableOffset or some other value?
>>>
>>> 104 KIP mentions that "share.delivery.count.limit" as the maximum
>>> number of delivery attempts for a record delivered to a share group.
>>> But the actual delivery count may be more than this number as the
>>> leader may fail updating the delivery count as leader or consumer may
>>> fail and more delivery attempts may be made later. It may be the
>>> minimum number of delivery attempts instead of the maximum delivery
>>> attempts.
>>>
>>> Thanks,
>>> Satish.
>>>
>>>
>>> On Wed, 24 May 2023 at 21:26, Andrew Schofield
>>> <andrew_schofield_j...@outlook.com> wrote:
>>>>
>>>> Hi Stanislav,
>>>> Thanks for your email. You bring up some interesting points.
>>>>
>>>> 1) Tiered storage
>>>> I think the situation here for fetching historical data is equivalent
>> to what happens if a user resets the committed offset for a consumer
>>>> group back to an earlier point in time. So, I will mention this in the
>> next update to the KIP document but I think there's nothing
>>>> especially different here.
>>>>
>>>> 2) SSO initialized to the latest offset
>>>> The KIP does mention that it is possible for an administrator to set
>> the SSO using either AdminClient.alterShareGroupOffsets or
>>>> kafka-share-groups.sh. It is entirely intentional that there is no
>> KafkaConsumer config for initializing the SSO. I know that's how it
>>>> can be done for consumer groups, but it suffers from the situation
>> where different consumers have different opinions about
>>>> the initial value (earliest vs latest) and then the first one in wins.
>> Also, KIP-842 digs into some problems with how consumer
>>>> group offset reset works (
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-842%3A+Add+richer+group+offset+reset+mechanisms)
>> so
>>>> I've tried to sidestep those problems too.
>>>>
>>>> Another possibility is to follow KIP-848 which proposes that
>> AdminClient.incrementalAlterConfigs is enhanced to support a new
>>>> resource type called GROUP and supporting a dynamic group config in
>> this manner would give a single point of control.
>>>>
>>>> 3) Durable storage
>>>> The KIP does not yet describe how durable storage works. I have a few
>> ideas that I want to flesh out before updating the KIP.
>>>>
>>>> I will rule out using a compacted topic though. The problem is that
>> each record on a compacted topic is a key:value pair, and
>>>> it's not obvious what to use as the key. If it's the share group name,
>> it needs the entire in-flight record state to be recorded in
>>>> one hit which is extremely inefficient.
>>>>
>>>> 4) Batch acknowledgement
>>>> You are correct that compression makes delivery and acknowledgement of
>> individual messages within a compressed batch
>>>> more complicated. Again, I will defer a proper answer here until I've
>> dug more deeply.
>>>>
>>>> 5) Member management
>>>> Member management will be similar to consumer groups. I anticipate
>> that it will build entirely on the new consumer group
>>>> protocol in KIP-848. There seems little benefit in supporting the
>> legacy protocol when this KIP is targeting versions of Kafka
>>>> which will all have KIP-848.
>>>>
>>>> The two cases you mention:
>>>> i) If a bad consumer doesn't even heartbeat, it will be ejected from
>> the group. This does not involve a rebalance.
>>>> ii) If a bad consumer heartbeats but always times out message
>> processing, it will slow the advancement of the SSO/SEO. There
>>>> is the possibility that such a consumer would invalidate completely
>> valid messages. In order to do this, it would have to acquire
>>>> the same set of message repeatedly, to the exclusion of other
>> consumers, and thus bump the delivery count to the limit.
>>>> This is unlikely but not impossible.
>>>>
>>>> 6) Processing semantics
>>>> Delivery is at-least-once.
>>>>
>>>> 7) Acronyms
>>>> I hadn't thought about the implications of "Kafka SEO". I think I'll
>> change it to "Share Partition Start Offset" (SPSO) and
>>>> "Share Partition End Offset" (SPEO).
>>>>
>>>> There is a lot of work ahead for this KIP. I intend to work on the
>> protocol changes next.
>>>>
>>>> Thanks for getting involved in the discussion.
>>>> Andrew
>>>>
>>>> From: Stanislav Kozlovski <stanis...@confluent.io.INVALID>
>>>> Sent: 22 May 2023 11:20
>>>> To: dev@kafka.apache.org <dev@kafka.apache.org>
>>>> Subject: Re: [DISCUSS] KIP-932: Queues for Kafka
>>>>
>>>> Hey Andrew!
>>>>
>>>> Kudos on the proposal. It is greatly written - a joy to read. It is
>>>> definitely an interesting solution to the queueing problem - I would
>> not
>>>> have guessed we could solve it like this. Thank you for working on
>> this.
>>>>
>>>> Happy to get the discussion started - I have a few comments/questions
>> on
>>>> first read:
>>>>
>>>> 1. Tiered Storage
>>>>
>>>> I notice no mention of Tiered Storage (KIP-405). Does that complicate
>> the
>>>> design, especially when fetching historical data? It would be good to
>> have
>>>> at least one sentence mentioning it, even if it doesn't impact it.
>> Right
>>>> now I'm unsure if it was considered.
>>>>
>>>> 2. SSO initialized to the latest offset
>>>>
>>>>> "By default, the SSO for each share-partition is initialized to the
>>>> latest offset for the corresponding topic-partitions."
>>>>
>>>> Have we considered allowing this to be configurable to latest/earliest?
>>>> This would be consistent with the auto.offset.reset config of vanilla
>>>> consumer groups.
>>>> Thinking from a user's perspective, it sounds valid to want to start
>> from
>>>> the start of a topic when starting a share group. Historical processing
>>>> comes to mind
>>>>
>>>> 3. Durable Storage
>>>>
>>>> The KIP mentions that "The cluster records this information durably",
>> which
>>>> implies that it saves it somewhere. Does the ShareCoordinator have its
>> own
>>>> topic? Would it be compacted?
>>>>
>>>> In particular, I am interested in what such a topic's retention would
>> be
>>>> like. The vanilla consumer offsets topic has some special retention
>>>> semantics (KIP-211) where we start counting the retention after the
>>>> consumer group becomes empty (inactive) - the default being 7 days.
>> Need to
>>>> make sure the retention here isn't too short either, as the offsets
>> topic
>>>> originally had 24 hours of retention and that proved problematic.
>>>>
>>>> In general, some extra detail about the persistence would be greatly
>>>> appreciated!
>>>>
>>>> 4. Batch Acknowledgement
>>>>
>>>>> "In the situation where some records in a batch have been released or
>>>> rejected separately, subsequent fetches of those records are more
>> likely to
>>>> have gaps."
>>>>
>>>> Can we expand a bit more on this edge case? I am interested in learning
>>>> what gets returned on subsequent fetch requests.
>>>> In particular, - how does this work with compression? As far as I
>> remember,
>>>> we can compress the whole batch there, which might make individual
>> record
>>>> filtering tricky.
>>>>
>>>> 5. Member Management
>>>>
>>>> How is consumer group member management handled? I didn't see any
>> specific
>>>> mention - is it the same as a vanilla group?
>>>> In particular - how will bad consumers be handled?
>>>>
>>>> I guess I see two cases:
>>>> 1. bad consumer that doesn't even heartbeat
>>>> 2. bad consumer that heartbeats well but for some reason every message
>>>> processing times out. e.g imagine it was network partitioned from some
>>>> third-party system that is a critical part of its message processing
>> loop
>>>>
>>>> One evident problem I can foresee in production systems is one (or a
>> few)
>>>> slow consumer applications bringing the SSO/SEO advancement down to a
>> crawl.
>>>> Imagine an example where the same consumer app always hits the timeout
>>>> limit - what would the behavior be in such a case? Do we keep that
>> consumer
>>>> app indefinitely (if so, do we run the risk of having it invalidate
>>>> completely valid messages)? Are there any equivalents to the consumer
>> group
>>>> rebalances which fence off such bad consumers?
>>>>
>>>> 6. Processing Semantics (exactly once)
>>>>
>>>>> The delivery counts are only maintained approximately and the
>> Acquired
>>>> state is not persisted.
>>>>
>>>> Does this introduce the risk of zombie consumers on
>> share-partition-leader
>>>> failure? i.e restarting and giving another consumer the acquired state
>> for
>>>> the same record
>>>>
>>>> I notice that the KIP says:
>>>>> Finally, this KIP does not include support for acknowledging delivery
>>>> using transactions for exactly-once semantics.
>>>> at the very end. It would be helpful to address this earlier in the
>>>> example, as one of the key points. And it would be good to be clearer
>> on
>>>> what the processing semantics are. They seem to be *at-least-once* to
>> me.
>>>>
>>>>
>>>> 7. nit: Acronyms
>>>>
>>>> I feel like SSO and SEO may be bad acronyms. Googling "Kafka SEO" is
>> bound
>>>> to return weird results.
>>>> What do we think about the tradeoff of using more-unique acronyms (like
>>>> SGEO, SSGO) at the expense of one extra letter?
>>>>
>>>> Again - thanks for working on this! I think it's a great initiative.
>> I'm
>>>> excited to see us perfect this proposal and enable a brand new use
>> case in
>>>> Kafka!
>>>>
>>>> Best,
>>>> Stanislav
>>>>
>>>>
>>>>
>>>> On Mon, May 15, 2023 at 2:55 PM Andrew Schofield <
>> andrew_schofi...@live.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I would like to start a discussion thread on KIP-932: Queues for
>> Kafka.
>>>>> This KIP proposes an alternative to consumer groups to enable
>> cooperative
>>>>> consumption by consumers without partition assignment. You end up
>> with
>>>>> queue semantics on top of regular Kafka topics, with per-message
>>>>> acknowledgement and automatic handling of messages which repeatedly
>> fail to
>>>>> be processed.
>>>>>
>>>>>
>>>>>
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>>>>>
>>>>> Please take a look and let me know what you think.
>>>>>
>>>>> Thanks.
>>>>> Andrew
>>>>
>>>>
>>>>
>>>> --
>>>> Best,
>>>> Stanislav


Reply via email to