Hi Pulsar Community,

Here are the meeting notes from today's community meeting. Thanks to
all who participated!

As you'll see, we had a very long discussion about the idea of
replacing bundles. The other two topics were on broker metadata cache
evictions and the Pulsar protocol.

Disclaimer: if something is misattributed or misrepresented, please
send a correction to this list.

Source Google doc:
https://docs.google.com/document/d/19dXkVXeU2q_nHmkG8zURjKnYlvD96TbKf5KjYyASsOE

Thanks,
Michael

2022/09/01, (8:30 AM PST)
-   Attendees:
-   Matteo Merli
-   Lari Hotari
-   Enrico Olivelli
-   Andrey Yegorov
-   Dave Fisher
-   Heesung Sohn
-   Ayman Khalil
-   Michael Marshall
-   Asaf Mesika
-   Christophe Bornet

-   Discussions/PIPs/PRs (Generally discussed in order they appear)

-   Lari: brought up the mailing list discussion about namespace bundles
and the long term plan. Matteo: I would like to remove them. They can be
very annoying. They have to be big enough for the grouping to work, and
small enough not to cause imbalance problems. There is an order of magnitude
difference in the state by only managing them as groups, not as
individuals. If you have millions of topics, how can a single broker
keep track of everything? Dave: could part of the problem be that, as
you add brokers and grow topics, you encounter bundle splits? Is the
problem the splitting itself? Matteo: the
split is a way to maintain them. If you have a bundle with too many
topics, it is too large for even distribution of load. Dave: what if
you could allocate a bunch of bundles at once? Michael: you can do
that. Matteo: the problem there is that if you don’t need that, then
it’s a waste of resources. It’s not terrible, but the worst case is when
you have one topic per bundle. If you know your use case, you can
create the namespace with the “right” number of bundles. The system is
ideally designed so that operators don’t need to configure these low
level details. Dave: is that the point of the different bundle split
algorithms, then? Matteo: yes. Lari: do we want to change the design
at any point in the future? One case is topic level anti-affinity
rules for a very high volume partitioned topic where you want to
ensure each partition is on different brokers. Matteo: it is kind of
possible today. If you want to maximize throughput, that should be the
responsibility of the load manager. It is possible to have bundle
level failure domains where bundles are split. Lari: since it is about
bundles, it is extra complexity to ensure the topics are distributed
because they could be in the same bundle depending on their hash.
Matteo: if you have 10 brokers, you should have 100 partitions if
you’re looking to spread the load across all of the brokers. Heesung:
note that there are different splitting algorithms. Michael: there is
no merging of bundles, which makes splitting a “forever” event
right now. Matteo: merging bundles was on the road map (like 8 years
ago). It is doable, but it is complicated because it requires
coordination between multiple nodes. In most cases, it’s not terrible
to overshoot on the number of bundles. How can we support both
scenarios (Lari’s and the millions-of-topics scenario)? Lari: the
design is limiting future improvements. Another use case from the
mailing list relates to PIP 192 broker graceful shutdown. Due to the
way bundles are unloaded, there are interruptions for topic
availability. If you could move topics atomically, there is a chance
for a seamless handover. There could be an RPC between brokers to help
smooth over issues, too. Matteo: that implies every broker knows about
every topic. Lari: no, that isn’t my proposed design. Matteo: if you
have ownership, then brokers need to know. In the work for 192, there
is a concept of transfer. I wouldn’t want RPCs because it could become
tricky with availability. Part of the PIP is to keep track of bundle
placement using a topic. Since the brokers are aware of a transfer
operation, broker A becomes aware that broker B is the new owner and a
transfer needs to happen, which can enable a graceful transfer.
Instead of cutting off the clients, you can close the bundle, transfer
it, and then when B is ready, tell the clients where to connect to. I
am not against getting rid of bundles, but how do we treat the case of
millions of topics efficiently? Lari: the current metadata store sends
all notifications to all brokers, so we need to add sharding so that
not every broker gets every notification. Matteo: the other part about
bundles is that in order to improve service discovery with many topic
partitions or even just many topics, the lookups are individual
instead of batched. This could be added to the protocol. It could be a
waste of resources to do all of the lookups. The one downside is that
it pushes the bundle mechanism to the client side, which is tight
coupling. Michael: there are extra connections related to the many
lookups too. Lari: one concept is that in streaming, batching
might not be beneficial at the protocol level. Matteo: it isn’t really
batching. Instead of doing some number of lookups, we do a single
lookup for the bundle assignments and then the client can determine
which brokers to connect to. Lari: doing it in parallel should be the
same and there could be 50k. Michael: It seems like the difference
isn’t so much batching, but that the broker sends the bundle ranges
and assignments and then the client does the computation to figure out
where to connect to based on the topics. Lari: that won’t benefit us
though. Matteo: there are big benefits to batching. In the absence of
seeing a better alternative to bundles, it seems appropriate to expose
bundles via the protocol. It doesn’t have to be tied to bundles. Even
if tomorrow you have per topic assignment, you could pass the
assignments to the client and then the client knows. The point is an
alternative service discovery mechanism. Asaf: what if we somehow
split it into a bootstrap mode and a real mode? Then, initial messages
are sent to any broker; that broker sends them to the “right” broker,
while the original broker tells the client to connect to the other
broker. Lari: batching is not always faster than streaming. Matteo: it
is, and by that I mean less CPU intensive. Lari: I’d like to break
down how we could make it a streaming use case. Matteo: it is already
streaming. Lari: I think the current approach is naive though and it
could be improved. Matteo: I don’t think you can make it faster where
you make a single RPC for each topic. Lari: the challenge that you’re
setting is limiting. Let’s make plans where we expose the bundles and
where we don’t (over the protocol). Matteo: I’d like to see a plan for
how we can support millions of topics without bundles. Lari: I think
we will find a solution in an alternative design. There will be
sharding that would be similar to namespace bundles but it would be
different. Matteo: bundles are a sharding mechanism. Lari: the
assignment happens on hash ranges though. Asaf: are we talking about a
low cardinality of topics? Do you just get one namespace or one bundle
per topic? Lari: when we drop namespace bundles, there has to be a way
to do sharding and some hashing of the topic. If we compute a hash and
assign that hash to a shard, that assumes you have a fixed number of
shards (see the sketch at the end of this item). Matteo: I’ll give you
an example. If you have a fixed number
of shards, you’ve made an arbitrary limit on the max number of
brokers. Lari: the problem you described has solutions. Matteo: there
is no solution if you cannot move shards around. It makes the job much
more difficult for the load manager because they are cluster shards.
Lari: they are not cluster shards. For reads, the shards will need to
be assigned, but there could be read only replicas for a shard.
Matteo: when the traffic is imbalanced, how do we handle that? Lari:
there would need to be support for rehashing to increase the number of
shards. Matteo: it is impossible to do that increase for the whole
cluster. Bundle splitting lets you treat this as a single node
problem, instead of the whole cluster. Lari: consistent hashing takes
care of this. (Note: there was some additional discussion about
potential designs.) Asaf: maybe you can provide an example of a system
that already has the implementation you’re looking for? Andrey: I
think we should enumerate the problems. Lari: if we accept that this
is a step we can take (that we can drop bundles), then we can discuss
this more. Matteo: I don’t see a cure for everything. There are
different approaches that have different benefits and also different
problems. I don’t see how we can get away from some level of grouping,
and it has to be flexible because conditions are dynamic. Lari: I
completely agree on that. How can we continue? Asaf: I think we need
an existing system or a white paper that can show your proposed
replacement. Lari: I think that is a problematic approach because
there isn’t the same context. Matteo: can you write out some schema to
show your design? Also, one thing is that if you have these cluster
wide shards, you lose the namespace level isolation that we already
have (namespace isolation policies). Lari: it wouldn’t be tied to the
top level assignment. I’ll do some follow up presentation/proposal for
the alternative approach. Matteo: one request, try to make it
pluggable so we can migrate production systems.
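
(Note: the following sketch was not discussed in the meeting; it is only
meant to illustrate the hash-range idea above. Topics hash into one of a
set of ranges (today's bundles), ownership is tracked per range rather
than per topic, and a client holding the full range-to-broker map could
resolve many topics locally instead of issuing one lookup RPC per topic.
The class, hash function, and range layout are illustrative only, not
Pulsar's actual implementation.)

    import java.nio.charset.StandardCharsets;
    import java.util.NavigableMap;
    import java.util.TreeMap;
    import java.util.zip.CRC32;

    class BundleMapSketch {
        // Upper bound (exclusive) of each hash range -> URL of the owning broker.
        private final NavigableMap<Long, String> upperBoundToBroker = new TreeMap<>();
        private static final long HASH_SPACE = 1L << 32; // 32-bit hash space

        BundleMapSketch(int numRanges, String[] brokers) {
            // Evenly sized initial ranges; a real load manager would split and
            // reassign ranges dynamically based on load.
            long step = HASH_SPACE / numRanges;
            for (int i = 1; i <= numRanges; i++) {
                long upper = (i == numRanges) ? HASH_SPACE : i * step;
                upperBoundToBroker.put(upper, brokers[i % brokers.length]);
            }
        }

        // Resolve a topic to its owning broker purely from the local map,
        // without a per-topic lookup RPC.
        String lookup(String topic) {
            CRC32 crc = new CRC32();
            crc.update(topic.getBytes(StandardCharsets.UTF_8));
            long hash = crc.getValue(); // value in [0, 2^32)
            return upperBoundToBroker.higherEntry(hash).getValue();
        }
    }

For example, new BundleMapSketch(16, brokers).lookup("persistent://tenant/ns/topic-3")
returns whichever broker owns the range that the topic's hash falls into.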

-   Michael: https://github.com/apache/pulsar/pull/17401. Can we make
it possible to get rid of expiration for values in the metadata store
for certain caches? Matteo: the main reason we needed expiration is
because zk watches had issues. Since using persistent watches, it is
more reliable, but I don’t know if it is perfect. Michael: I just want
to change it so that we can serve stale values sometimes instead of
expiring the value. The refresh after write will get the right value
for the next call. Matteo: it was only expire at first, then we added
refresh to improve recency of the data and decrease chance for cache
misses. The expire was left to ensure the cache can self-correct if
something got
into a very bad state. We could do a longer time, too, for the expire.
There is one case in which expire is needed: we are also using the
cache as a negative cache, and those should be expired. Michael:
wouldn’t the refresh after write take care of that though? Matteo: the
refresh will still store the negative result, but the refresh is only
triggered when the value is asked for, so if it is never asked for
again you could have a very
old stale value. Michael: this change can be selectively applied to
the “right” caches, like the leader manager cache. Matteo: sure, that
makes sense to not expire.
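
(Note: this snippet was not part of the discussion; it is a minimal
sketch of the refresh-without-expire idea, assuming a Caffeine-style
async loading cache similar to what backs the metadata caches. The class
name, duration, and loader are illustrative only.)

    import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.time.Duration;
    import java.util.Optional;

    class MetadataCacheSketch {
        // Placeholder for a read against the metadata store (e.g. ZooKeeper);
        // Optional.empty() stands in for a "not found" (negative) result.
        private Optional<byte[]> readFromStore(String path) {
            return Optional.empty();
        }

        // refreshAfterWrite serves the cached (possibly stale) value while a
        // background reload runs, so frequently read entries stay fresh without
        // ever being evicted. The refresh only fires on access, though: an
        // entry that is never read again is never refreshed, which is why
        // negative results may still need an expiration.
        final AsyncLoadingCache<String, Optional<byte[]>> cache = Caffeine.newBuilder()
                .refreshAfterWrite(Duration.ofMinutes(5))
                // .expireAfterWrite(...) intentionally omitted for caches that
                // should serve stale values instead of expiring them
                .buildAsync(this::readFromStore);
    }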

-   Asaf: there is a new expansion of the protobuf; it feels like
there is incorrect encapsulation. It looks like some methods are only
public so the broker can use them. Can we break it out so we have a
private API that is available to the broker run time and one that is
public? Dave: can we just document it? Matteo: I don’t view the
protobuf as a public API. Dave: should we have naming conventions to
improve this? Matteo: good point, and we can change all of those
fields. Having two APIs would be a burden. There are some shortcuts
in the broker for geo-replication. Asaf: it bothered me that we mixed
together things that are internal with the public interface that is
the spec of the system. It is weird to me that the replication fields
are part of the public fields. For new fields, I think we should think
about these. Matteo: in general, there has been no convention, we have
done the easiest approach, which might not always be the best idea.
Lari: for a newcomer to the protocol, the
authoritative flag is a good example. Matteo: we think we can get rid
of it, at least from a perspective of never returning a redirect. If
we always give an authoritative answer, we don’t need to use it. Asaf:
are the trade-offs worth it? Matteo: you could use the marker for a
replicated subscription. This marks that the message is server
generated, so it is only meant for broker to broker communication. I
don’t see an easy way to hide that by saying a client library doesn’t
have access to it.
