Hi Daniel,
I've been thinking about this question and I think this area is a bit tricky.
If there are some consumers in a share group with isolation level read_uncommitted
and other consumers with read_committed, they have different expectations with
regard to which messages are visible when EOS comes into the picture. It seems to
me that this is not necessarily a good thing.

One option would be to support just read_committed in KIP-932. This makes it
unambiguous which records are in-flight, because they can only be committed ones.
Another option would be to give the entire share group a single isolation level,
which again gives an unambiguous set of in-flight records but without restricting
the group to read_committed behaviour.

So, my preference is for the following:

a) A share group has an isolation level that applies to all consumers in the group.

b) The default isolation level for a share group is read_committed, in which case
the SPSO and SPEO cannot move past the LSO.

c) For a share group with read_uncommitted isolation level, the SPSO and SPEO can
move past the LSO.

d) The kafka-configs.sh tool or the AdminClient can be used to set a non-default
value for the isolation level for a share group. The value is applied when the
first member joins. (A rough AdminClient sketch follows below.)

Thanks,
Andrew

> On 2 Jun 2023, at 10:02, Dániel Urbán <urb.dani...@gmail.com> wrote:
>
> Hi Andrew,
> Thank you for the clarification. One follow-up to read_committed mode:
> Taking the change in message ordering guarantees into account, does this
> mean that in queues, share-group consumers will be able to consume
> committed records AFTER the LSO?
> Thanks,
> Daniel
>
> Andrew Schofield <andrew_schofield_j...@outlook.com> wrote (2 Jun 2023,
> Fri, 10:39):
>
>> Hi Daniel,
>> Thanks for your questions.
>>
>> 1) Yes, read_committed fetch will still be possible.
>>
>> 2) You weren't wrong that this is a broad question :)
>>
>> Broadly speaking, I can see two ways of managing the in-flight records:
>> the share-partition leader does it, or the share-group coordinator does it.
>> I want to choose what works best, and I happen to have started with trying
>> the share-partition leader doing it. This is just a whiteboard exercise at
>> the moment, looking at the potential protocol flows and how well it all
>> hangs together. When I have something coherent and understandable and worth
>> reviewing, I'll update the KIP with a proposal.
>>
>> I think it's probably worth doing a similar exercise for the share-group
>> coordinator way too. There are bound to be pros and cons, and I don't
>> really mind which way prevails.
>>
>> If the share-group coordinator does it, I already have experience of
>> efficient storage of in-flight record state in a way that scales and is
>> space-efficient. If the share-partition leader does it, storage of
>> in-flight state is a bit more tricky.
>>
>> I think it's worth thinking ahead to how EOS will work and also another
>> couple of enhancements (key-based ordering and acquisition lock
>> extension) so it's somewhat future-proof.
>>
>> Thanks,
>> Andrew
>>
>>> On 1 Jun 2023, at 11:51, Dániel Urbán <urb.dani...@gmail.com> wrote:
>>>
>>> Hi Andrew,
>>>
>>> Thank you for the KIP, exciting work you are doing :)
>>> I have 2 questions:
>>> 1. I understand that EOS won't be supported for share-groups (yet), but
>>> read_committed fetch will still be possible, correct?
>>>
>>> 2. I have a very broad question about the proposed solution: why not let
>>> the share-group coordinator manage the states of the in-flight records?
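As a rough illustration of point (d) above, setting a non-default isolation level for a share group through the AdminClient might look something like the sketch below. The GROUP config resource type and the group.share.isolation.level config name are assumptions made purely for illustration; neither is defined by the KIP at this point in the discussion.

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SetShareGroupIsolationLevel {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (Admin admin = Admin.create(props)) {
            // Hypothetical: a GROUP config resource for the share group, and a
            // "group.share.isolation.level" config name. Neither is defined by
            // the KIP at this point; they only illustrate point (d).
            ConfigResource group =
                new ConfigResource(ConfigResource.Type.GROUP, "my-share-group");
            AlterConfigOp setIsolation = new AlterConfigOp(
                new ConfigEntry("group.share.isolation.level", "read_uncommitted"),
                AlterConfigOp.OpType.SET);

            // The broker would apply the value when the first member joins.
            Map<ConfigResource, Collection<AlterConfigOp>> updates =
                Map.of(group, List.of(setIsolation));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}

A kafka-configs.sh equivalent would presumably follow the existing --alter / --entity-type pattern, assuming a group entity type is added for this purpose.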
>>> I'm asking this because it seems to me that using the same pattern as the
>>> existing group coordinator would
>>> a) solve the durability of the message state storage (same method as the
>>> one used by the current group coordinator)
>>>
>>> b) pave the way for EOS with share-groups (same method as the one used by
>>> the current group coordinator)
>>>
>>> c) allow follower-fetching
>>> I saw your point about this: "FFF gives freedom to fetch records from a
>>> nearby broker, but it does not also give the ability to commit offsets to a
>>> nearby broker"
>>> But does it matter if message acknowledgement is not "local"? Supposedly,
>>> fetching is the actual hard work which benefits from follower fetching, not
>>> the group-related requests.
>>>
>>> The only problem I see with the share-group coordinator managing the
>>> in-flight message state is that the coordinator is not aware of the exact
>>> available offsets of a partition, nor how the messages are batched. For
>>> this problem, maybe the share-group coordinator could use some form of
>>> "logical" addresses, such as "the next 2 batches after offset X", or "after
>>> offset X, skip 2 batches, fetch next 2". Acknowledgements always contain
>>> the exact offset, but for the "unknown" sections of a partition, these
>>> logical addresses would be used. The coordinator could keep track of
>>> message states with a mix of offsets and these batch-based addresses. The
>>> partition leader could support "skip X, fetch Y batches" fetch requests.
>>> This solution would need changes in the Fetch API to allow such batch-based
>>> addresses, but I assume that fetch protocol changes will be needed
>>> regardless of the specific solution.
>>>
>>> Thanks,
>>> Daniel
>>>
>>> Andrew Schofield <andrew_schofi...@live.com> wrote (30 May 2023, Tue, 18:15):
>>>
>>>> Yes, that's it. I imagine something similar to KIP-848 for managing the
>>>> share group membership, and consumers that fetch records from their
>>>> assigned partitions and acknowledge when delivery completes.
>>>>
>>>> Thanks,
>>>> Andrew
>>>>
>>>>> On 30 May 2023, at 16:52, Adam Warski <a...@warski.org> wrote:
>>>>>
>>>>> Thanks for the explanation!
>>>>>
>>>>> So effectively, a share group is subscribed to each partition - but the
>>>>> data is not pushed to the consumer, but only sent on demand. And when
>>>>> demand is signalled, a batch of messages is sent?
>>>>> Hence it would be up to the consumer to prefetch a sufficient number of
>>>>> batches to ensure that it will never be "bored"?
>>>>>
>>>>> Adam
>>>>>
>>>>>> On 30 May 2023, at 15:25, Andrew Schofield <andrew_schofi...@live.com> wrote:
>>>>>>
>>>>>> Hi Adam,
>>>>>> Thanks for your question.
>>>>>>
>>>>>> With a share group, each fetch is able to grab available records from
>>>>>> any partition. So, it alleviates the "head-of-line" blocking problem
>>>>>> where a slow consumer gets in the way. There's no actual stealing from
>>>>>> a slow consumer, but it can be overtaken and must complete its
>>>>>> processing within the timeout.
>>>>>>
>>>>>> The way I see this working is that when a consumer joins a share group,
>>>>>> it receives a set of assigned share-partitions. To start with, every
>>>>>> consumer will be assigned all partitions. We can be smarter than that,
>>>>>> but I think that's really a question of writing a smarter assignor,
>>>>>> just as has occurred over the years with consumer groups.
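To make the consumption model described just above more concrete, here is a minimal sketch of what a share-group consumer loop might look like on the client side. None of the type or method names below come from the KIP; they are assumptions for illustration only, since the client API is not defined at this point in the discussion.

import java.time.Duration;
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

// Hypothetical client-side shape of a share-group consumer. These names are not
// from the KIP; they only illustrate the flow: join the share group, fetch
// whatever records are available on the assigned share-partitions, and
// acknowledge each record when delivery completes (or release it for redelivery).
interface ShareConsumer<K, V> extends AutoCloseable {
    void subscribe(Collection<String> topics);      // joins the share group
    ConsumerRecords<K, V> poll(Duration timeout);    // fetches available records
    void acknowledge(ConsumerRecord<K, V> record);   // delivery completed
    void release(ConsumerRecord<K, V> record);       // give the record back
}

class ShareConsumeLoop {
    static <K, V> void run(ShareConsumer<K, V> consumer) {
        consumer.subscribe(List.of("payments"));
        while (true) {
            // Records are acquired under a time-limited lock; a consumer that does
            // not finish within the timeout can be overtaken, as described above.
            for (ConsumerRecord<K, V> record : consumer.poll(Duration.ofSeconds(1))) {
                try {
                    process(record);
                    consumer.acknowledge(record);
                } catch (Exception e) {
                    consumer.release(record);        // make it available to others
                }
            }
        }
    }

    static <K, V> void process(ConsumerRecord<K, V> record) {
        // application logic
    }
}

The key point is that delivery completes per record: each record is acknowledged individually, and a record that is not processed within the acquisition timeout can be overtaken and redelivered.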
>>>>>>
>>>>>> Only a small proportion of Kafka workloads are super high throughput.
>>>>>> Share groups would struggle with those, I'm sure. Share groups do not
>>>>>> diminish the value of consumer groups for streaming. They just give
>>>>>> another option for situations where a different style of consumption
>>>>>> is more appropriate.
>>>>>>
>>>>>> Thanks,
>>>>>> Andrew
>>>>>>
>>>>>>> On 29 May 2023, at 17:18, Adam Warski <a...@warski.org> wrote:
>>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> thank you for the proposal! A very interesting read.
>>>>>>>
>>>>>>> I do have one question, though. When you subscribe to a topic using
>>>>>>> consumer groups, it might happen that one consumer has processed all
>>>>>>> messages from its partitions, while another one still has a lot of
>>>>>>> work to do (this might be due to unbalanced partitioning, long
>>>>>>> processing times, etc.). In a message-queue approach, it would be
>>>>>>> great to solve this problem - so that a consumer that is free can
>>>>>>> steal work from other consumers. Is this somehow covered by share
>>>>>>> groups?
>>>>>>>
>>>>>>> Maybe this is planned as "further work", as indicated here:
>>>>>>>
>>>>>>> "
>>>>>>> It manages the topic-partition assignments for the share-group
>>>>>>> members. An initial, trivial implementation would be to give each
>>>>>>> member the list of all topic-partitions which matches its
>>>>>>> subscriptions and then use the pull-based protocol to fetch records
>>>>>>> from all partitions. A more sophisticated implementation could use
>>>>>>> topic-partition load and lag metrics to distribute partitions among
>>>>>>> the consumers as a kind of autonomous, self-balancing partition
>>>>>>> assignment, steering more consumers to busier partitions, for
>>>>>>> example. Alternatively, a push-based fetching scheme could be used.
>>>>>>> Protocol details will follow later.
>>>>>>> "
>>>>>>>
>>>>>>> but I'm not sure if I understand this correctly. A fully-connected
>>>>>>> graph seems like a lot of connections, and I'm not sure if this
>>>>>>> would play well with streaming.
>>>>>>>
>>>>>>> This also seems like one of the central problems - a key
>>>>>>> differentiator between share and consumer groups (the other one
>>>>>>> being persisting the state of messages). And maybe the exact way
>>>>>>> we'd want to approach this would, to a certain degree, dictate the
>>>>>>> design of the queueing system?
>>>>>>>
>>>>>>> Best,
>>>>>>> Adam Warski
>>>>>>>
>>>>>>> On 2023/05/15 11:55:14 Andrew Schofield wrote:
>>>>>>>> Hi,
>>>>>>>> I would like to start a discussion thread on KIP-932: Queues for
>>>>>>>> Kafka. This KIP proposes an alternative to consumer groups to enable
>>>>>>>> cooperative consumption by consumers without partition assignment.
>>>>>>>> You end up with queue semantics on top of regular Kafka topics, with
>>>>>>>> per-message acknowledgement and automatic handling of messages which
>>>>>>>> repeatedly fail to be processed.
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
>>>>>>>>
>>>>>>>> Please take a look and let me know what you think.
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>> Andrew
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Adam Warski
>>>>>
>>>>> https://www.softwaremill.com/
>>>>> https://twitter.com/adamwarski
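Finally, the per-record in-flight state whose ownership the thread debates (share-partition leader versus share-group coordinator) might look roughly like the sketch below. The state names, delivery limit and acquisition-timeout handling are illustrative assumptions rather than anything defined by KIP-932.

import java.time.Instant;

// Hypothetical shape of the per-record in-flight state discussed above. The
// state names, delivery limit and timeout handling are assumptions, not KIP-932.
class InFlightRecordState {
    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    final long offset;
    State state = State.AVAILABLE;
    int deliveryCount = 0;               // drives handling of repeatedly failing messages
    Instant acquisitionDeadline = null;  // processing must complete before this

    InFlightRecordState(long offset) {
        this.offset = offset;
    }

    // A fetch acquires the record under a time-limited lock; a record that has
    // already failed too often is archived instead of being redelivered.
    boolean tryAcquire(Instant deadline, int maxDeliveryAttempts) {
        if (state != State.AVAILABLE) {
            return false;
        }
        if (deliveryCount >= maxDeliveryAttempts) {
            state = State.ARCHIVED;
            return false;
        }
        deliveryCount++;
        state = State.ACQUIRED;
        acquisitionDeadline = deadline;
        return true;
    }

    // If the acquisition lock expires before an acknowledgement arrives, the
    // record becomes available again and can be overtaken by another consumer.
    void onLockExpiry(Instant now) {
        if (state == State.ACQUIRED && now.isAfter(acquisitionDeadline)) {
            state = State.AVAILABLE;
        }
    }

    void acknowledge() {
        state = State.ACKNOWLEDGED;
    }
}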