Re: PIP-187 Add API to analyse a subscription backlog and provide an accurate value

2022-07-21 Thread Enrico Olivelli
On Thu, Jul 21, 2022 at 06:25 PengHui Li wrote:
>
> > What if the topic owner creates an internal subscription, consumes the
> messages, and updates a count per filter.
>
> I agree with this approach. If we need to scan the whole backlog to
> calculate an accurate count for each operation, it's too expensive and
> difficult to apply in a production environment. Keeping a counter per
> filter (subscription) and only re-scanning the data after the filter
> changes would reduce the overhead a lot.

This approach is very expensive even when you don't need the feature,
because in order to keep the value up to date you have to read everything
(once per subscription), especially when a subscription has no consumers
while the topic is still being written to.

Since you can't know in advance when you will need this value, you end up
wasting resources to calculate something that you may never use.


This API to analyze a subscription is not meant to be used often, but
only to debug problems manually.

Also, a filter may depend on other environment variables, like the wall
clock time: if a filter depends on time and you pre-calculate the
backlog, your counter won't be correct.
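
To make that concrete, here is a rough sketch of a time-dependent filter. It is
not from the PIP; the "deliver-at" property name is made up, and the exact
PIP-105 `EntryFilter`/`FilterContext` signatures are written from memory, so
treat them as assumptions:

```java
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.common.api.proto.KeyValue;

// Hypothetical filter: a message becomes visible only after the timestamp in its
// (made up) "deliver-at" property has passed, so the result depends on the wall clock.
public class DeliverAtFilter implements EntryFilter {

    @Override
    public FilterResult filterEntry(Entry entry, FilterContext context) {
        // Assumption: the entry-level metadata exposes the producer properties.
        for (KeyValue kv : context.getMsgMetadata().getPropertiesList()) {
            if ("deliver-at".equals(kv.getKey())) {
                long deliverAt = Long.parseLong(kv.getValue());
                // The same entry gives a different answer depending on when it is evaluated.
                return System.currentTimeMillis() >= deliverAt
                        ? FilterResult.ACCEPT
                        : FilterResult.REJECT;
            }
        }
        return FilterResult.ACCEPT; // no property: deliver normally
    }

    @Override
    public void close() {
        // nothing to release
    }
}
```

A counter maintained on the write path would have to guess how many of these
messages are visible at some unknown future instant, which is exactly why the
analysis has to run at read time.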

>
> If we want to expose the accurate backlogs in the Prometheus endpoint,
> it's almost impossible.

I don't think this is actually possible if you want to take the filters
into consideration.
We are dealing with general-purpose filtering (we actually allow Java
code to be plugged into the broker), so pre-calculating the counters
won't work well.


Enrico

>
> Thanks,
> Penghui
>
> On Wed, Jul 20, 2022 at 11:23 PM Asaf Mesika  wrote:
>
> > On Wed, Jul 20, 2022 at 5:46 PM Enrico Olivelli 
> > wrote:
> >
> > > Asaf,
> > >
> > > On Wed, Jul 20, 2022 at 15:40 Asaf Mesika wrote:
> > > >
> > > > I'm not sure I understand the context exactly:
> > > >
> > > > You say today we can only know the number of entries, hence we'll have
> > > > a wrong backlog count for a subscription since:
> > > > 1. One entry contains multiple messages (batch message)
> > > > 2. Subscription may contain a filter, which requires you to read the
> > > > entire backlog to know it
> > >
> > > correct
> > >
> > > >
> > > > There are two things I don't understand:
> > > >
> > > > 1. We're adding an observability API for which you need to pay the full
> > > > read cost just to know the count. I presume people would want to run this
> > > > more than once, so they will read the same data multiple times - why would
> > > > a user be willing to pay such a hefty price?
> > >
> > > Sometimes it is worth it, because processing a message may have a high
> > > cost. So 10 entries of 100 messages each does not correctly represent
> > > the amount of work that must be done by the consumers, and so the user
> > > may wish to have an exact count.
> > >
> > > Having the filters adds more complexity because you cannot predict how
> > > many entries will be filtered out
> > >
> > >
> > So it's mainly serving the specific use case where reading all the
> > messages over and over (every interval) is an order of magnitude less
> > expensive than the processing itself.
> >
> >
> > > > 2. If the user needs to know an accurate backlog, can't they use the
> > > > ability to create a very large number of topics, thus they will know an
> > > > accurate backlog without the huge cost?
> > >
> > > I can't understand why creating many topics would help.
> > > Instead, with filters it is very likely that you have fewer topics,
> > > each with many subscriptions using different filters.
> > >
> > > As you don't know the filters while writing, you cannot route the
> > > messages to specific topics. Also, you would need to write each
> > > message to potentially multiple topics, and that would be a huge
> > > write amplification (think about a topic with 100 subscriptions).
> > >
> > Yes, I hadn't thought about that.
> > What I was thinking is that those filters are mutually exclusive and
> > could therefore map to separate topics, but in your case, if you have
> > 100 different filters and they overlap, yes, it would be way more
> > expensive to write them 100 times.
> >
> > >
> > > > I have an idea, if that's ok:
> > > >
> > > > What if you can keep, as you said in your document, a metric counting
> > > > messages per filter upon write.
> > > This is not possible as described above
> > >
> >
> > You wrote above that:
> >
> > ---
> > you cannot know which subscriptions will be created in a topic
> > subscription can be created from the past (Earliest)
> > subscription filters may change over time: they are usually configured
> > using Subscription Properties, and those properties are dynamic
> > doing computations on the write path (like running filters) kills
> > latency and throughput
> >
> > Use a client to clone the subscription and consume data.
> > This doesn't work because you have to transfer the data to the client,
> > and this is possibly a huge a

[DISCUSS] Apache Pulsar 2.8.4 release

2022-07-21 Thread Yunze Xu
Hello Pulsar Community,

It has been several months since the 2.8.3 release and there are many
important fixes after that. For example, there was a performance
regression for the message listener that was introduced in 2.8.3 and
fixed in https://github.com/apache/pulsar/pull/15162.

I volunteer to be the release manager for 2.8.4.

Here [0] you can find the list of 149 commits to branch-2.8 since the
2.8.3 release. There are 54 closed PRs targeting 2.8.4 that have not
yet been cherry-picked [1] and I will cherry-pick them and solve the
conflicts if necessary. There are 18 open PRs labeled with
`release/2.8.4` [2]. I will check the status of them and see if they
can be pushed to 2.8.4.

Thanks,
Yunze

[0] - https://github.com/apache/pulsar/compare/v2.8.3...branch-2.8
[1] - 
https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aclosed
[2] - 
https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aopen



Re: [DISCUSS] PIP-191: Support batched message using entry filter

2022-07-21 Thread Qiang Huang
Good. +1

Anon Hxy wrote on Tue, Jul 19, 2022 at 18:00:

> Hi Pulsar community:
>
> I open a pip to discuss "Support batched message using entry filter"
>
> Proposal Link: https://github.com/apache/pulsar/issues/16680
> ---
>
> ## Motivation
>
> We already have a plug-in way to filter entries in the broker, aka PIP-105
> https://github.com/apache/pulsar/issues/12269.  But this approach only checks
> the batch header, without digging into the individual messages' properties.
> It does provide an interface to deserialize the entire Entry onto the heap,
> but this adds memory and CPU overhead, and in most scenarios we only need a
> few properties to do the filtering.
>
> This proposal brings a method to make PIP-105 support batched entries without
> having to deserialize the entire Entry onto the heap.
>
>
> ## API Changes
>
> - Add a producer config to specify the keys of the properties that will be
> added to the batched entry metadata, for example:
>
> ```
>
> org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
>
> ```
> The `batchedFilterProperties` type is `List` and its default value is an
> empty list. With an empty list, the properties of the entry's metadata stay
> empty and the `EntryFilter` will not take effect.
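
As a usage illustration only: the `batchedFilterProperties(...)` builder method
below does not exist today; it is a hypothetical mirror of the proposed
`ProducerConfigurationData#batchedFilterProperties` field, shown to make the
intent concrete.

```java
import java.util.Arrays;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;

public class BatchedFilterPropertiesExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Producer<byte[]> producer = client.newProducer()
                .topic("persistent://public/default/orders")
                .enableBatching(true)
                // Hypothetical setter for the proposed config: only these keys
                // would be copied into the batched entry metadata.
                .batchedFilterProperties(Arrays.asList("region", "priority"))
                .create();

        producer.newMessage()
                .property("region", "eu")        // would become visible to EntryFilters at the entry level
                .property("trace-id", "abc-123") // would stay only in the SingleMessageMetadata
                .value("order-payload".getBytes())
                .send();

        client.close();
    }
}
```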
>
> ## Implementation
>
> - When calling `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`,
>   we extract the message properties and add them to `metadata`:
> ```
> public boolean add(MessageImpl msg, SendCallback callback) {
>
>     if (log.isDebugEnabled()) {
>         log.debug("[{}] [{}] add message to batch, num messages in batch so far {}",
>                 topicName, producerName, numMessagesInBatch);
>     }
>
>     if (++numMessagesInBatch == 1) {
>         try {
>             // some properties are common amongst the different messages in the
>             // batch, hence we just pick it up from the first message
>             messageMetadata.setSequenceId(msg.getSequenceId());
>             List filterProperties = getProperties(msg);
>             if (!filterProperties.isEmpty()) {
>                 messageMetadata.addAllProperties(filterProperties);  // add message properties here
>             }
> ```
>
> - Also we need to add a method `hasSameProperties`, like `hasSameSchema`.
>   Messages with the same properties can be added to the same batch:
>
> ```
> private boolean canAddToCurrentBatch(MessageImpl msg) {
>     return batchMessageContainer.haveEnoughSpace(msg)
>             && (!isMultiSchemaEnabled(false)
>                 || batchMessageContainer.hasSameSchema(msg)
>                 || batchMessageContainer.hasSameProperties(msg))
>             && batchMessageContainer.hasSameTxn(msg);
> }
>
> ```
>
>
> ## Reject Alternatives
>
> - Implement an `AbstractBatchMessageContainer`, say
> `BatchMessagePropertiesBasedContainer`, keeping messages with the same
> properties in a single `hashmap` entry, like
> `BatchMessageKeyBasedContainer`.
>
> Rejection reason: this would publish messages out of order.
>
>
>
> Thanks,
> Xiaoyu Hou
>


-- 
BR,
Qiang Huang


Re: [DISCUSS] PIP-191: Support batched message using entry filter

2022-07-21 Thread Enrico Olivelli
Thank you for this proposal !

I understand the problem, and I have already thought about it, because
I am the author of some filters (especially the JMS Selectors Filter),
but we need to clarify a few points about this PIP.

1) It is not clear to me whether you want to push all the messages'
metadata into the main header of the entry, or only the metadata of the
first message, or only the KEY.
2) we have to clarify the new message format of the entry and update
the protocol documents
3) existing EntryFilters won't be able to work well with the new
format; we must find a way to make them fail cleanly and not process garbage
4) the same problem applies to Interceptors and Protocol Handlers
(like KOP); we must make it clear in this PIP what the upgrade path is
and give some suggestions to the developers of such components

I support this initiative, as long as we clarify those points.

The Pulsar ecosystem is becoming more and more mature, and the user
base is growing quickly; we can't break things without a clear path
for the users and for the developers of extensions.

Enrico


On Thu, Jul 21, 2022 at 12:45 Qiang Huang wrote:
>
> Good. +1
>
> Anon Hxy wrote on Tue, Jul 19, 2022 at 18:00:
>
> > Hi Pulsar community:
> >
> > I open a pip to discuss "Support batched message using entry filter"
> >
> > Proposal Link: https://github.com/apache/pulsar/issues/16680
> > ---
> >
> > ## Motivation
> >
> > We already have  a plug-in way to filter entries in broker, aka PIP-105
> > https://github.com/apache/pulsar/issues/12269.  But this way only checks
> > at
> > the batch header,  without digging into the individual messages properties.
> > Of course it provides an interface to deserialize the entire Entry to the
> > heap,  But this will bring some memory and cpu workload. And in most
> > scenarios we only need partial properties to do some filter.
> >
> > This proposal brings a method to make PIP-105 support batched entry without
> > having to deserialize the entire Entry to the heap
> >
> >
> > ## API Changes
> >
> > - Add a producer config to specialize the key, of which properties will be
> > added to the batched entry metadata, for example:
> >
> > ```
> >
> > org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
> >
> > ```
> > The  `batchedFilterProperties` type is `List` with default value is
> > empty list.  For an empty list, it means that the properties of entry's
> > metadata are empty, and the `EntryFilter` will not take effect.
> >
> > ## Implementation
> >
> > - When calling `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`,
> >   we extract the message properties and add them to `metadata`:
> > ```
> > public boolean add(MessageImpl msg, SendCallback callback) {
> >
> >     if (log.isDebugEnabled()) {
> >         log.debug("[{}] [{}] add message to batch, num messages in batch so far {}",
> >                 topicName, producerName, numMessagesInBatch);
> >     }
> >
> >     if (++numMessagesInBatch == 1) {
> >         try {
> >             // some properties are common amongst the different messages in the
> >             // batch, hence we just pick it up from the first message
> >             messageMetadata.setSequenceId(msg.getSequenceId());
> >             List filterProperties = getProperties(msg);
> >             if (!filterProperties.isEmpty()) {
> >                 messageMetadata.addAllProperties(filterProperties);  // add message properties here
> >             }
> > ```
> >
> > -  Also we need to add a method `hasSameProperties` like `hasSameSchema`.
> > Messages with same properties can be added to the same batch:
> >
> > ```
> > private boolean canAddToCurrentBatch(MessageImpl msg) {
> >     return batchMessageContainer.haveEnoughSpace(msg)
> >             && (!isMultiSchemaEnabled(false)
> >                 || batchMessageContainer.hasSameSchema(msg)
> >                 || batchMessageContainer.hasSameProperties(msg))
> >             && batchMessageContainer.hasSameTxn(msg);
> > }
> >
> > ```
> >
> >
> > ## Reject Alternatives
> >
> > - Implement a `AbstractBatchMessageContainer` ,  saying
> > `BatchMessagePropertiesBasedContainer`, keeping messages with same
> > properties in a single `hashmap` entry,  like
> > `BatchMessageKeyBasedContainer`.
> >
> > Rejection reason:  This will publish messages out of order
> >
> >
> >
> > Thanks,
> > Xiaoyu Hou
> >
>
>
> --
> BR,
> Qiang Huang


Re: [DISCUSS] Apache Pulsar 2.8.4 release

2022-07-21 Thread Dave Fisher
Thank you for volunteering!

> On Jul 21, 2022, at 12:57 AM, Yunze Xu  wrote:
> 
> Hello Pulsar Community,
> 
> It has been several months since the 2.8.3 release and there are many
> important fixes after that. For example, there is a performance
> regression for message listener that was introduced from 2.8.3 and
> fixed in https://github.com/apache/pulsar/pull/15162.
> 
> I volunteer to be the release manager for 2.8.4.
> 
> Here [0] you can find the list of 149 commits to branch-2.8 since the
> 2.8.3 release. There are 54 closed PRs targeting 2.8.4 that have not
> yet been cherry-picked [1] and I will cherry-pick them and solve the
> conflicts if necessary. There are 18 open PRs labeled with
> `release/2.8.4` [2]. I will check the status of them and see if they
> can be pushed to 2.8.4.

Please use discretion about selecting bug fixes only and avoid adding new 
features or unexpected configuration changes. Thanks!

Ask for help with the cherry picks, since being a Release Manager is a big
job!

Regards,
Dave


> 
> Thanks,
> Yunze
> 
> [0] - https://github.com/apache/pulsar/compare/v2.8.3...branch-2.8
> [1] - 
> https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aclosed
> [2] - 
> https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aopen
> 



Re: [DISCUSS] Apache Pulsar 2.11.0 Release

2022-07-21 Thread Nicolò Boschi
I understand the need for the Pulsar Summit.

In that case I have to step back, because I will be offline for the next
few weeks.
Sorry

Nicolò Boschi

On Thu, Jul 21, 2022 at 06:32 PengHui Li wrote:

> Thanks for volunteering Nicolò.
>
> > So a plan could be to try to merge the work in progress targeted for 2.11
> by the mid of August and then start the code freezing as described in the
> PIP.
>
> So the target release date will be early September. One point is that
> Pulsar Summit San Francisco will start on August 18, 2022. I think maybe
> we can start to test the master branch for now and continue the
> in-progress tasks. If we can have a major release before Pulsar Summit,
> it would be good news for the community.
>
> Thanks.
> Penghui
>
> On Mon, Jul 18, 2022 at 4:06 PM Enrico Olivelli 
> wrote:
>
> > Nicolò,
> >
> > On Mon, Jul 18, 2022 at 10:00 Nicolò Boschi wrote:
> >
> > > Thanks Penghui for the reminder.
> > > I'd like to also include PIP-181 (Pulsar Shell), if time permits.
> > >
> > > I believe it is a good idea to start testing the code freeze proposed
> > > by PIP-175 (https://github.com/apache/pulsar/issues/15966). Even if not
> > > officially approved, we discussed it many times and agreed on the
> > > usefulness of code freezing.
> > >
> >
> > Great idea!
> >
> > We should really try it
> >
> > > So a plan could be to try to merge the work in progress targeted for
> > > 2.11 by mid-August and then start the code freeze as described in the
> > > PIP.
> > >
> > > Also, I volunteer for driving the release if nobody else is interested
> > >
> >
> >
> > Thanks for volunteering
> >
> > Enrico
> >
> >
> > > Thanks,
> > > Nicolò Boschi
> > >
> > >
> > > On Mon, Jul 18, 2022 at 06:59 Yunze Xu wrote:
> > >
> > > > In addition to #16202, there is a follow-up PR to support the correct
> > > > ACK implementation for chunked messages. It depends on #16202, but I
> > > > think I can submit an initial PR this week and change the tests
> > > > after #16202 is merged.
> > > >
> > > > Thanks,
> > > > Yunze
> > > >
> > > >
> > > >
> > > >
> > > > > On Jul 18, 2022, at 11:22, PengHui Li wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > We released 2.10.0 three months ago. And there are many great
> changes
> > > in
> > > > > the master branch,
> > > > > including new features and performance improvements.
> > > > >
> > > > > - PIP 74: apply client memory to consumer
> > > > > https://github.com/apache/pulsar/pull/15216
> > > > > - PIP 143: Support split bundles by specified boundaries
> > > > > https://github.com/apache/pulsar/pull/13796
> > > > > - PIP 145: regex subscription improvements
> > > > > https://github.com/apache/pulsar/pull/16062
> > > > > - PIP 160: transaction performance improvements (still in progress
> > and
> > > > > merged some PRs)
> > > > > - PIP 161: new exclusive producer mode support
> > > > > https://github.com/apache/pulsar/pull/15488
> > > > > - PIP 182: Provide new load balance placement strategy
> implementation
> > > for
> > > > > ModularLoadManagerStrategy
> > https://github.com/apache/pulsar/pull/16281
> > > > > Add Pulsar Auth support for the Pulsar SQL
> > > > > https://github.com/apache/pulsar/pull/15571
> > > > >
> > > > > And some features are blocked in the review stage, but they are
> > > powerful
> > > > > improvements for Pulsar
> > > > >
> > > > > PIP 37: Support chunking with Shared subscription
> > > > > https://github.com/apache/pulsar/pull/16202
> > > > > PIP-166: Function add MANUAL delivery semantics
> > > > > https://github.com/apache/pulsar/pull/16279
> > > > >
> > > > > You can find the complete change list in 2.11.0 at
> > > > >
> > > >
> > >
> >
> https://github.com/apache/pulsar/pulls?q=is%3Apr+milestone%3A2.11.0+-label%3Arelease%2F2.10.1+-label%3Arelease%2F2.10.2
> > > > >
> > > > > And maybe I missed some important in-progress PRs, please let me
> know
> > > if
> > > > it
> > > > > should be a blocker of the 2.11.0 release.
> > > > >
> > > > > It's a good time to discuss the target time of the 2.11.0 release.
> > > > > I think we can leave 2 weeks to complete the in-progress PRs and 2
> > > weeks
> > > > to
> > > > > accept bug fixes.
> > > > > And target the 2.11.0 release in mid-August.
> > > > >
> > > > > Please let me know what you think.
> > > > >
> > > > > Thanks,
> > > > > Penghui
> > > >
> > > >
> > >
> >
>


Re: [DISCUSS] Apache Pulsar 2.8.4 release

2022-07-21 Thread Enrico Olivelli
Thank you Yunze

I agree that it is time to cut a release of 2.8.

We should also do one last release out of the 2.7 branch.
I hope that someone will volunteer.

Enrico

On Thu, Jul 21, 2022 at 18:10 Dave Fisher wrote:

> Thank you for volunteering!
>
> > On Jul 21, 2022, at 12:57 AM, Yunze Xu 
> wrote:
> >
> > Hello Pulsar Community,
> >
> > It has been several months since the 2.8.3 release and there are many
> > important fixes after that. For example, there is a performance
> > regression for message listener that was introduced from 2.8.3 and
> > fixed in https://github.com/apache/pulsar/pull/15162.
> >
> > I volunteer to be the release manager for 2.8.4.
> >
> > Here [0] you can find the list of 149 commits to branch-2.8 since the
> > 2.8.3 release. There are 54 closed PRs targeting 2.8.4 that have not
> > yet been cherry-picked [1] and I will cherry-pick them and solve the
> > conflicts if necessary. There are 18 open PRs labeled with
> > `release/2.8.4` [2]. I will check the status of them and see if they
> > can be pushed to 2.8.4.
>
> Please use discretion about selecting bug fixes only and avoid adding new
> features or unexpected configuration changes. Thanks!
>
> Ask for help with the cherry picks, since being a Release Manager is a
> big job!
>
> Regards,
> Dave
>
>
> >
> > Thanks,
> > Yunze
> >
> > [0] - https://github.com/apache/pulsar/compare/v2.8.3...branch-2.8
> > [1] -
> https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aclosed
> > [2] -
> https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aopen
> >
>
>


[VOTE] PIP-174: Provide new implementation for broker dispatch cache

2022-07-21 Thread Matteo Merli
## Motivation

The current implementation of the read cache in the Pulsar broker has largely
remained unchanged for a long time, except for a few minor tweaks.

While the implementation is stable and reasonably efficient for typical
workloads, the overhead required for managing the cache evictions in a
broker that is running many topics can be pretty high, both in extra CPU
utilization and in the pressure put on the JVM garbage collector to track
an increased number of medium-lived objects.

The goal is to provide an alternative implementation that can adapt better to
a wider variety of operating conditions.

### Current implementation details

The broker cache is implemented as part of the `ManagedLedger` component,
which sits in the Pulsar broker and provides a higher level of
abstraction on top of BookKeeper.

Each topic (and managed-ledger) has its own private cache space. This
cache is implemented
as a `ConcurrentSkipList` sorted map that maps `(ledgerId, entryId) ->
payload`. The payload
is a `ByteBuf` reference that can either be a slice of a `ByteBuf` that we got
when reading from a socket, or it can be a copied buffer.

Each topic cache is allowed to use the full broker max cache size before an
eviction is triggered. The total cache size is effectively a resource
shared across all
the topics, where a topic can use a more prominent portion of it if it
"asks for more".

When the eviction happens, we need to do an expensive ranking of all
the caches in the broker
and do an eviction in a proportional way to the currently used space
for each of them.

The bigger problem is represented by the `ConcurrentSkipList` and the
`ByteBuf` objects
that need to be tracked. The skip list is essentially like a "tree"
structure and needs to
maintain Java objects for each entry in the cache. We also need to
potentially have
a huge number of ByteBuf objects.

A cache workload is typically the worst-case scenario for each garbage
collector implementation because it involves creating objects, storing
them for some amount of
time and then throwing them away. During that time, the GC would have
already tenured these
objects and copied them into an "old generation" space, and sometime
later, a costly compaction
of that memory would have to be performed.

To mitigate the effect of the cache workload on the GC, we're being
very aggressive in
purging the cache by triggering time-based eviction. By putting a max
TTL on the elements in
the cache, we can avoid keeping the objects around for too long to be
a problem for the GC.

The flip side of this is that we're artificially reducing the cache
capacity to a very short time frame, which reduces the cache's
usefulness.

The other problem is the CPU cost involved in doing these frequent
evictions, which can
be very high when there are 10s of thousands of topics in a broker.


## Proposed changes

Instead of dealing with individual caches for each topic, let's adopt
a model where
there is a single cache space for the broker.

This cache is broken into N segments which act as a circular buffer.
Whenever a segment
is full, we start writing into the next one, and when we reach the
last one, we will
restart recycling the first segment.

This model has been working very well for the BookKeeper `ReadCache`:
https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java

The eviction becomes a completely trivial operation, buffers are just
rotated and
overwritten. We don't need to do any per-topic task or keep track of
utilization.

Today, there are 2 ways of configuring the cache, one that "copies"
data into the cache
and another that will just use reference-counting on the original
buffers to avoid
payload copies.

### Memory copies into the cache

Each segment is composed of a buffer, an offset, and a hashmap which maps
`(ledgerId, entryId) -> offset`.


The advantage of this approach is that entries are copied into the cache
buffer (in direct memory), and we don't need to keep any long-lived Java
objects around.
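
As a minimal sketch of this copy-based variant (Netty `ByteBuf` is assumed; the
class and method names here are illustrative, not the actual implementation):

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// One cache segment: a single direct buffer plus an index of (ledgerId, entryId) -> offset.
// Evicting the segment is just "clear the index and reset the write offset".
class CacheSegment {
    private final ByteBuf buffer;
    private final AtomicInteger writeOffset = new AtomicInteger();
    private final Map<Long, Integer> index = new ConcurrentHashMap<>();

    CacheSegment(int segmentSize) {
        this.buffer = PooledByteBufAllocator.DEFAULT.directBuffer(segmentSize, segmentSize);
    }

    boolean put(long ledgerId, long entryId, ByteBuf entry) {
        int size = entry.readableBytes();
        int offset = writeOffset.getAndAdd(size + 4);
        if (offset + size + 4 > buffer.capacity()) {
            return false; // segment full: the caller rotates to the next segment
        }
        buffer.setInt(offset, size);
        buffer.setBytes(offset + 4, entry, entry.readerIndex(), size); // copy, no reference kept
        index.put(key(ledgerId, entryId), offset);
        return true;
    }

    ByteBuf get(long ledgerId, long entryId) {
        Integer offset = index.get(key(ledgerId, entryId));
        if (offset == null) {
            return null;
        }
        int size = buffer.getInt(offset);
        ByteBuf copy = PooledByteBufAllocator.DEFAULT.directBuffer(size, size);
        copy.writeBytes(buffer, offset + 4, size); // copy out, since the segment may be overwritten later
        return copy;
    }

    void clear() { // invoked when the circular buffer rotates back onto this segment
        index.clear();
        writeOffset.set(0);
    }

    private static long key(long ledgerId, long entryId) {
        return (ledgerId << 32) | (entryId & 0xFFFFFFFFL); // sketch: assumes entryId fits in 32 bits
    }
}
```

Rotating the circular buffer then only means calling `clear()` on the oldest
segment and directing writes at it, which is what makes eviction cheap compared
to ranking per-topic caches.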

### Keeping reference-counted buffers in the cache

Each segment in the cache will contain a map `(ledgerId, entryId) -> ByteBuf`.
Buffers will have an increased reference count that keeps the data alive
as long as the buffer is in the cache; the reference will be released when
the cache segment is rotated.

The advantage is we avoid any memory copy when inserting into or
reading from the cache.
The disadvantage is that we will have references to all the `ByteBuf`
objects that are in the cache.
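
A correspondingly minimal sketch of this reference-counted variant (again, the
names are illustrative): the segment only keeps retained references and
releases them all when it is rotated.

```java
import io.netty.buffer.ByteBuf;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Segment variant that stores retained ByteBuf references instead of copying bytes.
class RefCountedCacheSegment {
    private final Map<Long, ByteBuf> entries = new ConcurrentHashMap<>();

    void put(long ledgerId, long entryId, ByteBuf payload) {
        // retain() keeps the original buffer alive while it sits in the cache
        ByteBuf previous = entries.put(key(ledgerId, entryId), payload.retain());
        if (previous != null) {
            previous.release(); // avoid leaking a replaced entry
        }
    }

    ByteBuf get(long ledgerId, long entryId) {
        ByteBuf buf = entries.get(key(ledgerId, entryId));
        return buf != null ? buf.retainedDuplicate() : null; // the caller releases its own reference
    }

    void rotate() {
        // releasing every buffer is the whole eviction step for this segment
        entries.values().forEach(ByteBuf::release);
        entries.clear();
    }

    private static long key(long ledgerId, long entryId) {
        return (ledgerId << 32) | (entryId & 0xFFFFFFFFL); // sketch: assumes entryId fits in 32 bits
    }
}
```

No payload bytes are copied, but every cached entry keeps a live `ByteBuf`
object around until the segment rotates, which is the GC trade-off described
above.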

### API changes

No user-facing API changes are required.

### New configuration options

The existing cache implementation will not be removed at this point. Users will
be able to configure the old implementation in `broker.conf`.

This option will be helpful in case performance regressions are seen for
some use cases with the new cache implementation.



--
Matteo Merli



Pulsar Flaky test report 2022-07-19 for PR builds in CI

2022-07-21 Thread Nicolò Boschi
Dear community,

Here's a report of the flaky tests in Pulsar CI during the observation
period of 2022-07-12 - 2022-07-19

https://docs.google.com/spreadsheets/d/165FHpHjs5fHccSsmQM4beeg6brn-zfUjcrXf6xAu4yQ/edit?usp=sharing

The report contains a subset of the test failures.
The flaky tests are observed from builds of merged PRs.
The GitHub Actions logs will be checked for builds where the SHA of the
head of the PR matches the SHA which got merged.
This ensures that all found exceptions are real flakes, since no changes
were made to the PR before the tests eventually passed and the PR was
merged successfully.



Top 5 flaky issues to fix (failure count in the period):
- org.apache.pulsar.tests.integration.functions.java.PulsarFunctionsJavaThreadTest.testJavaLoggingFunction: 92
- org.apache.pulsar.functions.worker.PulsarFunctionTlsTest.tearDown: 36
- org.apache.pulsar.broker.service.persistent.PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.testDispatchRateCompatibility2: 27
- org.apache.pulsar.broker.service.persistent.PersistentDispatcherFailoverConsumerStreamingDispatcherTest.testAddRemoveConsumer: 23
- org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest.testAddRemoveConsumer: 22


Markdown formatted summary reports for each test class can be accessed at
https://github.com/nicoloboschi/pulsar-flakes/tree/master/2022-07-12-to-2022-07-19


More flaky test issues:
https://github.com/apache/pulsar/issues?q=flaky+sort%3Aupdated-desc+is%3Aopen+is:issue

We need more help in addressing the flaky tests. Please join the efforts
so that we can get CI to a more stable state.

To coordinate the work,
1) please search for an existing issue, or search for all flaky issues with
"flaky" or the test class name (without package) in the search:
https://github.com/apache/pulsar/issues?q=is%3Aopen+flaky+sort%3Aupdated-desc
2) If there isn't an issue for a particular flaky test failure that you'd
like to fix, please create an issue using the "Flaky test" template at
https://github.com/apache/pulsar/issues/new/choose
3) Please comment on the issue that you are working on.

We have a few active contributors working on the flaky tests, thanks for
the contributions.

I'm looking forward to more contributors joining the efforts. Please join
the #testing channel on Slack if you'd like to ask questions and tips about
reproducing flaky tests locally and how to fix them.
Sharing stories about fixing flaky tests is also helpful for sharing the
knowledge about how flaky tests can be fixed. That's also a valuable way to
contribute.
Some flaky tests might be caused by actual production code bugs, so fixing
a flaky test might also fix a real production bug.

More contributions are welcome! Please keep up the good work!

Thanks,
Nicolò Boschi


Re: [DISCUSS] PIP-192 New Pulsar Broker Load Balancer

2022-07-21 Thread Heesung Sohn
Hi Rajan,

As discussed in the Pulsar Community Meeting today, I added the
"Modification Summary" section that compares the current solutions and
our proposals.

Regards,
Heesung

On Wed, Jul 20, 2022 at 4:24 PM Matteo Merli  wrote:
>
> On Wed, Jul 20, 2022 at 1:37 PM Rajan Dhabalia  wrote:
>
> > Also, here we have to clearly document that it will not impact the
> > behavior of existing load balancers now or in the future. Pulsar is used
> > by many companies and orgs, so deprecating and not maintaining existing
> > components is not acceptable under any circumstances.
>
> This is exactly the reason why this is going to be implemented as a
> new load manager instead of improving the existing
> ModularLoadManagerImpl.
>
> It gives the flexibility to start fresh without the existing baggage
> of choices and try a significantly different approach.
>
> The current ModularLoadManagerImpl will not go away. Once the new load
> manager is ready and considered stable enough, there might be a
> new discussion on whether to change the default implementation. Even
> then, users will still be able to opt for the old load manager.


Re: [VOTE] PIP-174: Provide new implementation for broker dispatch cache

2022-07-21 Thread PengHui Li
+1

Penghui Li
On Jul 22, 2022, 02:37 +0800, Matteo Merli , wrote:
> ## Motivation
>
> The current implementation of the read cache in the Pulsar broker has largely
> remained unchanged for a long time, except for a few minor tweaks.
>
> While the implementation is stable and reasonably efficient for
> typical workloads,
> the overhead required for managing the cache evictions in a broker
> that is running
> many topics can be pretty high in terms of extra CPU utilization and on the 
> JVM
> garbage collection to track an increased number of medium-lived objects.
>
> The goal is to provide an alternative implementation that can adapt better to
> a wider variety of operating conditions.
>
> ### Current implementation details
>
> The broker cache is implemented as part of the `ManagedLedger` component,
> which sits in the Pulsar broker and provides a higher level of
> abstraction on top
> of BookKeeper.
>
> Each topic (and managed-ledger) has its own private cache space. This
> cache is implemented
> as a `ConcurrentSkipList` sorted map that maps `(ledgerId, entryId) ->
> payload`. The payload
> is a `ByteBuf` reference that can either be a slice of a `ByteBuf` that we got
> when reading from a socket, or it can be a copied buffer.
>
> Each topic cache is allowed to use the full broker max cache size before an
> eviction is triggered. The total cache size is effectively a resource
> shared across all
> the topics, where a topic can use a more prominent portion of it if it
> "asks for more".
>
> When the eviction happens, we need to do an expensive ranking of all
> the caches in the broker
> and do an eviction in a proportional way to the currently used space
> for each of them.
>
> The bigger problem is represented by the `ConcurrentSkipList` and the
> `ByteBuf` objects
> that need to be tracked. The skip list is essentially like a "tree"
> structure and needs to
> maintain Java objects for each entry in the cache. We also need to
> potentially have
> a huge number of ByteBuf objects.
>
> A cache workload is typically the worst-case scenario for each garbage
> collector implementation because it involves creating objects, storing
> them for some amount of
> time and then throwing them away. During that time, the GC would have
> already tenured these
> objects and copied them into an "old generation" space, and sometime
> later, a costly compaction
> of that memory would have to be performed.
>
> To mitigate the effect of the cache workload on the GC, we're being
> very aggressive in
> purging the cache by triggering time-based eviction. By putting a max
> TTL on the elements in
> the cache, we can avoid keeping the objects around for too long to be
> a problem for the GC.
>
> The reverse side of this is that we're artificially reducing the cache
> capacity to a very
> short time frame, reducing the cache usefulness.
>
> The other problem is the CPU cost involved in doing these frequent
> evictions, which can
> be very high when there are 10s of thousands of topics in a broker.
>
>
> ## Proposed changes
>
> Instead of dealing with individual caches for each topic, let's adopt
> a model where
> there is a single cache space for the broker.
>
> This cache is broken into N segments which act as a circular buffer.
> Whenever a segment
> is full, we start writing into the next one, and when we reach the
> last one, we will
> restart recycling the first segment.
>
> This model has been working very well for the BookKeeper `ReadCache`:
> https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
>
> The eviction becomes a completely trivial operation, buffers are just
> rotated and
> overwritten. We don't need to do any per-topic task or keep track of
> utilization.
>
> Today, there are 2 ways of configuring the cache, one that "copies"
> data into the cache
> and another that will just use reference-counting on the original
> buffers to avoid
> payload copies.
>
> ### Memory copies into the cache
>
> Each segment is composed of a buffer, an offset, and a hashmap which maps
> `(ledgerId, entryId) -> offset`.
>
>
> The advantage of this approach is that entries are copied into the cache 
> buffer
> (in direct memory), and we don't need to keep any long-lived Java objects 
> around
>
> ### Keeping reference-counted buffers in the cache
>
> Each segment in the cache will contain a map `(ledgerId, entryId) -> ByteBuf`.
> Buffers will have an increased reference count that will keep the data
> alive as long
> as the buffer is in the cache and it will be released when the cache
> segment is rotated.
>
> The advantage is we avoid any memory copy when inserting into or
> reading from the cache.
> The disadvantage is that we will have references to all the `ByteBuf`
> objects that are in the cache.
>
> ### API changes
>
> No user-facing API changes are required.
>
> ### New configuration options
>
> The existing cache implementation will not be remove

Re: [VOTE] PIP-174: Provide new implementation for broker dispatch cache

2022-07-21 Thread Dave Fisher
Sorry I’m late to this discussion.

I think that the motivation is correct. There is really quite a bit of
activity around this issue. Let's make an extra effort to spend time with
the commits to confirm the performance improvements.

Let’s particularly pay attention to threading.

+1

Regards,
Dave

Sent from my iPhone

> On Jul 21, 2022, at 11:37 AM, Matteo Merli  wrote:
> 
> ## Motivation
> 
> The current implementation of the read cache in the Pulsar broker has largely
> remained unchanged for a long time, except for a few minor tweaks.
> 
> While the implementation is stable and reasonably efficient for
> typical workloads,
> the overhead required for managing the cache evictions in a broker
> that is running
> many topics can be pretty high in terms of extra CPU utilization and on the 
> JVM
> garbage collection to track an increased number of medium-lived objects.
> 
> The goal is to provide an alternative implementation that can adapt better to
> a wider variety of operating conditions.
> 
> ### Current implementation details
> 
> The broker cache is implemented as part of the `ManagedLedger` component,
> which sits in the Pulsar broker and provides a higher level of
> abstraction on top
> of BookKeeper.
> 
> Each topic (and managed-ledger) has its own private cache space. This
> cache is implemented
> as a `ConcurrentSkipList` sorted map that maps `(ledgerId, entryId) ->
> payload`. The payload
> is a `ByteBuf` reference that can either be a slice of a `ByteBuf` that we got
> when reading from a socket, or it can be a copied buffer.
> 
> Each topic cache is allowed to use the full broker max cache size before an
> eviction is triggered. The total cache size is effectively a resource
> shared across all
> the topics, where a topic can use a more prominent portion of it if it
> "asks for more".
> 
> When the eviction happens, we need to do an expensive ranking of all
> the caches in the broker
> and do an eviction in a proportional way to the currently used space
> for each of them.
> 
> The bigger problem is represented by the `ConcurrentSkipList` and the
> `ByteBuf` objects
> that need to be tracked. The skip list is essentially like a "tree"
> structure and needs to
> maintain Java objects for each entry in the cache. We also need to
> potentially have
> a huge number of ByteBuf objects.
> 
> A cache workload is typically the worst-case scenario for each garbage
> collector implementation because it involves creating objects, storing
> them for some amount of
> time and then throwing them away. During that time, the GC would have
> already tenured these
> objects and copied them into an "old generation" space, and sometime
> later, a costly compaction
> of that memory would have to be performed.
> 
> To mitigate the effect of the cache workload on the GC, we're being
> very aggressive in
> purging the cache by triggering time-based eviction. By putting a max
> TTL on the elements in
> the cache, we can avoid keeping the objects around for too long to be
> a problem for the GC.
> 
> The reverse side of this is that we're artificially reducing the cache
> capacity to a very
> short time frame, reducing the cache usefulness.
> 
> The other problem is the CPU cost involved in doing these frequent
> evictions, which can
> be very high when there are 10s of thousands of topics in a broker.
> 
> 
> ## Proposed changes
> 
> Instead of dealing with individual caches for each topic, let's adopt
> a model where
> there is a single cache space for the broker.
> 
> This cache is broken into N segments which act as a circular buffer.
> Whenever a segment
> is full, we start writing into the next one, and when we reach the
> last one, we will
> restart recycling the first segment.
> 
> This model has been working very well for the BookKeeper `ReadCache`:
> https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
> 
> The eviction becomes a completely trivial operation, buffers are just
> rotated and
> overwritten. We don't need to do any per-topic task or keep track of
> utilization.
> 
> Today, there are 2 ways of configuring the cache, one that "copies"
> data into the cache
> and another that will just use reference-counting on the original
> buffers to avoid
> payload copies.
> 
> ### Memory copies into the cache
> 
> Each segment is composed of a buffer, an offset, and a hashmap which maps
> `(ledgerId, entryId) -> offset`.
> 
> 
> The advantage of this approach is that entries are copied into the cache 
> buffer
> (in direct memory), and we don't need to keep any long-lived Java objects 
> around
> 
> ### Keeping reference-counted buffers in the cache
> 
> Each segment in the cache will contain a map `(ledgerId, entryId) -> ByteBuf`.
> Buffers will have an increased reference count that will keep the data
> alive as long
> as the buffer is in the cache and it will be released when the cache
> segment is rotated.
> 
> The advantag

Re: [DISCUSS] Apache Pulsar 2.8.4 release

2022-07-21 Thread Yunze Xu
Sure, I will be careful with those PRs that have not been cherry-picked to
branch-2.8 but are labeled as `release/2.8.4`.

Thanks,
Yunze




> On Jul 22, 2022, at 00:09, Dave Fisher wrote:
> 
> Thank you for volunteering!
> 
>> On Jul 21, 2022, at 12:57 AM, Yunze Xu  wrote:
>> 
>> Hello Pulsar Community,
>> 
>> It has been several months since the 2.8.3 release and there are many
>> important fixes after that. For example, there is a performance
>> regression for message listener that was introduced from 2.8.3 and
>> fixed in https://github.com/apache/pulsar/pull/15162.
>> 
>> I volunteer to be the release manager for 2.8.4.
>> 
>> Here [0] you can find the list of 149 commits to branch-2.8 since the
>> 2.8.3 release. There are 54 closed PRs targeting 2.8.4 that have not
>> yet been cherry-picked [1] and I will cherry-pick them and solve the
>> conflicts if necessary. There are 18 open PRs labeled with
>> `release/2.8.4` [2]. I will check the status of them and see if they
>> can be pushed to 2.8.4.
> 
> Please use discretion about selecting bug fixes only and avoid adding new 
> features or unexpected configuration changes. Thanks!
> 
> Ask for help with the cherry picks, since being a Release Manager is a big
> job!
> 
> Regards,
> Dave
> 
> 
>> 
>> Thanks,
>> Yunze
>> 
>> [0] - https://github.com/apache/pulsar/compare/v2.8.3...branch-2.8
>> [1] - 
>> https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aclosed
>> [2] - 
>> https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aopen
>> 
> 



Re: [DISCUSS] PIP-191: Support batched message using entry filter

2022-07-21 Thread Anon Hxy
Hi Enrico

Thanks for your reply.

> 1) It is not clear to me if you want to push all the messages metadata
> in the main header of the entry or only the metadata of the first
> entry, or only the KEY.

I just want to copy some of the message properties (we call them batch
properties here) to the main header of the entry, without modifying the
payload or the single message metadata. All messages in one batch must have
the same batch properties (both keys and values).
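
To visualize what a filter gains from that, here is a rough sketch (the
"region" key is just an example, and I'm assuming the PIP-105 `FilterContext`
exposes the entry-level `MessageMetadata`; the exact signatures may differ):

```java
import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.FilterContext;
import org.apache.pulsar.common.api.proto.KeyValue;

// With the batch properties copied into the entry header, a filter can accept or
// reject the whole batch without deserializing any SingleMessageMetadata or payload.
public class RegionFilter implements EntryFilter {

    @Override
    public FilterResult filterEntry(Entry entry, FilterContext context) {
        for (KeyValue kv : context.getMsgMetadata().getPropertiesList()) {
            if ("region".equals(kv.getKey())) {              // example batch property
                return "eu".equals(kv.getValue())
                        ? FilterResult.ACCEPT
                        : FilterResult.REJECT;               // drops the whole batch
            }
        }
        return FilterResult.ACCEPT; // no batch property present: deliver normally
    }

    @Override
    public void close() {
    }
}
```

Since all messages in a batch carry the same batch properties, accepting or
rejecting at the entry level is safe.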

>  2) we have to clarify the new message format of the entry and update
> the protocol documents

It seems that the protocol documents do not describe the batch message
main header; they just describe the `SingleMessageMetadata`. I agree that
it's better to update the protocol documents to describe the batch
message main header.

> 3) existing EntryFilters won't be able to work well with the new
> format, we must find a way to make them fail and not process garbage

It seems that there are no compatibility problems, because I just add
properties to the main header, which was empty before, and there is no
modification to the payload or the single message metadata. The existing
EntryFilters will be able to work well with the new format, the same as
the existing Protocol Handlers. :)

Thanks
Xiaoyu Hou

Enrico Olivelli wrote on Thu, Jul 21, 2022 at 20:27:

> Thank you for this proposal !
>
> I understand the problem, and I have already thought about it, because
> I am the author of some filters (especially the JMS Selectors Filter)
> but we have to clarify more about this PIP.
>
> 1) It is not clear to me if you want to push all the messages metadata
> in the main header of the entry or only the metadata of the first
> entry, or only the KEY.
> 2) we have to clarify the new message format of the entry and update
> the protocol documents
> 3) existing EntryFilters won't be able to work well with the new
> format, we must find a way to make them fail and not process garbage
> 4) the same problem applies to Interceptors and Protocol Handlers
> (like KOP), we must make it clear in this PIP what is the upgrade path
> and give some suggestions to the developers of such components
>
> I am supporting this initiative, as far as we clarify those points.
>
> The Pulsar ecosystem is becoming more and more mature, and the user
> base is growing quickly, we can't break things without a clear path
> for the users and for the developers of extensions
>
> Enrico
>
>
> On Thu, Jul 21, 2022 at 12:45 Qiang Huang wrote:
> >
> > Good. +1
> >
> > Anon Hxy wrote on Tue, Jul 19, 2022 at 18:00:
> >
> > > Hi Pulsar community:
> > >
> > > I open a pip to discuss "Support batched message using entry filter"
> > >
> > > Proposal Link: https://github.com/apache/pulsar/issues/16680
> > > ---
> > >
> > > ## Motivation
> > >
> > > We already have  a plug-in way to filter entries in broker, aka PIP-105
> > > https://github.com/apache/pulsar/issues/12269.  But this way only
> checks
> > > at
> > > the batch header,  without digging into the individual messages
> properties.
> > > Of course it provides an interface to deserialize the entire Entry to
> the
> > > heap,  But this will bring some memory and cpu workload. And in most
> > > scenarios we only need partial properties to do some filter.
> > >
> > > This proposal brings a method to make PIP-105 support batched entry
> without
> > > having to deserialize the entire Entry to the heap
> > >
> > >
> > > ## API Changes
> > >
> > > - Add a producer config to specialize the key, of which properties
> will be
> > > added to the batched entry metadata, for example:
> > >
> > > ```
> > >
> > >
> org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
> > >
> > > ```
> > > The  `batchedFilterProperties` type is `List` with default
> value is
> > > empty list.  For an empty list, it means that the properties of entry's
> > > metadata are empty, and the `EntryFilter` will not take effect.
> > >
> > > ## Implementation
> > >
> > > - When call
> `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`,
> > >  we extract the message properties and add it to `metadata`:
> > > ```
> > > public boolean add(MessageImpl msg, SendCallback callback) {
> > >
> > >     if (log.isDebugEnabled()) {
> > >         log.debug("[{}] [{}] add message to batch, num messages in batch so far {}",
> > >                 topicName, producerName, numMessagesInBatch);
> > >     }
> > >
> > >     if (++numMessagesInBatch == 1) {
> > >         try {
> > >             // some properties are common amongst the different messages in the
> > >             // batch, hence we just pick it up from the first message
> > >             messageMetadata.setSequenceId(msg.getSequenceId());
> > >             List filterProperties = getProperties(msg);
> > >             if (!filterProperties.isEmpty()) {
> > >                 messageMetadata.addAllProperties(filterProperties);  // and message properties here
>