Hi Daniel,
I’ve been thinking about this question and I think this area is a bit tricky.

If some consumers in a share group use the read_uncommitted isolation level
and other consumers use read_committed, they have different expectations
about which messages are visible once EOS comes into the picture.
It seems to me that this is not necessarily a good thing.

One option would be to support just read_committed in KIP-932. This makes
it unambiguous which records are in-flight, because only committed records
can be in-flight.

Another option would be to give the entire share group an isolation level,
which again gives an unambiguous set of in-flight records, but without
restricting the group to read_committed behaviour.

So, my preference is for the following:
a) A share group has an isolation level that applies to all consumers in the
group.
b) The default isolation level for a share group is read_committed, in which
case the SPSO and SPEO cannot move past the LSO.
c) For a share group with the read_uncommitted isolation level, the SPSO and
SPEO can move past the LSO.
d) The kafka-configs.sh tool or the AdminClient can be used to set a
non-default isolation level for a share group. The value is applied when the
first member joins.
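To illustrate (b) and (c), here is a rough sketch in Python of how the
isolation level could bound the SPEO. The function name and shape are
invented for illustration; only the LSO/high-watermark relationship comes
from the existing Kafka fetch semantics.

```python
# Offsets below the LSO (last stable offset) are transactionally decided;
# offsets in [LSO, HWM) may still belong to open transactions.

def speo_upper_bound(isolation_level: str, lso: int, high_watermark: int) -> int:
    """Highest offset the SPEO may advance to for a share group with the
    given isolation level. Hypothetical helper, not from the KIP."""
    if isolation_level == "read_committed":
        return lso             # (b): cannot move past the LSO
    if isolation_level == "read_uncommitted":
        return high_watermark  # (c): may move past the LSO
    raise ValueError(f"unknown isolation level: {isolation_level}")

# An open transaction holds the LSO at 50 while the high watermark is 80.
print(speo_upper_bound("read_committed", 50, 80))    # 50
print(speo_upper_bound("read_uncommitted", 50, 80))  # 80
```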

Thanks,
Andrew

> On 2 Jun 2023, at 10:02, Dániel Urbán <urb.dani...@gmail.com> wrote:
>
> Hi Andrew,
> Thank you for the clarification. One follow-up to read_committed mode:
> Taking the change in message ordering guarantees into account, does this
> mean that in queues, share-group consumers will be able to consume
> committed records AFTER the LSO?
> Thanks,
> Daniel
>
> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (2 Jun 2023,
> 10:39):
>
>> Hi Daniel,
>> Thanks for your questions.
>>
>> 1) Yes, read_committed fetch will still be possible.
>>
>> 2) You weren’t wrong that this is a broad question :)
>>
>> Broadly speaking, I can see two ways of managing the in-flight records:
>> the share-partition leader does it, or the share-group coordinator does it.
>> I want to choose what works best, and I happen to have started with trying
>> the share-partition leader doing it. This is just a whiteboard exercise at
>> the moment, looking at the potential protocol flows and how well it all
>> hangs together. When I have something coherent, understandable and worth
>> reviewing, I’ll update the KIP with a proposal.
>>
>> I think it’s probably worth doing a similar exercise for the share-group
>> coordinator way too. There are bound to be pros and cons, and I don’t
>> really mind which way prevails.
>>
>> If the share-group coordinator does it, I already have experience of
>> efficient storage of in-flight record state in a way that scales and is
>> space-efficient. If the share-partition leader does it, storage of
>> in-flight state is a bit more tricky.
>>
>> I think it’s worth thinking ahead to how EOS will work and also another
>> couple of enhancements (key-based ordering and acquisition lock
>> extension) so it’s somewhat future-proof.
>>
>> Thanks,
>> Andrew
>>
>>> On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>
>>> Hi Andrew,
>>>
>>> Thank you for the KIP, exciting work you are doing :)
>>> I have 2 questions:
>>> 1. I understand that EOS won't be supported for share-groups (yet), but
>>> read_committed fetch will still be possible, correct?
>>>
>>> 2. I have a very broad question about the proposed solution: why not let
>>> the share-group coordinator manage the states of the in-flight records?
>>> I'm asking this because it seems to me that using the same pattern as the
>>> existing group coordinator would
>>> a) solve the durability of the message state storage (same method as the
>>> one used by the current group coordinator)
>>>
>>> b) pave the way for EOS with share-groups (same method as the one used by
>>> the current group coordinator)
>>>
>>> c) allow follower-fetching
>>> I saw your point about this: "FFF gives freedom to fetch records from a
>>> nearby broker, but it does not also give the ability to commit offsets to
>>> a nearby broker"
>>> But does it matter if message acknowledgement is not "local"? Supposedly,
>>> fetching is the actual hard work which benefits from follower fetching,
>>> not the group-related requests.
>>>
>>> The only problem I see with the share-group coordinator managing the
>>> in-flight message state is that the coordinator is not aware of the exact
>>> available offsets of a partition, nor how the messages are batched. For
>>> this problem, maybe the share-group coordinator could use some form of
>>> "logical" addresses, such as "the next 2 batches after offset X", or
>>> "after offset X, skip 2 batches, fetch next 2". Acknowledgements always
>>> contain the exact offset, but for the "unknown" sections of a partition,
>>> these logical addresses would be used. The coordinator could keep track
>>> of message states with a mix of offsets and these batch-based addresses.
>>> The partition leader could support "skip X, fetch Y batches" fetch
>>> requests. This solution would need changes in the Fetch API to allow
>>> such batch-based addresses, but I assume that fetch protocol changes
>>> will be needed regardless of the specific solution.
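A rough sketch of the batch-based addressing idea above: the coordinator
names a section of the log it has not seen as "after offset X, skip S
batches, fetch the next T batches", and the partition leader resolves that
to concrete batches. The batch layout and function shape here are invented
for illustration; nothing below is from the KIP.

```python
def resolve_batches(batches, after_offset, skip, take):
    """batches: list of (base_offset, last_offset) tuples in log order.
    Returns the concrete batches addressed by (after_offset, skip, take)."""
    following = [b for b in batches if b[0] > after_offset]
    return following[skip:skip + take]

# A partition whose leader knows four batches:
layout = [(0, 4), (5, 9), (10, 10), (11, 15)]
# "after offset 4, skip 1 batch, fetch the next 2 batches"
print(resolve_batches(layout, 4, 1, 2))  # [(10, 10), (11, 15)]
```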
>>>
>>> Thanks,
>>> Daniel
>>>
>>> Andrew Schofield <andrew_schofi...@live.com> wrote (30 May 2023, 18:15):
>>>
>>>> Yes, that’s it. I imagine something similar to KIP-848 for managing the
>>>> share-group membership, and consumers that fetch records from their
>>>> assigned partitions and acknowledge when delivery completes.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>> On 30 May 2023, at 16:52, Adam Warski <a...@warski.org> wrote:
>>>>>
>>>>> Thanks for the explanation!
>>>>>
>>>>> So effectively, a share group is subscribed to each partition - but the
>>>>> data is not pushed to the consumer, but only sent on demand. And when
>>>>> demand is signalled, a batch of messages is sent?
>>>>> Hence it would be up to the consumer to prefetch a sufficient number of
>>>>> batches to ensure that it will never be "bored"?
>>>>>
>>>>> Adam
>>>>>
>>>>>> On 30 May 2023, at 15:25, Andrew Schofield <andrew_schofi...@live.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Adam,
>>>>>> Thanks for your question.
>>>>>>
>>>>>> With a share group, each fetch is able to grab available records from
>>>>>> any partition. So, it alleviates the “head-of-line” blocking problem
>>>>>> where a slow consumer gets in the way. There’s no actual stealing from
>>>>>> a slow consumer, but it can be overtaken and must complete its
>>>>>> processing within the timeout.
>>>>>>
>>>>>> The way I see this working is that when a consumer joins a share
>>>>>> group, it receives a set of assigned share-partitions. To start with,
>>>>>> every consumer will be assigned all partitions. We can be smarter than
>>>>>> that, but I think that’s really a question of writing a smarter
>>>>>> assignor, just as has occurred over the years with consumer groups.
>>>>>>
>>>>>> Only a small proportion of Kafka workloads are super high throughput.
>>>>>> Share groups would struggle with those, I’m sure. Share groups do not
>>>>>> diminish the value of consumer groups for streaming. They just give
>>>>>> another option for situations where a different style of consumption
>>>>>> is more appropriate.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>>> On 29 May 2023, at 17:18, Adam Warski <a...@warski.org> wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> thank you for the proposal! A very interesting read.
>>>>>>>
>>>>>>> I do have one question, though. When you subscribe to a topic using
>>>>>>> consumer groups, it might happen that one consumer has processed all
>>>>>>> messages from its partitions, while another one still has a lot of
>>>>>>> work to do (this might be due to unbalanced partitioning, long
>>>>>>> processing times etc.). In a message-queue approach, it would be
>>>>>>> great to solve this problem - so that a consumer that is free can
>>>>>>> steal work from other consumers. Is this somehow covered by share
>>>>>>> groups?
>>>>>>>
>>>>>>> Maybe this is planned as "further work", as indicated here:
>>>>>>>
>>>>>>> "
>>>>>>> It manages the topic-partition assignments for the share-group
>>>>>>> members. An initial, trivial implementation would be to give each
>>>>>>> member the list of all topic-partitions which matches its
>>>>>>> subscriptions and then use the pull-based protocol to fetch records
>>>>>>> from all partitions. A more sophisticated implementation could use
>>>>>>> topic-partition load and lag metrics to distribute partitions among
>>>>>>> the consumers as a kind of autonomous, self-balancing partition
>>>>>>> assignment, steering more consumers to busier partitions, for
>>>>>>> example. Alternatively, a push-based fetching scheme could be used.
>>>>>>> Protocol details will follow later.
>>>>>>> "
>>>>>>>
>>>>>>> but I’m not sure if I understand this correctly. A fully-connected
>>>>>>> graph seems like a lot of connections, and I’m not sure if this would
>>>>>>> play well with streaming.
>>>>>>>
>>>>>>> This also seems like one of the central problems - a key
>>>>>>> differentiator between share and consumer groups (the other one being
>>>>>>> persisting the state of messages). And maybe the exact way we’d want
>>>>>>> to approach this would, to a certain degree, dictate the design of
>>>>>>> the queueing system?
>>>>>>>
>>>>>>> Best,
>>>>>>> Adam Warski
>>>>>>>
>>>>>>> On 2023/05/15 11:55:14 Andrew Schofield wrote:
>>>>>>>> Hi,
>>>>>>>> I would like to start a discussion thread on KIP-932: Queues for
>>>>>>>> Kafka. This KIP proposes an alternative to consumer groups to enable
>>>>>>>> cooperative consumption by consumers without partition assignment.
>>>>>>>> You end up with queue semantics on top of regular Kafka topics, with
>>>>>>>> per-message acknowledgement and automatic handling of messages which
>>>>>>>> repeatedly fail to be processed.
>>>>>>>>
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>>>>>>>>
>>>>>>>> Please take a look and let me know what you think.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>> Andrew
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Adam Warski
>>>>>
>>>>> https://www.softwaremill.com/
>>>>> https://twitter.com/adamwarski

