Greetings, I was directed to ask this here, but I am unclear whether this should be a PIP or not. Please advise if I should follow a different process. Thanks in advance.
The motivation is as follows: today we have KeyShared subscriptions whose consumers rapidly autoscale up and down based on real-time demand. We've observed that for some data shapes and scaling conditions, some consumers end up receiving considerably more traffic than the rest, slowing down consumption across the topic. We require that all messages with any given routing key be outstanding to only one consumer at a time, but otherwise have no preference for any range of keys to remain sticky to any given consumer. I've also seen KeyShared routing performance concerns referenced elsewhere, e.g.: https://github.com/apache/pulsar/issues/15705

The proposal is two-fold. First, we would like to implement a new KeyShared routing mechanism which tracks outstanding messages and their consumers, routing messages whose keys are already outstanding to the consumer currently handling that key, and messages with new keys to any arbitrary consumer with available permits (or perhaps the consumer with the most permits); essentially a "first available consumer" routing strategy. In my naive first attempt, I modified the StickyKeyConsumerSelector::select call to accept an mledger.Position in addition to the hash, and added a release call to notify the selector of positions that should no longer be considered outstanding (a rough sketch is attached at the end of this mail). I could see this being implemented any number of other ways as well (such as an entirely new dispatcher), and would appreciate guidance should this proposal move forward.

Second, I believe the choice of KeyShared routing scheme, and perhaps the settings for that scheme, should be configurable as a namespace/topic policy and not just in the broker config. I have not begun work on implementing that yet, but I assume it is not too complicated (though the settings construct may need to be more freeform than usual; see the second sketch below).

In the issue referenced above, Penghui mentioned the cost of tracking all outstanding messages as the reason this was not implemented in the first place: "the current implementation doesn't need to maintain the state for each key since a topic might have a huge number of keys." I would counter that most topics do not have hundreds of thousands of messages outstanding simultaneously in the first place, that there is already a policy to limit the number of outstanding messages per subscription, and that an entry per key is not strictly necessary: if performance/memory constraints demanded it, some sort of reference count by (shortened) hash could be attempted instead. That said, I have not performance tested the idea yet; if it will be rejected for other reasons, the effort would not be well spent.

To give some idea of our scale: we host multiple clusters today, but the busiest one has roughly 36 topics which each handle about 8k msgs/s, between 100-500 producers, a similar number of consumers, and an average of 500 unacked messages at a time, with peaks around 15k that occasionally bounce off our configured limit. Consumers tend to autoscale every 3-5 minutes, but can cycle even faster depending on the rest of our platform. This functionality is what led us to Pulsar in the first place, and in that respect it has served decently, but ongoing cost optimization would benefit from better consumer performance.
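For concreteness, here is a minimal sketch of the selector change I attempted. The Consumer and Position interfaces below are trimmed stand-ins for the broker's org.apache.pulsar.broker.service.Consumer and org.apache.bookkeeper.mledger.Position types, and everything beyond the select/release naming borrowed from StickyKeyConsumerSelector is my own invention; please treat it as an illustration of the idea rather than a proposed final API.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Trimmed stand-ins for org.apache.pulsar.broker.service.Consumer and
// org.apache.bookkeeper.mledger.Position, just enough for the sketch.
interface Consumer {
    int getAvailablePermits();
}

interface Position {}

// "First available consumer" selector sketch: a key's hash is pinned to a
// consumer only while messages with that hash are outstanding, then freed.
class FirstAvailableConsumerSelector {

    private static final class Owner {
        final Consumer consumer;
        int outstanding; // reference count of outstanding positions
        Owner(Consumer consumer) { this.consumer = consumer; }
    }

    private final Map<Integer, Owner> ownerByHash = new HashMap<>();
    private final Map<Position, Integer> hashByPosition = new HashMap<>();
    private final List<Consumer> consumers;

    FirstAvailableConsumerSelector(List<Consumer> consumers) {
        this.consumers = consumers;
    }

    // select() now also takes the entry's position so the selector can track
    // it: an outstanding hash routes to its current owner, while a new hash
    // routes to the consumer with the most available permits.
    synchronized Consumer select(int hash, Position position) {
        Owner owner = ownerByHash.get(hash);
        if (owner == null) {
            Consumer best = consumers.stream()
                    .max(Comparator.comparingInt(Consumer::getAvailablePermits))
                    .orElse(null);
            if (best == null) {
                return null; // no consumers connected
            }
            owner = new Owner(best);
            ownerByHash.put(hash, owner);
        }
        owner.outstanding++;
        hashByPosition.put(position, hash);
        return owner.consumer;
    }

    // release() is the new call: invoked on ack (consumer close and
    // redelivery are glossed over here) so the key can move elsewhere.
    synchronized void release(Position position) {
        Integer hash = hashByPosition.remove(position);
        if (hash == null) {
            return;
        }
        Owner owner = ownerByHash.get(hash);
        if (owner != null && --owner.outstanding == 0) {
            ownerByHash.remove(hash);
        }
    }
}
```

Note that because the maps are keyed by the sticky-key hash rather than the raw key, memory scales with the number of outstanding entries rather than the number of keys in the topic, which is essentially the reference-count-by-(shortened)-hash fallback mentioned above.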
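For the policy half, here is an entirely hypothetical illustration of what such a namespace/topic policy object might carry; neither this class nor any of its fields exists in Pulsar today, and the freeform settings map is exactly the part I expect would need discussion:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical policy object, loosely modeled on existing per-namespace
// policies; nothing here exists in Pulsar today.
public class KeySharedRoutingPolicy {

    // Which selector the KeyShared dispatcher should use.
    public enum RoutingScheme {
        AUTO_SPLIT_HASH_RANGE, // today's default selector
        CONSISTENT_HASHING,    // today's broker-level alternative
        FIRST_AVAILABLE        // the scheme proposed above
    }

    private RoutingScheme scheme = RoutingScheme.AUTO_SPLIT_HASH_RANGE;

    // Freeform, scheme-specific settings, since each scheme needs different
    // knobs (e.g. ring points for consistent hashing, tracking limits for
    // the proposed scheme).
    private Map<String, String> schemeSettings = new HashMap<>();

    public RoutingScheme getScheme() { return scheme; }
    public void setScheme(RoutingScheme scheme) { this.scheme = scheme; }
    public Map<String, String> getSchemeSettings() { return schemeSettings; }
    public void setSchemeSettings(Map<String, String> settings) { this.schemeSettings = settings; }
}
```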
I look forward to hearing your opinions on this proposal and any next steps I can take. Thank you for your time!

-Tim Corbett
tcorb...@mparticle.com
Principal Platform Engineer - mParticle