Hi Pulsar Community,

Here are the meeting notes from today's community meeting. Thanks to all who participated!
As you'll see, we had a very long discussion about the idea of replacing bundles. The other two topics were broker metadata cache evictions and the Pulsar protocol.

Disclaimer: if something is misattributed or misrepresented, please send a correction to this list.

Source google doc: https://docs.google.com/document/d/19dXkVXeU2q_nHmkG8zURjKnYlvD96TbKf5KjYyASsOE

Thanks,
Michael

2022/09/01 (8:30 AM PST)

- Attendees:
  - Matteo Merli
  - Lari Hotari
  - Enrico Olivelli
  - Andrey Yegorov
  - Dave Fisher
  - Heesung Sohn
  - Ayman Khalil
  - Michael Marshall
  - Asaf Mesika
  - Christophe Bornet
- Discussions/PIPs/PRs (Generally discussed in the order they appear)
  - Lari: the mailing list discussion about namespace bundles and the long term plan.
    Matteo: I would like to remove them. They can be very annoying. They have to be big enough to be effective, yet small enough to not cause imbalance problems. There is an order of magnitude difference in the amount of state when topics are managed as groups rather than as individuals. If you have millions of topics, how can a single broker keep track of everything?
    Dave: could part of the problem be that as you add brokers and grow topics, you run into bundle splits? Is the splitting itself the problem?
    Matteo: the split is a way to maintain them. If a bundle has too many topics, it is too large for even distribution of load.
    Dave: what if you could allocate a bunch of bundles at once?
    Michael: you can do that.
    Matteo: the problem there is that if you don't need that many, it's a waste of resources. It's not terrible, but the worst case is when you have one topic per bundle. If you know your use case, you can create the namespace with the "right" number of bundles. Ideally, the system is designed so that operators don't need to configure these low level details.
    Dave: is that the point of the different bundle split algorithms, then?
    Matteo: yes.
    Lari: do we want to change the design at any point in the future? One case is topic level anti-affinity rules for a very high volume partitioned topic where you want to ensure each partition is on a different broker.
    Matteo: it is kind of possible today. If you want to maximize throughput, that should be the responsibility of the load manager. It is possible to have bundle level failure domains where bundles are split.
    Lari: since assignment happens at the bundle level, it is extra complexity to ensure the topics are distributed, because partitions could land in the same bundle depending on their hash.
    Matteo: if you have 10 brokers, you should have 100 partitions if you're looking to spread the load across all of the brokers.
    Heesung: note that there are different splitting algorithms.
    Michael: there is no merging of bundles, which makes splitting a "forever" event right now.
    Matteo: merging bundles was on the roadmap (like 8 years ago). It is doable, but it is complicated because it requires coordination between multiple nodes. In most cases, it's not terrible to overshoot on the number of bundles. How can we support both scenarios (Lari's and the millions of topics scenario)?
    Lari: the design is limiting future improvements. Another use case from the mailing list relates to PIP 192 and broker graceful shutdown. Due to the way bundles are unloaded, there are interruptions to topic availability. If you could move topics atomically, there is a chance for a seamless handover. There could be an RPC between brokers to help smooth over issues, too.
    Matteo: that implies every broker knows about every topic.
    Lari: no, that isn't my proposed design.
    Matteo: if you have ownership, then brokers need to know. In the work for PIP 192, there is a concept of transfer. I wouldn't want RPCs because it could become tricky with availability. Part of the PIP is to keep track of bundle placement using a topic. Since the brokers are aware of a transfer operation, broker A becomes aware that broker B is the new owner and a transfer needs to happen, which can enable a graceful transfer. Instead of cutting off the clients, you can close the bundle, transfer it, and then, when B is ready, tell the clients where to connect. I am not against getting rid of bundles, but how do we handle the case of millions of topics efficiently?
    Lari: the current metadata store sends all notifications to all brokers, so we need to add sharding so that not every broker gets every notification.
    Matteo: the other part about bundles is that in order to improve service discovery with many topic partitions, or even just many topics, the lookups are currently individual instead of batched. This could be added to the protocol. It can be a waste of resources to do all of the lookups individually. The one downside is that it pushes the bundle mechanism to the client side, which is tight coupling.
    Michael: there are extra connections related to the many lookups too.
    Lari: one thought is that in a streaming context, batching might not be beneficial at the protocol level.
    Matteo: it isn't really batching. Instead of doing some number of lookups, we do a single lookup for the bundle assignments and then the client can determine which brokers to connect to.
    Lari: doing the lookups in parallel should perform the same, and there could be 50k of them.
    Michael: it seems like the difference isn't so much batching, but that the broker sends the bundle ranges and assignments and then the client does the compute to figure out where to connect based on the topics. (Note: a rough sketch of this idea appears below.)
    Lari: that won't benefit us though.
    Matteo: there are big benefits to batching. In the absence of a better alternative to bundles, it seems appropriate to expose bundles via the protocol. It doesn't have to be tied to bundles. Even if tomorrow you have per topic assignment, you could pass the assignments to the client and then the client knows. The point is an alternative service discovery mechanism.
    Asaf: what if we somehow split it into a bootstrap mode and a real mode? Then initial messages are sent to any broker, that broker forwards them to the "right" broker, while the original broker tells the client to connect to the other broker.
    Lari: batching is not always faster than streaming.
    Matteo: it is, and by that I mean less CPU intensive.
    Lari: I'd like to break down how we could make it a streaming use case.
    Matteo: it is already streaming.
    Lari: I think the current approach is naive though, and it could be improved.
    Matteo: I don't think you can make it faster when you make a single RPC for each topic.
    Lari: the challenge you're setting is limiting. Let's make plans both for where we expose the bundles over the protocol and where we don't.
    Matteo: I'd like to see a plan for how we can support millions of topics without bundles.
    Lari: I think we will find a solution in an alternative design. There will be sharding that would be similar to namespace bundles, but it would be different.
    Matteo: bundles are a sharding mechanism.
    Lari: the assignment happens on hash ranges though.
    Asaf: are we talking about a low cardinality of topics? Do you just get one namespace or one bundle per topic?
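(Illustrative sketch of the batched lookup idea above: the broker returns the namespace's bundle hash ranges together with their current owners in one response, and the client hashes each topic locally to pick a broker, so no per-topic lookup RPC is needed. The class name, fields, and CRC32 hashing below are hypothetical stand-ins, not Pulsar's actual bundle hashing or lookup commands.)

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

// Hypothetical sketch: resolve many topics from one "bundle map" response
// instead of issuing one lookup RPC per topic.
public class BundleMapResolver {

    /** One entry of the (hypothetical) batched lookup response. */
    record BundleRange(long startInclusive, long endExclusive, String brokerUrl) {}

    private final List<BundleRange> bundleMap; // fetched once from any broker

    BundleMapResolver(List<BundleRange> bundleMap) {
        this.bundleMap = bundleMap;
    }

    /** Hash the topic name into the namespace's hash space (CRC32 as a stand-in). */
    static long hash(String topicName) {
        CRC32 crc = new CRC32();
        crc.update(topicName.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // value in [0, 2^32)
    }

    /** Client side: pick the owning broker without any further lookup RPCs. */
    String resolve(String topicName) {
        long h = hash(topicName);
        return bundleMap.stream()
                .filter(r -> h >= r.startInclusive() && h < r.endExclusive())
                .map(BundleRange::brokerUrl)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("no bundle covers hash " + h));
    }
}
```

The trade-off noted in the discussion applies either way: whatever grouping the response describes (bundles today, or a per-topic assignment later) becomes something the client has to understand, which is the coupling Matteo mentions.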
    Lari: when we drop namespace bundles, there has to be a way to do sharding and some hashing of the topic: compute a hash of the topic name and assign that hash to a shard, assuming you have a fixed number of shards.
    Matteo: I'll give you an example. If you have a fixed number of shards, you've made an arbitrary limit on the max number of brokers.
    Lari: the problem you described has solutions.
    Matteo: there is no solution if you cannot move shards around. It makes the job much more difficult for the load manager because they are cluster-wide shards.
    Lari: they are not cluster shards. For reads, the shards will need to be assigned, but there could be read-only replicas for a shard.
    Matteo: when the traffic is imbalanced, how do we handle that?
    Lari: there would need to be support for rehashing to increase the number of shards.
    Matteo: it is impossible to do that increase for the whole cluster. Bundle splitting lets you treat this as a single node problem instead of a whole cluster problem.
    Lari: consistent hashing takes care of this. (Note: there was some additional discussion about potential designs; an illustrative consistent hashing sketch appears further below.)
    Asaf: maybe you can provide an example of a system that already has the implementation you're looking for?
    Andrey: I think we should enumerate the problems.
    Lari: if we accept that this is a step we can take (that we can drop bundles), then we can discuss this more.
    Matteo: I don't see a cure for everything. There are different approaches that have different benefits and also different problems. I don't see how we can get away from some level of grouping, and it has to be flexible because conditions are dynamic.
    Lari: I completely agree on that. How can we continue?
    Asaf: I think we need an existing system or a white paper that can show your proposed replacement.
    Lari: I think that is a problematic approach because there isn't the same context.
    Matteo: can you write out some schema to show your design? Also, one thing is that if you have these cluster wide shards, you lose the namespace level isolation that we already have (namespace isolation policies).
    Lari: it wouldn't be tied to the top level assignment. I'll do a follow up presentation/proposal for the alternative approach.
    Matteo: one request, try to make it pluggable so we can migrate production systems.
  - Michael: https://github.com/apache/pulsar/pull/17401. Can we make it possible to get rid of expiration for values in the metadata store for certain caches?
    Matteo: the main reason we needed expiration is because zk watches had issues. Since moving to persistent watches it is more reliable, but I don't know if it is perfect.
    Michael: I just want to change it so that we sometimes serve stale values instead of expiring them. The refresh after write will get the right value for the next call.
    Matteo: it was expire-only at first, then we added refresh to improve the recency of the data and decrease the chance of cache misses. The expire was left in to ensure we self correct if something gets into a very bad state. We could also use a longer expire time. There is one case in which expire is needed: we are also using the cache as a negative cache, and negative entries should be expired.
    Michael: wouldn't the refresh after write take care of that though?
    Matteo: the refresh will still store the negative result, but if the value is not asked for, which is what would trigger the refresh, then you could have a very old stale value.
    Michael: this change can be selectively applied to the "right" caches, like the leader manager cache.
    Matteo: sure, that makes sense to not expire.
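(Illustrative sketch of the expire vs. refresh behavior discussed above, using Caffeine, which the broker's metadata caches are built on; the durations, path, and loader here are placeholders, not the actual broker configuration.)

```java
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import java.time.Duration;
import java.util.Optional;

public class MetadataCacheSketch {

    // Placeholder for a read from the metadata store (ZooKeeper/etcd).
    // An empty Optional is cached too, i.e. the "negative cache" case.
    static Optional<byte[]> readFromMetadataStore(String path) {
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Roughly the current behavior: refresh keeps hot entries up to date,
        // expire is the safety net that eventually drops anything, including
        // negative entries.
        LoadingCache<String, Optional<byte[]>> withExpiry = Caffeine.newBuilder()
                .refreshAfterWrite(Duration.ofMinutes(5))   // background reload on the next read after 5 min
                .expireAfterWrite(Duration.ofMinutes(30))   // hard eviction: the next read blocks on a fresh load
                .build(MetadataCacheSketch::readFromMetadataStore);

        // Variant discussed for selected caches: keep refresh, drop expiry, and
        // accept serving a stale value for one read while the reload completes.
        LoadingCache<String, Optional<byte[]>> refreshOnly = Caffeine.newBuilder()
                .refreshAfterWrite(Duration.ofMinutes(5))
                .build(MetadataCacheSketch::readFromMetadataStore);

        withExpiry.get("/admin/policies/tenant/ns");   // placeholder metadata path
        refreshOnly.get("/admin/policies/tenant/ns");
    }
}
```

With refresh only, a read that arrives long after the last write is served the old value (possibly a stale negative entry) while the reload happens in the background, which is the case Matteo describes; the expiry is the safety net that eventually forces a miss and a fresh load instead.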
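(Referring back to the bundle-replacement discussion above: a minimal consistent-hash ring of the kind Lari alludes to, where each broker or shard sits at many points on a hash ring and a topic belongs to the first point at or after its hash. Entirely hypothetical code, only meant to make the term concrete.)

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Minimal consistent-hash ring: each broker is placed at several points on the
// ring ("virtual nodes"); a topic is owned by the first broker at or after its
// hash, wrapping around at the end of the ring.
public class ConsistentHashRing {

    private static final int VIRTUAL_NODES = 100;
    private final TreeMap<Long, String> ring = new TreeMap<>();

    static long hash(String s) {
        CRC32 crc = new CRC32();
        crc.update(s.getBytes(StandardCharsets.UTF_8));
        return crc.getValue();
    }

    void addBroker(String brokerUrl) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.put(hash(brokerUrl + "#" + i), brokerUrl);
        }
    }

    void removeBroker(String brokerUrl) {
        for (int i = 0; i < VIRTUAL_NODES; i++) {
            ring.remove(hash(brokerUrl + "#" + i));
        }
    }

    String ownerOf(String topicName) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no brokers registered");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(hash(topicName));
        // Wrap around to the first point on the ring when past the last one.
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }
}
```

Adding or removing one broker only remaps the ranges adjacent to its points rather than rehashing every topic, which speaks to the resizing question; it does not by itself answer the load-imbalance and namespace-isolation concerns Matteo raises.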
  - Asaf: there is a new expansion of the protobuf, and it feels like the encapsulation is incorrect. It looks like some methods are only public so the broker can use them. Can we break it out so we have a private API that is available to the broker runtime and one that is public?
    Dave: can we just document it?
    Matteo: I don't view the protobuf as a public API.
    Dave: should we have naming conventions to improve this?
    Matteo: good point, and we can change all of those fields. Having two APIs would be a burden. There are some shortcuts in the broker for geo-replication.
    Asaf: it bothers me that we meshed together things that are internal with the public interface that is the spec of the system. It is weird to me that the replication fields are part of the public fields. For new fields, I think we should think about this.
    Matteo: in general, there has been no convention; we have done the easiest thing, which might not always be the best idea.
    Lari: as an example, for a newcomer to the protocol, the authoritative flag is a good one.
    Matteo: we think we can get rid of it, at least from the perspective of never returning a redirect. If we always give an authoritative answer, we don't need to use it.
    Asaf: are the trade-offs worth it?
    Matteo: consider the marker for a replicated subscription. It marks that the message is server generated, so it is only meant for broker to broker communication. I don't see an easy way to hide that by saying a client library doesn't have access to it.