Re: PIP-187 Add API to analyse a subscription backlog and provide an accurate value
On Thu, Jul 21, 2022 at 06:25 PengHui Li wrote:
> > What if the topic owner creates an internal subscription, consumes the
> > messages, and updates a count per filter.
>
> I agree with this approach. If we need to scan the whole backlog to
> calculate accurate backlogs for each operation, it is too expensive and
> difficult to apply in a production environment. Keeping a counter per
> filter (subscription) and only re-scanning the data after the filter
> changes would reduce a lot of overhead.

This approach is very expensive when you don't need the feature: in order to have this value, you have to read everything (once per subscription), especially when you have a subscription without consumers while the topic is being written to. Since you can't know when you will need this value, you end up wasting resources to calculate something you may never use.

This API to analyze a subscription is not meant to be used often, but only to debug problems manually.

Also, the filter may depend on other environment variables, such as the wall-clock time: if you have a filter that depends on time and you pre-calculate the backlog, your counter won't stay correct.

> If we want to expose the accurate backlogs in the Prometheus endpoint,
> it's almost impossible.

I don't think this is actually possible if you want to take the filters into consideration. We are in the case of general-purpose filtering (we actually allow Java code to be plugged into the broker), so pre-calculating the counters won't work well.
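To make the wall-clock point concrete, here is a minimal sketch (the filter shape and the `deliverAfter` property are illustrative assumptions, not the actual Pulsar `EntryFilter` API): a time-dependent filter accepts a different set of messages depending on when it is evaluated, so a backlog counter pre-computed at write time cannot stay accurate.

```java
import java.time.Instant;
import java.util.List;
import java.util.function.BiPredicate;

// Simplified stand-in for a broker-side filter; the real Pulsar EntryFilter
// API is richer. "deliverAfter" is a hypothetical message property.
public class TimeFilterDemo {
    record Msg(String payload, Instant deliverAfter) {}

    // Accept only messages whose delivery time has passed at evaluation time.
    static final BiPredicate<Msg, Instant> FILTER =
            (msg, now) -> !now.isBefore(msg.deliverAfter);

    // The "accurate backlog" is a function of the evaluation time.
    static long backlog(List<Msg> messages, Instant now) {
        return messages.stream().filter(m -> FILTER.test(m, now)).count();
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2022-07-21T00:00:00Z");
        List<Msg> msgs = List.of(
                new Msg("a", t0.minusSeconds(60)),    // already deliverable
                new Msg("b", t0.plusSeconds(3600)));  // deliverable an hour later
        // Asking at two different times yields two different "accurate" answers:
        System.out.println(backlog(msgs, t0));                   // 1
        System.out.println(backlog(msgs, t0.plusSeconds(7200))); // 2
    }
}
```

Any counter maintained on the write path would have frozen one of these answers, which is exactly why the PIP proposes an on-demand analysis API instead.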
Enrico

> Thanks,
> Penghui
>
> On Wed, Jul 20, 2022 at 11:23 PM Asaf Mesika wrote:
> > On Wed, Jul 20, 2022 at 5:46 PM Enrico Olivelli wrote:
> > > Asaf,
> > >
> > > On Wed, Jul 20, 2022 at 15:40 Asaf Mesika wrote:
> > > > I'm not sure I understand the context exactly:
> > > >
> > > > You say today we can only know the number of entries, hence we'll have a
> > > > wrong backlog number for a subscription, since:
> > > > 1. One entry contains multiple messages (batch message)
> > > > 2. A subscription may contain a filter, which requires you to read the
> > > > entire backlog to know it
> > >
> > > correct
> > >
> > > > There are two things I don't understand:
> > > >
> > > > 1. We're adding an observability API for which you need to pay the full
> > > > read cost just to know the count. I presume people would want to run this
> > > > more than once, so they will read the same data multiple times - why
> > > > would a user be willing to pay such a hefty price?
> > >
> > > Sometimes it is the case, because processing a message may have a high
> > > cost, so having 10 entries of 100 messages does not correctly represent
> > > the amount of work that must be done by the consumers, and the user may
> > > wish to have an exact count.
> > >
> > > Having the filters adds more complexity, because you cannot predict how
> > > many entries will be filtered out.
> >
> > So it mainly serves that specific use case, where reading the entire
> > messages over and over (every interval) is an order of magnitude less
> > expensive than the processing itself.
> >
> > > > 2. If the user needs to know an accurate backlog, can't they use the
> > > > ability to create a very large number of topics, so that they will know
> > > > an accurate backlog without the huge cost?
> > >
> > > I can't understand why creating many topics would help.
> > > Instead, with filters it is very likely that you have fewer topics
> > > with many subscriptions with different filters.
> > >
> > > As you don't know the filters while writing, you cannot route the
> > > messages to some topic. Also, you would need to write the message to
> > > potentially multiple topics, and that would be a huge write
> > > amplification (think about a topic with 100 subscriptions).
> >
> > Yes, I hadn't thought about that.
> > What I was thinking is that those filters would be mutually exclusive, and
> > could therefore map to topics; but in your case, if you have 100 different
> > filters and they overlap, yes, it would be way more expensive to write
> > them 100 times.
> >
> > > > I have an idea, if that's ok:
> > > >
> > > > What if you can keep, as you said in your document, a metric counting
> > > > messages per filter upon write.
> > >
> > > This is not possible, as described above.
> >
> > You wrote above that:
> > ---
> > you cannot know which subscriptions will be created in a topic
> > subscriptions can be created from the past (Earliest)
> > subscription filters may change over time: they are usually configured
> > using Subscription Properties, and those properties are dynamic
> > doing computations on the write path (like running filters) kills
> > latency and throughput
> >
> > Use a client to clone the subscription and consume data.
> > This doesn't work because you have to transfer the data to the client,
> > and this is possibly a huge a
[DISCUSS] Apache Pulsar 2.8.4 release
Hello Pulsar Community,

It has been several months since the 2.8.3 release, and there are many important fixes since then. For example, there is a performance regression for the message listener that was introduced in 2.8.3 and fixed in https://github.com/apache/pulsar/pull/15162.

I volunteer to be the release manager for 2.8.4.

Here [0] you can find the list of 149 commits to branch-2.8 since the 2.8.3 release. There are 54 closed PRs targeting 2.8.4 that have not yet been cherry-picked [1]; I will cherry-pick them and resolve the conflicts if necessary. There are 18 open PRs labeled with `release/2.8.4` [2]. I will check their status and see if they can be pushed to 2.8.4.

Thanks,
Yunze

[0] - https://github.com/apache/pulsar/compare/v2.8.3...branch-2.8
[1] - https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aclosed
[2] - https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aopen
Re: [DISCUSS] PIP-191: Support batched message using entry filter
Good. +1

Anon Hxy wrote on Tue, Jul 19, 2022 at 18:00:
> Hi Pulsar community:
>
> I open a PIP to discuss "Support batched message using entry filter"
>
> Proposal Link: https://github.com/apache/pulsar/issues/16680
> ---
>
> ## Motivation
>
> We already have a plug-in way to filter entries in the broker, aka PIP-105
> https://github.com/apache/pulsar/issues/12269. But this way only checks
> the batch header, without digging into the individual messages' properties.
> It does provide an interface to deserialize the entire entry onto the heap,
> but this brings some memory and CPU overhead, and in most scenarios we only
> need a few properties to do the filtering.
>
> This proposal brings a method to make PIP-105 support batched entries
> without having to deserialize the entire entry onto the heap.
>
> ## API Changes
>
> - Add a producer config to specify the keys whose properties will be
> added to the batched entry metadata, for example:
>
> ```
> org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
> ```
>
> The `batchedFilterProperties` type is `List<String>`, with an empty list
> as the default value. An empty list means that the properties of the
> entry's metadata are empty, and the `EntryFilter` will not take effect.
> ## Implementation
>
> - When `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add` is
> called, we extract the message properties and add them to `metadata`:
>
> ```
> public boolean add(MessageImpl<?> msg, SendCallback callback) {
>
>     if (log.isDebugEnabled()) {
>         log.debug("[{}] [{}] add message to batch, num messages in batch so far {}",
>                 topicName, producerName, numMessagesInBatch);
>     }
>
>     if (++numMessagesInBatch == 1) {
>         try {
>             // some properties are common amongst the different messages in the
>             // batch, hence we just pick it up from the first message
>             messageMetadata.setSequenceId(msg.getSequenceId());
>             List<KeyValue> filterProperties = getProperties(msg);
>             if (!filterProperties.isEmpty()) {
>                 messageMetadata.addAllProperties(filterProperties); // and message properties here
>             }
> ```
>
> - Also, we need to add a method `hasSameProperties`, like `hasSameSchema`.
> Messages with the same properties can be added to the same batch:
>
> ```
> private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
>     return batchMessageContainer.haveEnoughSpace(msg)
>            && (!isMultiSchemaEnabled(false)
>                || batchMessageContainer.hasSameSchema(msg))
>            && batchMessageContainer.hasSameProperties(msg)
>            && batchMessageContainer.hasSameTxn(msg);
> }
> ```
>
> ## Rejected Alternatives
>
> - Implement an `AbstractBatchMessageContainer`, say
> `BatchMessagePropertiesBasedContainer`, keeping messages with the same
> properties in a single hashmap entry, like `BatchMessageKeyBasedContainer`.
>
> Rejection reason: this would publish messages out of order.
>
> Thanks,
> Xiaoyu Hou

--
BR,
Qiang Huang
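The batching rule in the proposal can be illustrated with a rough sketch (all names here are simplified assumptions, not the actual Pulsar client internals): the container records the filter-relevant properties of the first message, and only messages whose properties match may join the batch, so the whole batch can safely share one set of properties in its entry metadata.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Illustrative sketch of property-based batch admission; class and field
 *  names are assumptions, not the real BatchMessageContainerImpl. */
class BatchSketch {
    // Properties of the first message in the current batch, restricted to
    // the keys named in batchedFilterProperties.
    private Map<String, String> batchProperties;
    private final List<String> batchedFilterProperties;

    BatchSketch(List<String> batchedFilterProperties) {
        this.batchedFilterProperties = batchedFilterProperties;
    }

    // Keep only the keys that matter for filtering.
    private Map<String, String> filterKeys(Map<String, String> msgProps) {
        Map<String, String> out = new HashMap<>();
        for (String k : batchedFilterProperties) {
            if (msgProps.containsKey(k)) {
                out.put(k, msgProps.get(k));
            }
        }
        return out;
    }

    /** Mirrors the proposed hasSameProperties() check: a message may join
     *  the batch only when its filter-relevant properties match. */
    boolean canAddToCurrentBatch(Map<String, String> msgProps) {
        if (batchProperties == null) {
            batchProperties = filterKeys(msgProps); // first message seeds the batch metadata
            return true;
        }
        return Objects.equals(batchProperties, filterKeys(msgProps));
    }
}
```

A message with a different value for a filter key would be rejected and would start a new batch, which preserves publish order while keeping the entry-level metadata truthful for every message inside.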
Re: [DISCUSS] PIP-191: Support batched message using entry filter
Thank you for this proposal!

I understand the problem, and I have already thought about it, because I am the author of some filters (especially the JMS Selectors Filter), but we have to clarify more about this PIP:

1) It is not clear to me whether you want to push all the messages' metadata into the main header of the entry, or only the metadata of the first message, or only the KEY.

2) We have to clarify the new message format of the entry and update the protocol documents.

3) Existing EntryFilters won't be able to work well with the new format; we must find a way to make them fail rather than process garbage.

4) The same problem applies to Interceptors and Protocol Handlers (like KOP); we must make it clear in this PIP what the upgrade path is and give some suggestions to the developers of such components.

I support this initiative, as long as we clarify those points. The Pulsar ecosystem is becoming more and more mature, and the user base is growing quickly; we can't break things without a clear path for the users and for the developers of extensions.

Enrico

On Thu, Jul 21, 2022 at 12:45 Qiang Huang wrote:
> Good. +1
>
> Anon Hxy wrote on Tue, Jul 19, 2022 at 18:00:
> > Hi Pulsar community:
> >
> > I open a PIP to discuss "Support batched message using entry filter"
> >
> > Proposal Link: https://github.com/apache/pulsar/issues/16680
> > ---
> >
> > ## Motivation
> >
> > We already have a plug-in way to filter entries in the broker, aka PIP-105
> > https://github.com/apache/pulsar/issues/12269. But this way only checks
> > the batch header, without digging into the individual messages' properties.
> > It does provide an interface to deserialize the entire entry onto the
> > heap, but this brings some memory and CPU overhead, and in most scenarios
> > we only need a few properties to do the filtering.
> > This proposal brings a method to make PIP-105 support batched entries
> > without having to deserialize the entire entry onto the heap.
> >
> > ## API Changes
> >
> > - Add a producer config to specify the keys whose properties will be
> > added to the batched entry metadata, for example:
> >
> > ```
> > org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
> > ```
> >
> > The `batchedFilterProperties` type is `List<String>`, with an empty list
> > as the default value. An empty list means that the properties of the
> > entry's metadata are empty, and the `EntryFilter` will not take effect.
> >
> > ## Implementation
> >
> > - When `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add` is
> > called, we extract the message properties and add them to `metadata`:
> >
> > ```
> > public boolean add(MessageImpl<?> msg, SendCallback callback) {
> >
> >     if (log.isDebugEnabled()) {
> >         log.debug("[{}] [{}] add message to batch, num messages in batch so far {}",
> >                 topicName, producerName, numMessagesInBatch);
> >     }
> >
> >     if (++numMessagesInBatch == 1) {
> >         try {
> >             // some properties are common amongst the different messages in the
> >             // batch, hence we just pick it up from the first message
> >             messageMetadata.setSequenceId(msg.getSequenceId());
> >             List<KeyValue> filterProperties = getProperties(msg);
> >             if (!filterProperties.isEmpty()) {
> >                 messageMetadata.addAllProperties(filterProperties); // and message properties here
> >             }
> > ```
> >
> > - Also, we need to add a method `hasSameProperties`, like `hasSameSchema`.
> > Messages with the same properties can be added to the same batch:
> >
> > ```
> > private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
> >     return batchMessageContainer.haveEnoughSpace(msg)
> >            && (!isMultiSchemaEnabled(false)
> >                || batchMessageContainer.hasSameSchema(msg))
> >            && batchMessageContainer.hasSameProperties(msg)
> >            && batchMessageContainer.hasSameTxn(msg);
> > }
> > ```
> >
> > ## Rejected Alternatives
> >
> > - Implement an `AbstractBatchMessageContainer`, say
> > `BatchMessagePropertiesBasedContainer`, keeping messages with the same
> > properties in a single hashmap entry, like `BatchMessageKeyBasedContainer`.
> >
> > Rejection reason: this would publish messages out of order.
> >
> > Thanks,
> > Xiaoyu Hou
>
> --
> BR,
> Qiang Huang
Re: [DISCUSS] Apache Pulsar 2.8.4 release
Thank you for volunteering!

> On Jul 21, 2022, at 12:57 AM, Yunze Xu wrote:
>
> Hello Pulsar Community,
>
> It has been several months since the 2.8.3 release, and there are many
> important fixes since then. For example, there is a performance
> regression for the message listener that was introduced in 2.8.3 and
> fixed in https://github.com/apache/pulsar/pull/15162.
>
> I volunteer to be the release manager for 2.8.4.
>
> Here [0] you can find the list of 149 commits to branch-2.8 since the
> 2.8.3 release. There are 54 closed PRs targeting 2.8.4 that have not
> yet been cherry-picked [1]; I will cherry-pick them and resolve the
> conflicts if necessary. There are 18 open PRs labeled with
> `release/2.8.4` [2]. I will check their status and see if they can be
> pushed to 2.8.4.

Please use discretion, selecting bug fixes only and avoiding new features or unexpected configuration changes. Thanks!

Ask for help with the cherry-picks, since being a Release Manager is a big job!

Regards,
Dave

> Thanks,
> Yunze
>
> [0] - https://github.com/apache/pulsar/compare/v2.8.3...branch-2.8
> [1] - https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aclosed
> [2] - https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aopen
Re: [DISCUSS] Apache Pulsar 2.11.0 Release
I understand the need for the Pulsar Summit. In that case I have to step back, because I will be offline for the next few weeks.

Sorry,
Nicolò Boschi

On Thu, Jul 21, 2022 at 06:32 PengHui Li wrote:
> Thanks for volunteering, Nicolò.
>
> > So a plan could be to try to merge the work in progress targeted for 2.11
> > by the mid of August and then start the code freeze as described in the
> > PIP.
>
> So the target release date will be early September. One point is that
> Pulsar Summit San Francisco will start on August 18, 2022. I think maybe
> we can start to test the master branch for now and continue the
> in-progress tasks. If we can have a major release before Pulsar Summit,
> it should be good news to the Community.
>
> Thanks.
> Penghui
>
> On Mon, Jul 18, 2022 at 4:06 PM Enrico Olivelli wrote:
> > Nicolò,
> >
> > On Mon, Jul 18, 2022 at 10:00 Nicolò Boschi wrote:
> > > Thanks Penghui for the reminder.
> > > I'd like to also include PIP-181: Pulsar Shell, if time permits.
> > >
> > > I believe it is a good idea to start testing the code freeze proposed
> > > by PIP-175 (https://github.com/apache/pulsar/issues/15966). Even if
> > > not officially approved, we discussed it many times and agreed on the
> > > usefulness of the code freeze.
> >
> > Great idea!
> > We should really try it.
> >
> > > So a plan could be to try to merge the work in progress targeted for
> > > 2.11 by the mid of August and then start the code freeze as described
> > > in the PIP.
> > >
> > > Also, I volunteer to drive the release if nobody else is interested.
> >
> > Thanks for volunteering
> >
> > Enrico
> >
> > > Thanks,
> > > Nicolò Boschi
> > >
> > > On Mon, Jul 18, 2022 at 06:59 Yunze Xu wrote:
> > > > In addition to #16202, there is a following PR to support the
> > > > correct ACK implementation for chunked messages. It should depend
> > > > on #16202, but I think I can submit an initial PR this week and
> > > > change the tests after #16202 is merged.
> > > >
> > > > Thanks,
> > > > Yunze
> > > >
> > > > > On Jul 18, 2022, at 11:22, PengHui Li wrote:
> > > > >
> > > > > Hi all,
> > > > >
> > > > > We released 2.10.0 three months ago, and there are many great
> > > > > changes in the master branch, including new features and
> > > > > performance improvements:
> > > > >
> > > > > - PIP 74: apply client memory to consumer
> > > > >   https://github.com/apache/pulsar/pull/15216
> > > > > - PIP 143: Support split bundles by specified boundaries
> > > > >   https://github.com/apache/pulsar/pull/13796
> > > > > - PIP 145: regex subscription improvements
> > > > >   https://github.com/apache/pulsar/pull/16062
> > > > > - PIP 160: transaction performance improvements (still in
> > > > >   progress; some PRs merged)
> > > > > - PIP 161: new exclusive producer mode support
> > > > >   https://github.com/apache/pulsar/pull/15488
> > > > > - PIP 182: Provide new load balance placement strategy
> > > > >   implementation for ModularLoadManagerStrategy
> > > > >   https://github.com/apache/pulsar/pull/16281
> > > > > - Add Pulsar Auth support for the Pulsar SQL
> > > > >   https://github.com/apache/pulsar/pull/15571
> > > > >
> > > > > And some features are blocked in the review stage, but they are
> > > > > powerful improvements for Pulsar:
> > > > >
> > > > > - PIP 37: Support chunking with Shared subscription
> > > > >   https://github.com/apache/pulsar/pull/16202
> > > > > - PIP-166: Function add MANUAL delivery semantics
> > > > >   https://github.com/apache/pulsar/pull/16279
> > > > >
> > > > > You can find the complete change list in 2.11.0 at
> > > > > https://github.com/apache/pulsar/pulls?q=is%3Apr+milestone%3A2.11.0+-label%3Arelease%2F2.10.1+-label%3Arelease%2F2.10.2
> > > > >
> > > > > And maybe I missed some important in-progress PRs; please let me
> > > > > know if something should be a blocker for the 2.11.0 release.
> > > > >
> > > > > It's a good time to discuss the target date of the 2.11.0
> > > > > release. I think we can leave 2 weeks to complete the in-progress
> > > > > PRs and 2 weeks to accept bug fixes, and target the 2.11.0
> > > > > release in mid-August.
> > > > >
> > > > > Please let me know what you think.
> > > > >
> > > > > Thanks,
> > > > > Penghui
Re: [DISCUSS] Apache Pulsar 2.8.4 release
Thank you Yunze.

I agree that it is time to cut a release of 2.8. We should also get one last release out of the 2.7 branch; I hope that someone will volunteer.

Enrico

On Thu, Jul 21, 2022 at 18:10 Dave Fisher wrote:
> Thank you for volunteering!
>
> > On Jul 21, 2022, at 12:57 AM, Yunze Xu wrote:
> >
> > Hello Pulsar Community,
> >
> > It has been several months since the 2.8.3 release, and there are many
> > important fixes since then. For example, there is a performance
> > regression for the message listener that was introduced in 2.8.3 and
> > fixed in https://github.com/apache/pulsar/pull/15162.
> >
> > I volunteer to be the release manager for 2.8.4.
> >
> > Here [0] you can find the list of 149 commits to branch-2.8 since the
> > 2.8.3 release. There are 54 closed PRs targeting 2.8.4 that have not
> > yet been cherry-picked [1]; I will cherry-pick them and resolve the
> > conflicts if necessary. There are 18 open PRs labeled with
> > `release/2.8.4` [2]. I will check their status and see if they can be
> > pushed to 2.8.4.
>
> Please use discretion, selecting bug fixes only and avoiding new features
> or unexpected configuration changes. Thanks!
>
> Ask for help with the cherry-picks, since being a Release Manager is a
> big job!
>
> Regards,
> Dave
>
> > Thanks,
> > Yunze
> >
> > [0] - https://github.com/apache/pulsar/compare/v2.8.3...branch-2.8
> > [1] - https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aclosed
> > [2] - https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aopen
[VOTE] PIP-174: Provide new implementation for broker dispatch cache
## Motivation

The current implementation of the read cache in the Pulsar broker has largely remained unchanged for a long time, except for a few minor tweaks.

While the implementation is stable and reasonably efficient for typical workloads, the overhead required for managing cache evictions in a broker that is running many topics can be pretty high, in terms of both extra CPU utilization and JVM garbage collection work to track an increased number of medium-lived objects.

The goal is to provide an alternative implementation that can adapt better to a wider variety of operating conditions.

### Current implementation details

The broker cache is implemented as part of the `ManagedLedger` component, which sits in the Pulsar broker and provides a higher level of abstraction on top of BookKeeper.

Each topic (and managed ledger) has its own private cache space. This cache is implemented as a `ConcurrentSkipList` sorted map that maps `(ledgerId, entryId) -> payload`. The payload is a `ByteBuf` reference that can either be a slice of the `ByteBuf` that we got when reading from a socket, or a copied buffer.

Each topic cache is allowed to use the full broker max cache size before an eviction is triggered. The total cache size is effectively a resource shared across all the topics, where a topic can use a more prominent portion of it if it "asks for more".

When eviction happens, we need to do an expensive ranking of all the caches in the broker and evict from each of them proportionally to its currently used space.

The bigger problem is represented by the `ConcurrentSkipList` and the `ByteBuf` objects that need to be tracked. The skip list is essentially a "tree" structure and needs to maintain Java objects for each entry in the cache. We also potentially need a huge number of `ByteBuf` objects.
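For readers unfamiliar with the structure being described, a much-simplified model of the current per-topic cache might look like the following (illustrative names, `byte[]` in place of Netty's `ByteBuf`, and none of the real ManagedLedger logic):

```java
import java.util.concurrent.ConcurrentSkipListMap;

// Simplified model of today's per-topic cache: a sorted map of
// (ledgerId, entryId) -> payload, with range eviction up to a position.
class TopicCacheSketch {
    record Position(long ledgerId, long entryId) implements Comparable<Position> {
        public int compareTo(Position o) {
            int c = Long.compare(ledgerId, o.ledgerId);
            return c != 0 ? c : Long.compare(entryId, o.entryId);
        }
    }

    private final ConcurrentSkipListMap<Position, byte[]> entries =
            new ConcurrentSkipListMap<>();

    void insert(long ledgerId, long entryId, byte[] payload) {
        entries.put(new Position(ledgerId, entryId), payload);
    }

    byte[] get(long ledgerId, long entryId) {
        return entries.get(new Position(ledgerId, entryId));
    }

    // Eviction trims the sorted structure. Every cached entry is a live
    // Java object the GC has to track, which is the cost discussed above.
    void evictUpTo(long ledgerId, long entryId) {
        entries.headMap(new Position(ledgerId, entryId), true).clear();
    }

    int size() {
        return entries.size();
    }
}
```

Multiply this structure by tens of thousands of topics, each holding many key and payload objects, and the GC and eviction-ranking costs described in the proposal follow directly.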
A cache workload is typically the worst-case scenario for a garbage collector, because it involves creating objects, storing them for some amount of time, and then throwing them away. By that time, the GC will already have tenured these objects and copied them into the "old generation" space, and sometime later a costly compaction of that memory will have to be performed.

To mitigate the effect of the cache workload on the GC, we're currently very aggressive in purging the cache, triggering time-based eviction. By putting a max TTL on the elements in the cache, we avoid keeping the objects around long enough to become a problem for the GC. The flip side is that we're artificially reducing the cache capacity to a very short time frame, reducing the cache's usefulness.

The other problem is the CPU cost involved in doing these frequent evictions, which can be very high when there are tens of thousands of topics in a broker.

## Proposed changes

Instead of dealing with individual caches for each topic, let's adopt a model with a single cache space for the broker.

This cache is broken into N segments which act as a circular buffer. Whenever a segment is full, we start writing into the next one, and when we reach the last one, we restart by recycling the first segment.

This model has been working very well for the BookKeeper `ReadCache`:
https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java

Eviction becomes a completely trivial operation: buffers are just rotated and overwritten. We don't need to do any per-topic work or keep track of utilization.

Today, there are 2 ways of configuring the cache: one that "copies" data into the cache, and another that just uses reference counting on the original buffers to avoid payload copies.
### Memory copies into the cache

Each segment is composed of a buffer, an offset, and a hashmap which maps `(ledgerId, entryId) -> offset`. The advantage of this approach is that entries are copied into the cache buffer (in direct memory), and we don't need to keep any long-lived Java objects around.

### Keeping reference-counted buffers in the cache

Each segment in the cache will contain a map `(ledgerId, entryId) -> ByteBuf`. Buffers will have an increased reference count that keeps the data alive as long as the buffer is in the cache; it will be released when the cache segment is rotated.

The advantage is that we avoid any memory copy when inserting into or reading from the cache. The disadvantage is that we will hold references to all the `ByteBuf` objects in the cache.

### API changes

No user-facing API changes are required.

### New configuration options

The existing cache implementation will not be removed at this point. Users will be able to configure the old implementation in `broker.conf`. This option will be helpful in case performance regressions are seen for some use cases with the new cache implementation.

--
Matteo Merli
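A much-simplified sketch of the segmented "memory copies" design described above (illustrative names and `byte[]` payloads where the real implementation would use direct-memory buffers and handle concurrency): segments are pre-allocated, writes append into the current segment, and "eviction" is simply advancing to the next segment and clearing its index.

```java
import java.util.HashMap;
import java.util.Map;

/** Minimal sketch of a segmented, rotating read cache; names are
 *  illustrative, not the actual Pulsar/BookKeeper classes. Payloads are
 *  assumed to be smaller than a single segment. */
class SegmentedReadCache {
    private final byte[][] segments;        // pre-allocated cache memory
    private final Map<Long, int[]>[] index; // per-segment: key -> {offset, length}
    private final int[] writeOffset;
    private int current = 0;

    @SuppressWarnings("unchecked")
    SegmentedReadCache(int numSegments, int segmentSize) {
        segments = new byte[numSegments][segmentSize];
        index = new Map[numSegments];
        writeOffset = new int[numSegments];
        for (int i = 0; i < numSegments; i++) {
            index[i] = new HashMap<>();
        }
    }

    /** Copy an entry into the current segment; rotate when it is full. */
    void put(long key, byte[] payload) {
        if (writeOffset[current] + payload.length > segments[current].length) {
            // Rotation IS the eviction policy: move to the next segment and
            // drop everything it previously held. No ranking, no per-topic work.
            current = (current + 1) % segments.length;
            index[current].clear();
            writeOffset[current] = 0;
        }
        System.arraycopy(payload, 0, segments[current], writeOffset[current], payload.length);
        index[current].put(key, new int[]{writeOffset[current], payload.length});
        writeOffset[current] += payload.length;
    }

    /** Look the key up in every segment, newest first; null on a miss. */
    byte[] get(long key) {
        for (int i = 0; i < segments.length; i++) {
            int seg = Math.floorMod(current - i, segments.length);
            int[] loc = index[seg].get(key);
            if (loc != null) {
                byte[] out = new byte[loc[1]];
                System.arraycopy(segments[seg], loc[0], out, 0, loc[1]);
                return out;
            }
        }
        return null;
    }
}
```

Note how the only long-lived objects are the segment buffers and the per-segment index maps, instead of one tracked object per cached entry; that is the GC win the proposal is after.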
Pulsar Flaky test report 2022-07-19 for PR builds in CI
Dear community,

Here's a report of the flaky tests in Pulsar CI during the observation period 2022-07-12 to 2022-07-19:
https://docs.google.com/spreadsheets/d/165FHpHjs5fHccSsmQM4beeg6brn-zfUjcrXf6xAu4yQ/edit?usp=sharing

The report contains a subset of the test failures. The flaky tests are observed from builds of merged PRs. The GitHub Actions logs are checked for builds where the SHA of the head of the PR matches the SHA that got merged. This ensures that all found exceptions are real flakes, since no changes were made to the PR to make the tests pass later before the PR was merged successfully.

Top 5 flaky tests to fix (by number of observed failures):
- org.apache.pulsar.tests.integration.functions.java.PulsarFunctionsJavaThreadTest.testJavaLoggingFunction: 92
- org.apache.pulsar.functions.worker.PulsarFunctionTlsTest.tearDown: 36
- org.apache.pulsar.broker.service.persistent.PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.testDispatchRateCompatibility2: 27
- org.apache.pulsar.broker.service.persistent.PersistentDispatcherFailoverConsumerStreamingDispatcherTest.testAddRemoveConsumer: 23
- org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest.testAddRemoveConsumer: 22

Markdown-formatted summary reports for each test class can be accessed at
https://github.com/nicoloboschi/pulsar-flakes/tree/master/2022-07-12-to-2022-07-19

More flaky test issues:
https://github.com/apache/pulsar/issues?q=flaky+sort%3Aupdated-desc+is%3Aopen+is:issue

We need more help addressing the flaky tests. Please join the efforts so that we can get CI to a more stable state.
To coordinate the work:

1) Please search for an existing issue, or search for all flaky-test issues with "flaky" or the test class name (without package): https://github.com/apache/pulsar/issues?q=is%3Aopen+flaky+sort%3Aupdated-desc

2) If there isn't an issue for a particular flaky test failure that you'd like to fix, please create one using the "Flaky test" template at https://github.com/apache/pulsar/issues/new/choose

3) Please comment on the issue that you are working on.

We have a few active contributors working on the flaky tests; thanks for the contributions. I'm looking forward to more contributors joining the efforts.

Please join the #testing channel on Slack if you'd like to ask questions and get tips about reproducing flaky tests locally and how to fix them. Sharing stories about fixing flaky tests also helps spread the knowledge of how they can be fixed; that's a valuable way to contribute, too.

Some flaky tests might be caused by actual production code bugs, so fixing a flaky test might result in fixing a real production bug.

More contributions are welcome! Please keep up the good work!

Thanks,
Nicolò Boschi
Re: [DISCUSS] PIP-192 New Pulsar Broker Load Balancer
Hi Rajan,

As discussed in the Pulsar Community Meeting today, I added the "Modification Summary" section that compares the current solutions and our proposals.

Regards,
Heesung

On Wed, Jul 20, 2022 at 4:24 PM Matteo Merli wrote:
> On Wed, Jul 20, 2022 at 1:37 PM Rajan Dhabalia wrote:
> > Also, here we have to clearly document that it will not impact the
> > behavior of existing load balancers, now or in the future. Pulsar is
> > used by many companies and orgs, so deprecating and not maintaining
> > existing components is not acceptable in any circumstances.
>
> This is exactly the reason why this is going to be implemented as a new
> load manager instead of improving the existing ModularLoadManagerImpl.
>
> It gives the flexibility to start fresh, without the existing baggage of
> choices, and try a significantly different approach.
>
> The current ModularLoadManagerImpl will not go away. Once the new load
> manager is ready and considered stable enough, there might be a new
> discussion on whether to change the default implementation. Even then,
> users will still be able to opt for the old load manager.
Re: [VOTE] PIP-174: Provide new implementation for broker dispatch cache
+1 Penghui Li On Jul 22, 2022, 02:37 +0800, Matteo Merli , wrote: > ## Motivation > > The current implementation of the read cache in the Pulsar broker has largely > remained unchanged for a long time, except for a few minor tweaks. > > While the implementation is stable and reasonably efficient for > typical workloads, > the overhead required for managing the cache evictions in a broker > that is running > many topics can be pretty high in terms of extra CPU utilization and on the > JVM > garbage collection to track an increased number of medium-lived objects. > > The goal is to provide an alternative implementation that can adapt better to > a wider variety of operating conditions. > > ### Current implementation details > > The broker cache is implemented as part of the `ManagedLedger` component, > which sits in the Pulsar broker and provides a higher level of > abstraction of top > of BookKeeper. > > Each topic (and managed-ledger) has its own private cache space. This > cache is implemented > as a `ConcurrentSkipList` sorted map that maps `(ledgerId, entryId) -> > payload`. The payload > is a `ByteBuf` reference that can either be a slice of a `ByteBuf` that we got > when reading from a socket, or it can be a copied buffer. > > Each topic cache is allowed to use the full broker max cache size before an > eviction is triggered. The total cache size is effectively a resource > shared across all > the topics, where a topic can use a more prominent portion of it if it > "asks for more". > > When the eviction happens, we need to do an expensive ranking of all > the caches in the broker > and do an eviction in a proportional way to the currently used space > for each of them. > > The bigger problem is represented by the `ConcurrentSkipList` and the > `ByteBuf` objects > that need to be tracked. The skip list is essentially like a "tree" > structure and needs to > maintain Java objects for each entry in the cache. 
We also need to > potentially have > a huge number of ByteBuf objects. > > A cache workload is typically the worst-case scenario for each garbage > collector implementation because it involves creating objects, storing > them for some amount of > time and then throwing them away. During that time, the GC would have > already tenured these > objects and copy them into an "old generation" space, and sometime > later, a costly compaction > of that memory would have to be performed. > > To mitigate the effect of the cache workload on the GC, we're being > very aggressive in > purging the cache by triggering time-based eviction. By putting a max > TTL on the elements in > the cache, we can avoid keeping the objects around for too long to be > a problem for the GC. > > The reverse side of this is that we're artificially reducing the cache > capacity to a very > short time frame, reducing the cache usefulness. > > The other problem is the CPU cost involved in doing these frequent > evictions, which can > be very high when there are 10s of thousands of topics in a broker. > > > ## Proposed changes > > Instead of dealing with individual caches for each topic, let's adopt > a model where > there is a single cache space for the broker. > > This cache is broken into N segments which act as a circular buffer. > Whenever a segment > is full, we start writing into the next one, and when we reach the > last one, we will > restart recycling the first segment. > > This model has been working very well for the BookKeeper `ReadCache`: > https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java > > The eviction becomes a completely trivial operation, buffers are just > rotated and > overwritten. We don't need to do any per-topic task or keep track of > utilization. 
> > Today, there are 2 ways of configuring the cache: one that "copies" > data into the cache > and another that just uses reference counting on the original > buffers to avoid > payload copies. > > ### Memory copies into the cache > > Each segment is composed of a buffer, an offset, and a hashmap which maps > `(ledgerId, entryId) -> offset`. > > > The advantage of this approach is that entries are copied into the cache > buffer > (in direct memory), and we don't need to keep any long-lived Java objects > around. > > ### Keeping reference-counted buffers in the cache > > Each segment in the cache will contain a map `(ledgerId, entryId) -> ByteBuf`. > Buffers will have an increased reference count that will keep the data > alive as long > as the buffer is in the cache, and it will be released when the cache > segment is rotated. > > The advantage is that we avoid any memory copy when inserting into or > reading from the cache. > The disadvantage is that we will have references to all the `ByteBuf` > objects that are in the cache. > > ### API changes > > No user-facing API changes are required. > > ### New configuration options > > The existing cache implementation will not be removed.
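To make the segmented model above concrete, here is a minimal, self-contained sketch of the copy-based variant. All class names, sizes, and the string key encoding are invented for illustration; this is not PIP-174 code. The point it demonstrates is that eviction is nothing more than rotating to the next segment and clearing its index:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a segmented, copy-based read cache (illustrative names only).
class SegmentedCacheSketch {
    private final int segmentSize;
    private final byte[][] segments;          // N segments acting as a circular buffer
    private final Map<String, int[]>[] index; // per-segment: key -> {offset, length}
    private int current = 0;                  // segment currently being written
    private int writeOffset = 0;

    @SuppressWarnings("unchecked")
    SegmentedCacheSketch(int numSegments, int segmentSize) {
        this.segmentSize = segmentSize;
        this.segments = new byte[numSegments][segmentSize];
        this.index = new Map[numSegments];
        for (int i = 0; i < numSegments; i++) {
            index[i] = new HashMap<>();
        }
    }

    private static String key(long ledgerId, long entryId) {
        return ledgerId + ":" + entryId;
    }

    void put(long ledgerId, long entryId, byte[] payload) {
        if (writeOffset + payload.length > segmentSize) {
            // Segment full: rotate to the next one and overwrite it.
            // "Eviction" is just dropping the recycled segment's index.
            current = (current + 1) % segments.length;
            index[current].clear();
            writeOffset = 0;
        }
        System.arraycopy(payload, 0, segments[current], writeOffset, payload.length);
        index[current].put(key(ledgerId, entryId), new int[]{writeOffset, payload.length});
        writeOffset += payload.length;
    }

    byte[] get(long ledgerId, long entryId) {
        String k = key(ledgerId, entryId);
        for (int i = 0; i < segments.length; i++) {
            int[] pos = index[i].get(k);
            if (pos != null) {
                byte[] out = new byte[pos[1]];
                System.arraycopy(segments[i], pos[0], out, 0, pos[1]);
                return out;
            }
        }
        return null; // cache miss: entry was never cached or was rotated out
    }
}
```

Note there is no per-topic ranking anywhere: old entries disappear implicitly when their segment is reused.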
Re: [VOTE] PIP-174: Provide new implementation for broker dispatch cache
Sorry I’m late to this discussion. I think that the motivation is correct. There is quite a bit of activity around this issue. Let's take the extra time with the commits to confirm the performance improvements, and let's pay particular attention to threading. +1 Regards, Dave Sent from my iPhone > On Jul 21, 2022, at 11:37 AM, Matteo Merli wrote:
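For completeness, the reference-counted variant described in the PIP can be sketched similarly. `RefBuf` here stands in for Netty's `ByteBuf` reference-counting contract; class and method names are invented for the example, and a real implementation would use `ByteBuf.retain()`/`release()` directly:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-in for a reference-counted buffer (illustrative only).
class RefBuf {
    private int refCnt = 1;
    final byte[] data;
    RefBuf(byte[] data) { this.data = data; }
    RefBuf retain() { refCnt++; return this; }
    void release() { refCnt--; }  // at 0, Netty would free the underlying memory
    int refCnt() { return refCnt; }
}

// One cache segment holding (ledgerId, entryId) -> buffer, no payload copies.
class RefCountedSegmentSketch {
    private final Map<String, RefBuf> entries = new HashMap<>();

    void insert(long ledgerId, long entryId, RefBuf buf) {
        // Retain so the payload stays alive while it sits in the cache;
        // no memory copy is made, unlike the copy-based variant.
        entries.put(ledgerId + ":" + entryId, buf.retain());
    }

    RefBuf get(long ledgerId, long entryId) {
        return entries.get(ledgerId + ":" + entryId);
    }

    void rotate() {
        // When the segment is rotated, every buffer is released in one pass.
        entries.values().forEach(RefBuf::release);
        entries.clear();
    }
}
```

The trade-off described in the PIP is visible here: reads and inserts are copy-free, but the segment must keep one Java object reference per cached entry until rotation.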
Re: [DISCUSS] Apache Pulsar 2.8.4 release
Sure, I will be careful with those PRs that are not cherry-picked to branch-2.8 but are labeled `release/2.8.4`. Thanks, Yunze > On Jul 22, 2022, at 00:09, Dave Fisher wrote: > > Thank you for volunteering! > >> On Jul 21, 2022, at 12:57 AM, Yunze Xu wrote: >> >> Hello Pulsar Community, >> >> It has been several months since the 2.8.3 release and there are many >> important fixes after that. For example, there is a performance >> regression for the message listener that was introduced in 2.8.3 and >> fixed in https://github.com/apache/pulsar/pull/15162. >> >> I volunteer to be the release manager for 2.8.4. >> >> Here [0] you can find the list of 149 commits to branch-2.8 since the >> 2.8.3 release. There are 54 closed PRs targeting 2.8.4 that have not >> yet been cherry-picked [1]; I will cherry-pick them and resolve the >> conflicts if necessary. There are 18 open PRs labeled with >> `release/2.8.4` [2]. I will check their status and see if they >> can be pushed to 2.8.4. > > Please use discretion and select bug fixes only, avoiding new > features or unexpected configuration changes. Thanks! > > Ask for help with the cherry-picks, since being a Release Manager is a big > job! > > Regards, > Dave > > >> >> Thanks, >> Yunze >> >> [0] - https://github.com/apache/pulsar/compare/v2.8.3...branch-2.8 >> [1] - >> https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aclosed >> [2] - >> https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aopen >> >
Re: [DISCUSS] PIP-191: Support batched message using entry filter
Hi Enrico Thanks for your reply. > 1) It is not clear to me if you want to push all the messages metadata in the main header of the entry or only the metadata of the first entry, or only the KEY. I just want to copy the partial message properties (we call them batch properties here) to the main header of the entry, without modifying the payload or the single message metadata. All messages in one batch must have the same batch properties (both key and value). > 2) we have to clarify the new message format of the entry and update the protocol documents It seems that the protocol documents have not described the batch message main header; they only describe the `SingleMessageMetadata`. I agree that it's better to update the protocol documents to describe the batch message main header. > 3) existing EntryFilters won't be able to work well with the new format, we must find a way to make them fail and not process garbage It seems that there are no compatibility problems, because I just add properties to the main header, which was empty before, and there is no modification of the payload or the single message metadata. The existing EntryFilters will work well with the new format, the same as the existing Protocol Handlers. :) Thanks Xiaoyu Hou On Thu, Jul 21, 2022 at 20:27 Enrico Olivelli wrote: > Thank you for this proposal ! > > I understand the problem, and I have already thought about it, because > I am the author of some filters (especially the JMS Selectors Filter), > but we have to clarify more about this PIP. > > 1) It is not clear to me if you want to push all the messages metadata > in the main header of the entry or only the metadata of the first > entry, or only the KEY.
> 2) we have to clarify the new message format of the entry and update > the protocol documents > 3) existing EntryFilters won't be able to work well with the new > format, we must find a way to make them fail and not process garbage > 4) the same problem applies to Interceptors and Protocol Handlers > (like KOP), we must make it clear in this PIP what the upgrade path is > and give some suggestions to the developers of such components > > I support this initiative, as long as we clarify those points. > > The Pulsar ecosystem is becoming more and more mature, and the user > base is growing quickly; we can't break things without a clear path > for the users and for the developers of extensions > > Enrico > > > On Thu, Jul 21, 2022 at 12:45 Qiang Huang > wrote: > > > > Good. +1 > > > > On Tue, Jul 19, 2022 at 18:00 Anon Hxy wrote: > > > > > > Hi Pulsar community: > > > > > > I open a PIP to discuss "Support batched message using entry filter" > > > > > > Proposal Link: https://github.com/apache/pulsar/issues/16680 > > > --- > > > > > > ## Motivation > > > > > > We already have a plug-in way to filter entries in the broker, aka PIP-105 > > > https://github.com/apache/pulsar/issues/12269. But this way only checks > > > the batch header, without digging into the individual messages' properties. > > > Of course it provides an interface to deserialize the entire Entry to the > > > heap, but this brings some memory and CPU overhead. And in most > > > scenarios we only need partial properties to do some filtering.
> > > > > > This proposal brings a method to make PIP-105 support batched entries without > > > having to deserialize the entire Entry to the heap > > > > > > > > > ## API Changes > > > > > > - Add a producer config to specify which property keys will be > > > added to the batched entry metadata, for example: > > > > > > ``` > > > org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties > > > ``` > > > The `batchedFilterProperties` type is `List` with an empty list as the > > > default value. An empty list means that the properties of the entry's > > > metadata are empty, and the `EntryFilter` will not take effect. > > > > > > ## Implementation > > > > > > - When calling `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`, > > > we extract the message properties and add them to `metadata`: > > > ``` > > > public boolean add(MessageImpl msg, SendCallback callback) { > > > if (log.isDebugEnabled()) { > > > log.debug("[{}] [{}] add message to batch, num messages in > > > batch so far {}", topicName, producerName, numMessagesInBatch); > > > } > > > if (++numMessagesInBatch == 1) { > > > try { > > > // some properties are common amongst the different > > > messages in the batch, hence we just pick it up from > > > // the first message > > > messageMetadata.setSequenceId(msg.getSequenceId()); > > > List filterProperties = getProperties(msg); > > > if (!filterProperties.isEmpty()) { > > > messageMetadata.addAllProperties(filterProperties); // > > > and message properties here >
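The quoted `add()` snippet is truncated in the archive, but the core idea of the proposal can be sketched in isolation: the producer promotes only the configured property keys from the first message of a batch into the entry's main header, and a broker-side filter then inspects just that header without deserializing the batched payload. The class and method names below are invented for illustration and are not the PIP's actual code:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the proposed behaviour (names are hypothetical).
class BatchHeaderSketch {
    // Producer side: copy only the configured keys from the first message's
    // properties into the entry-level metadata.
    static Map<String, String> buildEntryProperties(List<String> batchedFilterProperties,
                                                    Map<String, String> firstMessageProps) {
        Map<String, String> header = new HashMap<>();
        // An empty batchedFilterProperties list means the entry header stays
        // empty and the EntryFilter cannot act on batch properties.
        for (String key : batchedFilterProperties) {
            String v = firstMessageProps.get(key);
            if (v != null) {
                header.put(key, v);
            }
        }
        return header;
    }

    // Broker side: a filter decision needs only the entry header, not the payload.
    static boolean accept(Map<String, String> entryHeader, String key, String expected) {
        return expected.equals(entryHeader.get(key));
    }
}
```

This also makes the constraint stated in the reply visible: since only the first message's properties are promoted, every message in the batch must carry the same batch properties for the filter decision to be sound.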