[DISCUSS] Implementation for HTTP endpoint producer/consumer
Hi, Pulsar Community, We have PIP-64, which introduces an HTTP REST API for producing and consuming messages ( https://github.com/apache/pulsar/wiki/PIP-64%3A-Introduce-REST-endpoints-for-producing%2C-consuming-and-reading-messages ), but the proposal does not define the implementation. We already have a producer HTTP API on the broker side; however, it had some problems, so it was refactored in this patch: https://github.com/apache/pulsar/pull/15876. We then added an HTTP consumer in this patch: https://github.com/apache/pulsar/pull/15942. However, some ideas have not yet reached a consensus. As @Lari Hotari mentioned on pull request https://github.com/apache/pulsar/pull/15942: "It might not be a good idea to add the implementation to the main Pulsar Admin API at all. HTTP consuming would be better handled in a separate component. PIP-64 doesn't determine that this should be part of the Pulsar Admin API, and we should revisit this decision. I think it's a bad idea to add HTTP consuming to the Pulsar Admin API and brokers." I want to discuss whether we should implement the HTTP endpoint in the broker or in a separate component (like the Pulsar WebSocket proxy). Best, Mattison
Re: [DISCUSS] Apache Pulsar 2.10.1 release
Thanks for the update. I will continue the 2.10.1 release.

Penghui

On Jun 4, 2022, 04:06 +0800, Dave Fisher wrote:
> Please proceed with preparing the 2.10.1 release.
>
> > On May 24, 2022, at 1:17 PM, Dave Fisher wrote:
> >
> > There are some PRs that are coming in that must be included.
> >
> > Thanks,
> > Dave
> >
> > > On May 23, 2022, at 9:59 PM, Yunze Xu wrote:
> > >
> > > +1
> > >
> > > Thanks,
> > > Yunze
> > >
> > > > On May 23, 2022, at 11:34, Hang Chen wrote:
> > > >
> > > > +1
> > > >
> > > > There are a lot of transaction fixes.
> > > >
> > > > Thanks,
> > > > Hang
> > > >
> > > > On Sat, May 21, 2022 at 13:06, PengHui Li wrote:
> > > > >
> > > > > Hello, Pulsar community:
> > > > >
> > > > > I'd like to propose to release Apache Pulsar 2.10.1.
> > > > >
> > > > > Currently, we have 190 commits [0], including many transaction
> > > > > fixes and security fixes.
> > > > >
> > > > > There are also 22 open PRs [1]; I will follow them to make sure
> > > > > that the important fixes are included in 2.10.1.
> > > > >
> > > > > If you have any important fixes or any questions, please reply
> > > > > to this email, and we will evaluate whether to include them in 2.10.1.
> > > > >
> > > > > [0] https://github.com/apache/pulsar/pulls?q=is%3Amerged+is%3Apr+label%3Arelease%2F2.10.1+
> > > > > [1] https://github.com/apache/pulsar/pulls?q=is%3Aopen+is%3Apr+label%3Arelease%2F2.10.1+
> > > > >
> > > > > Best Regards
> > > > > Penghui
[DISCUSS] PIP-174: Provide new implementation for broker dispatch cache
https://github.com/apache/pulsar/issues/15954

WIP can be seen at: https://github.com/apache/pulsar/pull/15955

---

## Motivation

The current implementation of the read cache in the Pulsar broker has remained largely unchanged for a long time, except for a few minor tweaks. While the implementation is stable and reasonably efficient for typical workloads, the overhead required for managing cache evictions in a broker that is running many topics can be quite high, both in extra CPU utilization and in JVM garbage-collection pressure from tracking an increased number of medium-lived objects.

The goal is to provide an alternative implementation that can adapt better to a wider variety of operating conditions.

### Current implementation details

The broker cache is implemented as part of the `ManagedLedger` component, which sits in the Pulsar broker and provides a higher level of abstraction on top of BookKeeper. Each topic (and managed ledger) has its own private cache space. This cache is implemented as a `ConcurrentSkipList` sorted map that maps `(ledgerId, entryId) -> payload`. The payload is a `ByteBuf` reference that can either be a slice of a `ByteBuf` that we got when reading from a socket, or a copied buffer.

Each topic cache is allowed to use the full broker max cache size before an eviction is triggered. The total cache size is effectively a resource shared across all the topics, where a topic can use a more prominent portion of it if it "asks for more". When the eviction happens, we need to do an expensive ranking of all the caches in the broker and evict from each of them in proportion to its currently used space.

The bigger problem is represented by the `ConcurrentSkipList` and the `ByteBuf` objects that need to be tracked. The skip list is essentially a "tree"-like structure and needs to maintain Java objects for each entry in the cache. We also potentially need a huge number of `ByteBuf` objects. A cache workload is typically the worst-case scenario for any garbage collector implementation because it involves creating objects, storing them for some amount of time, and then throwing them away. During that time, the GC will already have tenured these objects and copied them into an "old generation" space, and sometime later a costly compaction of that memory has to be performed.

To mitigate the effect of the cache workload on the GC, we are being very aggressive in purging the cache by triggering time-based eviction. By putting a max TTL on the elements in the cache, we can avoid keeping the objects around long enough to become a problem for the GC. The flip side is that we artificially reduce the cache capacity to a very short time frame, reducing the cache's usefulness. The other problem is the CPU cost of these frequent evictions, which can be very high when there are tens of thousands of topics in a broker.

## Proposed changes

Instead of dealing with individual caches for each topic, let's adopt a model where there is a single cache space for the broker. This cache is broken into N segments which act as a circular buffer. Whenever a segment is full, we start writing into the next one, and when we reach the last one, we wrap around and recycle the first segment. Each segment is composed of a buffer, an offset, and a hashmap which maps `(ledgerId, entryId) -> offset`.
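To make the segment layout concrete, here is a minimal, hypothetical Java sketch of one such segment. The class and member names are illustrative only (they are not taken from the WIP pull request), and a production version would use a compact primitive-key map rather than boxed keys:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of one cache segment; illustrative names only.
class CacheSegment {
    // Composite key; a real implementation would avoid boxing, e.g. with a
    // primitive long-pair hashmap as BookKeeper's ReadCache does.
    record EntryKey(long ledgerId, long entryId) {}

    private final ByteBuf data;                             // one large direct-memory buffer
    private final AtomicInteger tail = new AtomicInteger(); // next free write offset
    // (ledgerId, entryId) -> (offset << 32 | length)
    private final ConcurrentHashMap<EntryKey, Long> index = new ConcurrentHashMap<>();

    CacheSegment(int segmentSize) {
        data = PooledByteBufAllocator.DEFAULT.directBuffer(segmentSize, segmentSize);
    }

    /** Copies the entry into this segment; returns false when the segment is full. */
    boolean put(long ledgerId, long entryId, ByteBuf entry) {
        int len = entry.readableBytes();
        int offset = tail.getAndAdd(len);
        if (offset + len > data.capacity()) {
            return false; // full: the caller rotates to the next segment
        }
        data.setBytes(offset, entry, entry.readerIndex(), len);
        index.put(new EntryKey(ledgerId, entryId), ((long) offset << 32) | len);
        return true;
    }

    /** Returns a retained slice of the cached entry, or null on a cache miss. */
    ByteBuf get(long ledgerId, long entryId) {
        Long pos = index.get(new EntryKey(ledgerId, entryId));
        if (pos == null) {
            return null;
        }
        return data.retainedSlice((int) (pos >>> 32), (int) (pos & 0xFFFFFFFFL));
    }

    /** Eviction is trivial: reset the write offset and clear the index. */
    void clear() {
        tail.set(0);
        index.clear();
    }
}
```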
This model has been working very well for the BookKeeper `ReadCache`: https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java

There are two main advantages to this approach:

1. Entries are copied into the cache buffer (in direct memory), so we don't need to keep any long-lived Java objects around.
2. Eviction becomes a completely trivial operation: buffers are just rotated and overwritten. We don't need to run any per-topic task or keep track of utilization (see the rotation sketch below).

### API changes

No user-facing API changes are required.

### New configuration options

The existing cache implementation will not be removed at this point. Users will be able to configure the old implementation in `broker.conf`. This option will be helpful in case performance regressions are seen for some use cases with the new cache implementation.
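To show why eviction reduces to simple rotation (point 2 above), here is a hypothetical sketch of a manager rotating over segments like the `CacheSegment` from the previous example. Again, the names are illustrative, not the WIP code, and the sketch assumes entries are smaller than a segment:

```java
import io.netty.buffer.ByteBuf;

// Hypothetical sketch: eviction-by-rotation over N CacheSegment instances
// (defined in the previous example). Not the actual WIP implementation.
class SegmentedReadCache {
    private final CacheSegment[] segments;
    private int current; // index of the segment currently being written

    SegmentedReadCache(int numSegments, int segmentSize) {
        segments = new CacheSegment[numSegments];
        for (int i = 0; i < numSegments; i++) {
            segments[i] = new CacheSegment(segmentSize);
        }
    }

    synchronized void insert(long ledgerId, long entryId, ByteBuf entry) {
        if (!segments[current].put(ledgerId, entryId, entry)) {
            // Current segment is full: rotate to the next one and recycle it.
            // This overwrite *is* the eviction -- no ranking, no per-topic work.
            current = (current + 1) % segments.length;
            segments[current].clear();
            segments[current].put(ledgerId, entryId, entry);
        }
    }

    ByteBuf get(long ledgerId, long entryId) {
        // Probe each segment; a real implementation would probe from the
        // most recently written segment backwards.
        for (CacheSegment s : segments) {
            ByteBuf e = s.get(ledgerId, entryId);
            if (e != null) {
                return e;
            }
        }
        return null; // cache miss
    }
}
```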
Re: [DISCUSS] [PIP-165] Auto release client useless connections
OK, thanks for your explanation, it makes sense to me. +1

On 2022/06/05 17:08:22 Yubiao Feng wrote:
> Hi Ran,
>
> I think you mean: a producer/consumer may fail to establish a connection
> when it tries to work again.
>
> There are two places in the broker configuration that limit the maximum
> number of connections:
> - Broker config: maxConnectionsLimitEnabled
> - Broker config: maxConnectionsLimitPerIpEnabled
>
> At the client side:
> We only release connections that are not registered with a producer,
> consumer, or transaction. So when a new producer is created, it will get an
> error (NotAllowedError, because the maximum number of connections was
> reached), the same as in the original design.
>
> At the proxy side:
> I'm sorry I didn't think it through before. I changed the proxy part of the
> proposal:
>
> The connections between proxy and broker have two parts: those for lookup
> commands, and those for producer, consumer, and other commands.
> The connections for producer, consumer, and other commands are managed by
> DirectProxyHandler, which holds the connection until the client is closed,
> so this does not affect producers or consumers. These connections do not
> require additional closing.
> The connections for lookup commands: when the proxy is configured with
> `metadataStoreUrl`, the lookup command will select a registered broker by
> round robin and create a connection. If we do not optimize the broker
> load-balancing algorithm, all of these connections are considered useful
> connections. When the cluster is large, holding so many connections becomes
> redundant. Later, I will try to put forward other proposals to improve this,
> so this proposal does not cover proxy connection release.
>
> On Fri, Jun 3, 2022 at 11:44 AM Ran Gao wrote:
> > This is a good idea, but I have a concern: Pulsar has the config
> > `brokerMaxConnections` to control the max connection count against one
> > broker. If the connection is closed, it will re-connect when consumers or
> > producers start to consume and produce messages again, but at that point
> > the connection count may already have reached the max.
> >
> > On 2022/05/26 06:31:37 Yubiao Feng wrote:
> > > I opened a PIP to discuss auto-releasing useless client connections;
> > > could you help me review it?
> > >
> > > ## Motivation
> > > Currently, the Pulsar client keeps a connection open even if no
> > > producers or consumers use it.
> > > If a client produces messages to topic A and we have 3 brokers 1, 2, 3,
> > > then due to bundle unloading (load manager) the topic ownership may
> > > move from broker 1 to broker 2 and finally to broker 3. The client will
> > > then keep connections to all 3 brokers. We can optimize this to reduce
> > > the broker-side connections: the client should close the unused ones.
> > >
> > > So a mechanism needs to be added to release unwanted connections.
> > >
> > > ### Why are there idle connections?
> > >
> > > 1. When the configuration `maxConnectionsPerHosts` is not set to 0,
> > > connections are not closed at all. The design is to hold a fixed number
> > > of connections per host, avoiding frequent closing and creation.
> > > https://github.com/apache/pulsar/blob/72349117c4fd9825adaaf16d3588a695e8a9dd27/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java#L325-L335
> > >
> > > 2-1. When clients receive `command-close`, they will reconnect
> > > immediately. This is designed to make reconnection, rebalancing, and
> > > unloading possible.
> > > https://github.com/apache/pulsar/blob/72349117c4fd9825adaaf16d3588a695e8a9dd27/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L122-L141
> > >
> > > 2-2. The broker will close client connections before writing ownership
> > > info to ZK. Clients will then get a deprecated broker address when they
> > > try a lookup.
> > > https://github.com/apache/pulsar/blob/72349117c4fd9825adaaf16d3588a695e8a9dd27/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1282-L1293
> > >
> > > ## Goal
> > > Automatically release connections that are no longer used.
> > >
> > > - Scope
> > >   - **Pulsar client**
> > >     Covers connections used by consumers, producers, and transactions.
> > >   - **Pulsar proxy**
> > >     Covers only the connections between proxy and broker.
> > >
> > > ## Approach
> > > Periodically check for idle connections and close them.
> > >
> > > ## Changes
> > >
> > > ### API changes
> > > **ClientCnx** adds an idle check method and fields to track idle time.
> > >
> > > ```java
> > > /** Create time. **/
> > > private final long createTime;
> > > /** The time when the connection was marked idle. **/
> > > private long idleMarkTime;
> > > /** The time when the last valid data was transmitted. **/
> > > private long lastWorkTime;
> > > /** State. Enumerated values: using, idle_marked, before_release, released. **/
> > > private int stat;
> > > ```
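To illustrate the two-phase mark-then-release rule described in the proposal above, here is a minimal hypothetical Java sketch. It is not code from the patch: the `Cnx` interface, the constant names, and the scheduling assumptions are all illustrative, grounded only in the fields and configuration options quoted in the thread:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the periodic idle check: first mark a connection
// idle, then release it only if it stays idle for connectionMaxIdleSeconds.
// Not the actual PIP-165 implementation.
class IdleConnectionChecker {
    static final int USING = 0;
    static final int IDLE_MARKED = 1;
    static final int RELEASED = 2;

    /** Minimal view of a client connection; illustrative, not ClientCnx's API. */
    interface Cnx {
        boolean hasRegisteredProducersConsumersOrTxns();
        long idleMarkTime();                 // set when the idle mark is applied
        int stat();
        boolean casStat(int expect, int update);
        void close();
    }

    private final long maxIdleMillis;

    IdleConnectionChecker(int connectionMaxIdleSeconds) {
        this.maxIdleMillis = TimeUnit.SECONDS.toMillis(connectionMaxIdleSeconds);
    }

    /** Invoked periodically, every connectionIdleDetectionIntervalSeconds. */
    void doIdleCheck(Cnx cnx) {
        if (cnx.hasRegisteredProducersConsumersOrTxns()) {
            // Connection is in use: clear any previous idle mark.
            cnx.casStat(IDLE_MARKED, USING);
            return;
        }
        // Phase 1: mark the connection as idle and wait for the next check.
        if (cnx.casStat(USING, IDLE_MARKED)) {
            return;
        }
        // Phase 2: still idle after the grace period -> release it.
        if (cnx.stat() == IDLE_MARKED
                && System.currentTimeMillis() - cnx.idleMarkTime() > maxIdleMillis) {
            if (cnx.casStat(IDLE_MARKED, RELEASED)) {
                cnx.close();
            }
        }
    }
}
```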
Re: [DISCUSS] [PIP-165] Auto release client useless connections
Good work. +1

------------------ Original message ------------------
From: "dev"
> > > https://github.com/apache/pulsar/blob/72349117c4fd9825adaaf16d3588a695e8a9dd27/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java#L325-L335
> > >
> > > 2-1. When clients receive `command-close`, they will reconnect
> > > immediately. This is designed to make reconnection, rebalancing, and
> > > unloading possible.
> > > https://github.com/apache/pulsar/blob/72349117c4fd9825adaaf16d3588a695e8a9dd27/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L122-L141
> > >
> > > 2-2. The broker will close client connections before writing ownership
> > > info to ZK. Clients will then get a deprecated broker address when they
> > > try a lookup.
> > > https://github.com/apache/pulsar/blob/72349117c4fd9825adaaf16d3588a695e8a9dd27/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1282-L1293
> > >
> > > ## Goal
> > > Automatically release connections that are no longer used.
> > >
> > > - Scope
> > >   - **Pulsar client**
> > >     Covers connections used by consumers, producers, and transactions.
> > >   - **Pulsar proxy**
> > >     Covers only the connections between proxy and broker.
> > >
> > > ## Approach
> > > Periodically check for idle connections and close them.
> > >
> > > ## Changes
> > >
> > > ### API changes
> > > **ClientCnx** adds an idle check method and fields to track idle time.
> > >
> > > ```java
> > > /** Create time. **/
> > > private final long createTime;
> > > /** The time when the connection was marked idle. **/
> > > private long idleMarkTime;
> > > /** The time when the last valid data was transmitted. **/
> > > private long lastWorkTime;
> > > /** State. Enumerated values: using, idle_marked, before_release, released. **/
> > > private int stat;
> > >
> > > /**
> > >  * Check whether the client connection is currently free. This method
> > >  * may change the state to idle.
> > >  */
> > > public boolean doIdleCheck();
> > >
> > > /** Get state. **/
> > > public int getStat();
> > >
> > > /** Change state. **/
> > > public int setStat(int originalStat, int newStat);
> > > ```
> > >
> > > ### Configuration changes
> > > We can set the check frequency and the release rule for idle connections
> > > in `ClientConfigurationData`.
> > >
> > > ```java
> > > @ApiModelProperty(
> > >         name = "autoReleaseIdleConnectionsEnabled",
> > >         value = "Whether to automatically release idle connections"
> > > )
> > > private boolean autoReleaseIdleConnectionsEnabled = true;
> > >
> > > @ApiModelProperty(
> > >         name = "connectionMaxIdleSeconds",
> > >         value = "Release a connection if it is not used for more than"
> > >                 + " [connectionMaxIdleSeconds] seconds"
> > > )
> > > private int connectionMaxIdleSeconds = 180;
> > >
> > > @ApiModelProperty(
> > >         name = "connectionIdleDetectionIntervalSeconds",
> > >         value = "How often to check for idle connections"
> > > )
> > > private int connectionIdleDetectionIntervalSeconds = 60;
> > > ```
> > >
> > > ## Implementation
> > >
> > > - **Pulsar client**
> > >   If no consumer, producer, or transaction uses the current connection,
> > >   release it.
> > >
> > > - **Pulsar proxy**
> > >   If the connection has not transmitted valid data for a long time,
> > >   release it.
> > >
> > > Thanks,
> > > Yubiao Feng
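For context, if the proposal lands as described, client-side usage might look like the following. This is a hypothetical sketch: it assumes a `ClientBuilder` setter mirroring the proposed `connectionMaxIdleSeconds` field in `ClientConfigurationData`, which the final patch may name differently:

```java
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class IdleConnectionReleaseExample {
    public static void main(String[] args) throws PulsarClientException {
        // Hypothetical builder setter mirroring the proposed configuration
        // field; the final API may differ.
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .connectionMaxIdleSeconds(180) // release connections idle > 180 s
                .build();

        // ... create producers/consumers; once they are all closed, the
        // underlying connections become eligible for automatic release.

        client.close();
    }
}
```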
[ANNOUNCE] New Committer: Dezhi Liu
The Project Management Committee (PMC) for Apache Pulsar has invited Dezhi Liu (https://github.com/liudezhi2098) to become a committer, and we are pleased to announce that he has accepted. Dezhi Liu (GitHub id liudezhi2098) has contributed many improvements and bug fixes to Pulsar. Being a committer enables easier contribution to the project, since there is no need to go through the patch-submission process; this should enable better productivity. Please join us in congratulating and welcoming Dezhi Liu onboard! Best Regards, Hang Chen, on behalf of the Pulsar PMC