[DISCUSSION] Support custom and pluggable consumer selector
Hi Pulsar Community,

When we try to introduce Pulsar to our existing system, compatibility is our
first consideration. Our production system uses the jump consistent hash
algorithm to select consumers, while Pulsar natively uses a range-based
consistent hash algorithm. As a result, we cannot guarantee that the same node
will consume data with the same routing key from both Pulsar and our existing
system. It would be better if Pulsar supported a custom consumer selector when
using a Key_Shared subscription.

Thanks,
Zhangao

[1] https://github.com/apache/pulsar/issues/13473
[2] https://github.com/apache/pulsar/pull/13470
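For illustration, here is a minimal sketch of the jump-consistent-hash
selection mentioned above, using the algorithm from Lamport and Veach's 2014
paper. The selector class and its `select` method are hypothetical (Pulsar's
actual broker-side selector API may differ); only the hash function itself is
the published algorithm:

```java
import java.util.List;

// Sketch of a jump-consistent-hash consumer selector. The class and method
// are hypothetical; the hash function is jump consistent hash
// (Lamport & Veach, 2014).
public final class JumpHashSelector {

    // Maps a 64-bit key to a bucket in [0, buckets) with minimal re-mapping
    // when the bucket count changes.
    static int jumpConsistentHash(long key, int buckets) {
        long b = -1, j = 0;
        while (j < buckets) {
            b = j;
            key = key * 2862933555777941757L + 1;
            j = (long) ((b + 1) * ((double) (1L << 31) / ((key >>> 33) + 1)));
        }
        return (int) b;
    }

    public static <C> C select(String routingKey, List<C> consumers) {
        long key = routingKey.hashCode() & 0xffffffffL; // widen to unsigned 32-bit
        return consumers.get(jumpConsistentHash(key, consumers.size()));
    }

    public static void main(String[] args) {
        List<String> consumers = List.of("c1", "c2", "c3");
        System.out.println(select("order-42", consumers)); // stable per key
    }
}
```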
Re: [DISCUSSION] PIP-130: Apply redelivery backoff policy for ack timeout
Enrico

> What do you mean with 'redelivery backorder has not been released yet'?

I think you mean "redelivery backoff has not been released yet"?

This proposal changes the public APIs, but the changed APIs were introduced by
https://github.com/apache/pulsar/pull/12566, and we don't have a release that
contains the APIs introduced in #12566, so the API is safe to change.

The behavior will not change in this proposal; we just add an API that lets
users control message redelivery more flexibly. It will not change the
existing behavior.

> Pulsar client and Consumer are configurable using a map of key value pairs.
> So we must take care of not changing the behaviour.

The proposal does not add more fields to the consumer configuration data, it
just changes the class name here:
https://github.com/apache/pulsar/blob/f0da648ae1c02248c015d26b93e08b2a9a78c1d3/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L78

Regards,
Penghui

On Tue, Dec 28, 2021 at 3:19 PM Enrico Olivelli wrote:

> Penghui
>
> I am overall +1 to this proposal but I am afraid about compatibility. And I
> want to be sure that we are not breaking anything.
> Pulsar client and Consumer are configurable using a map of key value pairs.
> So we must take care of not changing the behaviour.
>
> What do you mean with 'redelivery backorder has not been released yet'?
>
> Enrico
>
> On Mon, 27 Dec 2021 at 14:25, PengHui Li wrote:
>
> > https://github.com/apache/pulsar/issues/13528
> >
> > Pasted below for quoting convenience.
> >
> > -
> >
> > PIP 130: Apply redelivery backoff policy for ack timeout
> >
> > ## Motivation
> >
> > PIP 106
> > https://github.com/apache/pulsar/wiki/PIP-106%3A-Negative-acknowledgment-backoff
> > introduced the negative acknowledgment message redelivery backoff, which
> > allows users to achieve more flexible control over the message redelivery
> > delay time. But the redelivery backoff policy only applies to the
> > negative acknowledgment API; users who use the ack timeout to trigger
> > message redelivery, rather than the negative acknowledgment API, cannot
> > use the new features introduced by PIP 106.
> >
> > So the proposal is to apply the message redelivery policy to the ack
> > timeout mechanism. Users can specify an ack timeout redelivery backoff,
> > for example, apply an exponential backoff with a 10-second ack timeout:
> >
> > ```java
> > client.newConsumer()
> >     .ackTimeout(10, TimeUnit.SECONDS)
> >     .ackTimeoutRedeliveryBackoff(
> >         ExponentialRedeliveryBackoff.builder()
> >             .minDelayMs(1000)
> >             .maxDelayMs(60000).build())
> >     .subscribe();
> > ```
> >
> > The message redelivery behavior should be:
> >
> > | Redelivery count | Redelivery delay |
> > | ---------------- | ---------------- |
> > | 1 | 10 + 1 seconds |
> > | 2 | 10 + 2 seconds |
> > | 3 | 10 + 4 seconds |
> > | 4 | 10 + 8 seconds |
> > | 5 | 10 + 16 seconds |
> > | 6 | 10 + 32 seconds |
> > | 7 | 10 + 60 seconds |
> > | 8 | 10 + 60 seconds |
> >
> > ## Goal
> >
> > Add an API to the Java client to provide the ability to specify the ack
> > timeout message redelivery backoff; the message redelivery behavior
> > should abide by the redelivery backoff policy.
> >
> > ## API Changes
> >
> > 1. Change `NegativeAckRedeliveryBackoff` to `RedeliveryBackoff`, so that
> > we can use the `RedeliveryBackoff` for both the negative acknowledgment
> > API and ack timeout message redelivery.
> >
> > 2. Change `NegativeAckRedeliveryExponentialBackoff` to
> > `ExponentialRedeliveryBackoff`, and add a `multiplier` option to
> > `ExponentialRedeliveryBackoff` with a default value of 2:
> >
> > ```java
> > ExponentialRedeliveryBackoff.builder()
> >     .minDelayMs(1000)
> >     .maxDelayMs(60000)
> >     .multiplier(5)
> >     .build();
> > ```
> >
> > 3. Add an `ackTimeoutRedeliveryBackoff` method to the `ConsumerBuilder`:
> >
> > ```java
> > client.newConsumer()
> >     .ackTimeout(10, TimeUnit.SECONDS)
> >     .ackTimeoutRedeliveryBackoff(
> >         ExponentialRedeliveryBackoff.builder()
> >             .minDelayMs(1000)
> >             .maxDelayMs(60000).build())
> >     .subscribe();
> > ```
> >
> > ## Compatibility and migration plan
> >
> > Since the negative acknowledgment message redelivery backoff has not been
> > released yet, we can modify the API directly.
> >
> > ## Tests plan
> >
> > - Verify the introduced `multiplier` of ExponentialRedeliveryBackoff
> > - Verify that the ack timeout message redelivery works as expected
> >
> > Regards,
> > Penghui
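As an illustration of the delay table quoted above, the values can be
reproduced with a small helper. This is a sketch of the intended semantics
only, not Pulsar's actual implementation, and the method name is made up:

```java
// Sketch of the semantics in the PIP-130 table (illustrative only): total
// delay = ack timeout plus an exponential backoff of
// minDelayMs * multiplier^(redeliveryCount - 1), capped at maxDelayMs.
static long redeliveryDelayMs(long ackTimeoutMs, long minDelayMs, long maxDelayMs,
                              double multiplier, int redeliveryCount) {
    double backoff = minDelayMs * Math.pow(multiplier, redeliveryCount - 1);
    return ackTimeoutMs + (long) Math.min(backoff, maxDelayMs);
}

// With ackTimeout=10s, minDelayMs=1000, maxDelayMs=60000, multiplier=2:
//   redeliveryDelayMs(10_000, 1_000, 60_000, 2.0, 1) == 11_000  // 10 + 1 s
//   redeliveryDelayMs(10_000, 1_000, 60_000, 2.0, 7) == 70_000  // 10 + 60 s (capped)
```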
Re: [DISCUSSION] PIP-130: Apply redelivery backoff policy for ack timeout
+1

I missed that we haven't cherry-picked that change to 2.9.

Thanks for your confirmation

Enrico

On Tue, 28 Dec 2021 at 12:44, PengHui Li wrote:

> Enrico
>
> > What do you mean with 'redelivery backorder has not been released yet'?
>
> I think you mean "redelivery backoff has not been released yet"?
>
> This proposal changes the public APIs, but the changed APIs were introduced
> by https://github.com/apache/pulsar/pull/12566, and we don't have a release
> that contains the APIs introduced in #12566, so the API is safe to change.
>
> The behavior will not change in this proposal; we just add an API that lets
> users control message redelivery more flexibly. It will not change the
> existing behavior.
>
> > Pulsar client and Consumer are configurable using a map of key value pairs.
> > So we must take care of not changing the behaviour.
>
> The proposal does not add more fields to the consumer configuration data, it
> just changes the class name here:
> https://github.com/apache/pulsar/blob/f0da648ae1c02248c015d26b93e08b2a9a78c1d3/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L78
>
> Regards,
> Penghui
>
> On Tue, Dec 28, 2021 at 3:19 PM Enrico Olivelli wrote:
>
> > Penghui
> >
> > I am overall +1 to this proposal but I am afraid about compatibility. And I
> > want to be sure that we are not breaking anything.
> > Pulsar client and Consumer are configurable using a map of key value pairs.
> > So we must take care of not changing the behaviour.
> >
> > What do you mean with 'redelivery backorder has not been released yet'?
> >
> > Enrico
> >
> > On Mon, 27 Dec 2021 at 14:25, PengHui Li wrote:
> >
> > > https://github.com/apache/pulsar/issues/13528
> > >
> > > Pasted below for quoting convenience.
> > >
> > > -
> > >
> > > PIP 130: Apply redelivery backoff policy for ack timeout
> > >
> > > ## Motivation
> > >
> > > PIP 106
> > > https://github.com/apache/pulsar/wiki/PIP-106%3A-Negative-acknowledgment-backoff
> > > introduced the negative acknowledgment message redelivery backoff, which
> > > allows users to achieve more flexible control over the message
> > > redelivery delay time. But the redelivery backoff policy only applies to
> > > the negative acknowledgment API; users who use the ack timeout to
> > > trigger message redelivery, rather than the negative acknowledgment API,
> > > cannot use the new features introduced by PIP 106.
> > >
> > > So the proposal is to apply the message redelivery policy to the ack
> > > timeout mechanism.
> > > Users can specify an ack timeout redelivery backoff, for example, apply
> > > an exponential backoff with a 10-second ack timeout:
> > >
> > > ```java
> > > client.newConsumer()
> > >     .ackTimeout(10, TimeUnit.SECONDS)
> > >     .ackTimeoutRedeliveryBackoff(
> > >         ExponentialRedeliveryBackoff.builder()
> > >             .minDelayMs(1000)
> > >             .maxDelayMs(60000).build())
> > >     .subscribe();
> > > ```
> > >
> > > The message redelivery behavior should be:
> > >
> > > | Redelivery count | Redelivery delay |
> > > | ---------------- | ---------------- |
> > > | 1 | 10 + 1 seconds |
> > > | 2 | 10 + 2 seconds |
> > > | 3 | 10 + 4 seconds |
> > > | 4 | 10 + 8 seconds |
> > > | 5 | 10 + 16 seconds |
> > > | 6 | 10 + 32 seconds |
> > > | 7 | 10 + 60 seconds |
> > > | 8 | 10 + 60 seconds |
> > >
> > > ## Goal
> > >
> > > Add an API to the Java client to provide the ability to specify the ack
> > > timeout message redelivery backoff; the message redelivery behavior
> > > should abide by the redelivery backoff policy.
> > >
> > > ## API Changes
> > >
> > > 1. Change `NegativeAckRedeliveryBackoff` to `RedeliveryBackoff`, so that
> > > we can use the `RedeliveryBackoff` for both the negative acknowledgment
> > > API and ack timeout message redelivery.
> > >
> > > 2. Change `NegativeAckRedeliveryExponentialBackoff` to
> > > `ExponentialRedeliveryBackoff`, and add a `multiplier` option to
> > > `ExponentialRedeliveryBackoff` with a default value of 2:
> > >
> > > ```java
> > > ExponentialRedeliveryBackoff.builder()
> > >     .minDelayMs(1000)
> > >     .maxDelayMs(60000)
> > >     .multiplier(5)
> > >     .build();
> > > ```
> > >
> > > 3. Add an `ackTimeoutRedeliveryBackoff` method to the `ConsumerBuilder`:
> > >
> > > ```java
> > > client.newConsumer()
> > >     .ackTimeout(10, TimeUnit.SECONDS)
> > >     .ackTimeoutRedeliveryBackoff(
> > >         ExponentialRedeliveryBackoff.builder()
> > >             .minDelayMs(1000)
> > >             .maxDelayMs(60000).build())
> > >     .subscribe();
> > > ```
> > >
> > > ## Compatibility and migration plan
> > >
> > > Since the negative acknowledgment message redelivery backoff has not
> > > been released yet, we can modify the API directly.
> > >
> > > ## Tests plan
> > >
> > > - Verify the introduced `multiplier` of ExponentialRedeliveryBackoff
> > > - Verify that the ack timeout message redelivery works as expected
[GitHub] [pulsar-adapters] michaeljmarshall commented on issue #29: Pulsar - Spark adapter for scala 2.11
michaeljmarshall commented on issue #29:
URL: https://github.com/apache/pulsar-adapters/issues/29#issuecomment-1002236797

@aditiwari01 - try running `mvn initialize license:format` from the base
directory of the project. Let me know if that doesn't work.
[GitHub] [pulsar-adapters] eolivelli commented on a change in pull request #31: [Issue #29] [pulsar-spark] Adding SparkPulsarReliableReceiver
eolivelli commented on a change in pull request #31:
URL: https://github.com/apache/pulsar-adapters/pull/31#discussion_r776054579

File path: pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkStreamingReliablePulsarReceiver.scala

@@ -0,0 +1,158 @@
```scala
package org.apache.pulsar.spark

import com.google.common.util.concurrent.RateLimiter
import org.apache.pulsar.client.api._
import org.apache.pulsar.client.impl.PulsarClientImpl
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.slf4j.LoggerFactory

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

/**
 * Custom Spark receiver to pull messages from a Pulsar topic and push them
 * into a reliable store. If backpressure is enabled, the message ingestion
 * rate for this receiver will be managed by Spark.
 *
 * The following Spark configurations can be used to control rates and block size:
 *   spark.streaming.backpressure.enabled
 *   spark.streaming.backpressure.initialRate
 *   spark.streaming.receiver.maxRate
 *   spark.streaming.blockQueueSize: controls block size
 *   spark.streaming.backpressure.pid.minRate
 *
 * See the Spark streaming configuration docs:
 * https://spark.apache.org/docs/latest/configuration.html#spark-streaming
 *
 * @param storageLevel Storage level to be used
 * @param serviceUrl Service URL of the Pulsar cluster
 * @param pulsarConf Map of Pulsar config containing keys from `org.apache.pulsar.client.impl.conf.ConsumerConfigurationData`
 * @param authentication Authentication object for authenticating the consumer to Pulsar. Use `AuthenticationDisabled` if authentication is disabled.
 * @param sparkConf Spark configs
 * @param maxPollSize Max number of records to be read by the consumer in one batch
 * @param consumerName Consumer name to be used by the receiver
 * @param autoAcknowledge Whether to acknowledge Pulsar messages
 * @param rateMultiplierFactor Rate multiplier factor in case you want a higher rate than what PIDRateEstimator suggests. > 1.0 is useful with Spark dynamic allocation to utilise extra resources. Keep it at 1.0 if Spark dynamic allocation is disabled.
 */
class SparkStreamingReliablePulsarReceiver(override val storageLevel: StorageLevel,
                                           val serviceUrl: String,
                                           val pulsarConf: Map[String, Any],
                                           val authentication: Authentication,
                                           val sparkConf: SparkConf,
                                           val maxPollSize: Int,
                                           val consumerName: String,
                                           val autoAcknowledge: Boolean = true,
                                           val rateMultiplierFactor: Double = 1.0)
  extends Receiver[SparkPulsarMessage](storageLevel) {

  private val maxRateLimit: Long = sparkConf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
  private val blockSize: Int = sparkConf.getInt("spark.streaming.blockQueueSize", maxPollSize)
  private val blockIntervalMs: Long = sparkConf.getTimeAsMs("spark.streaming.blockInterval", "200ms")

  lazy val rateLimiter: RateLimiter = RateLimiter.create(math.min(
    sparkConf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit),
    maxRateLimit
  ).toDouble)

  val buffer: ListBuffer[SparkPulsarMessage] = new ListBuffer[SparkPulsarMessage]()

  private var pulsarClient: PulsarClient = null
  private var consumer: Consumer[Array[Byte]] = null

  var latestStorePushTime: Long = System.currentTimeMillis()

  override def onStart(): Unit = {
    try {
      pulsarClient = PulsarClient.builder.serviceUrl(serviceUrl).authentication(authentication).build
      val topics = pulsarConf.get("topicNames").get.asInstanceOf[List[String]]
      consumer = pulsarClient.asInstanceOf[PulsarClientImpl].newConsumer()
        .loadConf(pulsarConf.filter(_._1 != "topicNames").map(pay => pay._1 -> pay._2.asInstanceOf[AnyRef]))
        .topics(topics).receiverQueueSize(maxPollSize).consumerName(consumerName)
        .subscribe()
    } catch {
      case e: Exception =>
        SparkStreamingReliablePulsarReceiver.LOG.error("Failed to start subscription : {}", e.getMessage)
        e.printStackTrace()
        restart("Restart a consumer")
    }
    latestStorePushTime = System.currentTimeMillis()
    new Thread() {
```

Review comment: Can we give a name to this thread? Also, can we keep a
reference to the Thread as a field? Usually this helps in shutting down the
system, as you can wait for the Thread to exit in order to n…
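The reviewer's suggestion (a named thread kept as a field so shutdown can wait
for it) looks roughly like the following. Shown as a self-contained Java
sketch with illustrative names, since the pattern is the point rather than the
receiver's Scala code:

```java
// Sketch of the review suggestion: name the polling thread and keep a
// reference to it so onStop() can interrupt it and join before tearing down.
// All names here are illustrative, not taken from the actual receiver.
final class PollingReceiver {
    private Thread pollThread;

    void onStart() {
        pollThread = new Thread(this::pollLoop, "pulsar-spark-receiver-poll");
        pollThread.setDaemon(true);
        pollThread.start();
    }

    void onStop() throws InterruptedException {
        if (pollThread != null) {
            pollThread.interrupt();
            pollThread.join(5_000); // wait for the loop to exit before shutdown
        }
    }

    private void pollLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            // poll the consumer and store blocks here
        }
    }
}
```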
Re: [DISCUSS] PIP-124: Pulsar Client Shared State API
> * Add an API to the Java client that makes it easier to maintain a
> consistent Shared State between instances of an application.
> * Provide some ready-to-use recipes, like a simple key-value store
>
> It is not a goal to implement a Pulsar-backed database system

While the first use case for Pulsar was indeed to be the messaging/replication
platform for a distributed database, and it has been working in production for
many years, I'm not convinced we should add this level of API as part of the
Pulsar client API.

The Pulsar API has been designed to be high-level and easy to use (and reason
about), with the use cases of application developers in mind. I don't think
that a "storage"-level API fits well with the rest of the abstractions.

> public interface PulsarMap<K, V> extends AutoCloseable {
>     ..
>     CompletableFuture<?> put(K key, V value)

If all the logic is implemented on the client side, when there are multiple
clients sharing the same map, how can any of them mutate the state, since we
actually enforce that there is a single exclusive producer? Would a user get
an error if there's already a different client writing?

My impression is that, while looking convenient, a shared Map interface is not
the best abstraction for either case:

* If you're actually building a DB, you will definitely need access to the log
itself rather than a Map interface
* If you want to share some state across multiple processes without using a
DB, there are many tricky API, consistency, and semantic problems to solve,
many of which are just pushed down to the application, which will need to be
aware of and understand them. At that point, I would seriously recommend using
a DB, or, if the question is "I don't want to use an additional external
system", then using the BK TableService component.

I think this feature is best viewed as a recipe, as it doesn't depend on or
benefit from any internal broker support. If there is enough interest and
there are concrete use cases, it can then be included later.

--
Matteo Merli

On Fri, Dec 24, 2021 at 1:53 AM Enrico Olivelli wrote:
>
> Hello everyone,
> I want to start a discussion about PIP-124 Pulsar Client Shared State API
>
> This is the PIP document
> https://github.com/apache/pulsar/issues/13490
>
> This is a demo implementation (a proof of concept):
> https://github.com/eolivelli/pulsar-shared-state-manager
>
> Please take a look and share your thoughts
>
> I believe that this will unlock the potential of the Exclusive
> Producer, and it will also make life easier for many developers who
> are using Pulsar and need an API to share configuration, metadata,
> or any simple key-value data structure without adding a database or
> other components to their library, Pulsar IO connector, or Pulsar
> protocol handler.
>
> Thanks
> Enrico
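For context on the single-writer question raised above, here is a minimal
sketch of the mechanism the thread is debating, built only on released client
APIs (exclusive producer plus a reader that replays the topic). The topic name
and values are placeholders; this is not the PIP-124 API itself:

```java
import org.apache.pulsar.client.api.*;

// Sketch of the single-writer pattern under discussion: one instance holds
// the exclusive producer and mutates state; a second instance attempting to
// attach fails (ProducerFencedException), which is how "who may mutate" is
// resolved. Any instance can rebuild the state by replaying the topic.
public class SharedStateSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build();

        // Only one producer at a time may hold the topic in Exclusive mode.
        Producer<String> writer = client.newProducer(Schema.STRING)
                .topic("persistent://public/default/shared-state")
                .accessMode(ProducerAccessMode.Exclusive)
                .create();
        writer.newMessage().key("config-x").value("42").send();

        // Readers rebuild the key-value view by replaying from the start.
        Reader<String> reader = client.newReader(Schema.STRING)
                .topic("persistent://public/default/shared-state")
                .startMessageId(MessageId.earliest)
                .create();
        while (reader.hasMessageAvailable()) {
            Message<String> m = reader.readNext();
            System.out.println(m.getKey() + " = " + m.getValue());
        }
        client.close();
    }
}
```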
Re: [DISCUSSION] PIP-124: Create init subscription before sending message to DLQ
> Oh, that's a very interesting point. I think it'd be easy to add that
> as an "internal" feature, though I'm a bit puzzled on how to add that to
> the producer API

I think we can add a field `String initialSubscriptionName` to the producer
configuration, and add a new field `optional string initial_subscription_name`
to the `CommandProducer`. When the broker handles the `CommandProducer`, if it
sees that the initialSubscriptionName is not empty or null, it will use it to
create a subscription on that topic.

When creating the deadLetterProducer or retryLetterProducer, we can then
specify and create the initial subscription directly through the producer.

What do you think?

On Thu, Dec 23, 2021 at 7:42 AM Matteo Merli wrote:
>
> > What if we extended the `CommandProducer` command to add a
> > `create_subscription` field? Then, any time a topic is auto
> > created and this field is true, the broker would auto create a
> > subscription. There are some details to work out, but I think this
> > feature would fulfill the needs of this PIP and would also be broadly
> > useful for many client applications that dynamically create topics.
>
> Oh, that's a very interesting point. I think it'd be easy to add that
> as an "internal" feature, though I'm a bit puzzled on how to add that to
> the producer API

--
Zike Yang
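The producer-side API proposed above might look like the following sketch.
Note that `initialSubscriptionName()` is the *proposed* builder method, which
did not exist in released clients at the time of this thread; the topic and
subscription names are placeholders:

```java
import org.apache.pulsar.client.api.*;

// Sketch of the proposed API: the DLQ producer carries an initial
// subscription name, and the broker creates that subscription when it
// auto-creates the DLQ topic, so no message is lost before a consumer attaches.
public class InitialSubscriptionSketch {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build();
        Producer<byte[]> dlqProducer = client.newProducer()
                .topic("persistent://public/default/my-topic-DLQ")
                .initialSubscriptionName("dlq-subscription") // proposed, not yet released
                .create();
        dlqProducer.close();
        client.close();
    }
}
```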
[VOTE] PIP 131: Resolve produce chunk messages failed when topic level maxMessageSize is set
This is the voting thread for PIP-131. It will stay open for at least 48h.

https://github.com/apache/pulsar/issues/13544

The discussion thread is
https://lists.apache.org/thread/c63d9s73j9x1m3dkqr3r38gyp8s7cwzf

## Motivation

Currently, producing chunked messages fails if a topic-level maxMessageSize is
set [1]. The root cause of this issue is that chunked messages use the
broker-level maxMessageSize as the chunk size, and the topic-level
maxMessageSize is always <= the broker-level maxMessageSize. So once it is
set, any in-flight chunked message produce fails.

## Goal

Resolve the topic-level maxMessageSize compatibility issue with chunked
messages.

## Implementation

The current best solution is to simply skip the topic-level maxMessageSize
check in
org.apache.pulsar.broker.service.AbstractTopic#isExceedMaximumMessageSize.
The topic-level maxMessageSize was introduced in [2] for the purpose of making
it "easier to plan resource quotas for client allocation", and IMO this change
does not add further complexity to that.

## Rejected Alternatives

Add a client-side topic-level maxMessageSize and keep it synced with the
broker.

Required changes:
- [client] Add a new field
org.apache.pulsar.client.impl.ProducerBase#maxMessageSize to store the
client-side topic-level maxMessageSize.
- [PulsarApi.proto] Add a new field maxMessageSize in CommandProducerSuccess
for the initial value of ProducerBase#maxMessageSize.
- [PulsarApi.proto] Add a new command like
CommandUpdateClientPolicy{producerId, maxMessageSize} to update
ProducerBase#maxMessageSize when the topic-level maxMessageSize is updated.

Furthermore, some other data-consistency issues would need to be handled very
carefully when maxMessageSize is updated. This alternative is complex, but it
can also solve another topic-level maxMessageSize issue [3] when batching is
enabled (the non-batching case is solved by PR [4]).

[1] https://github.com/apache/pulsar/issues/13360
[2] https://github.com/apache/pulsar/pull/8732
[3] https://github.com/apache/pulsar/issues/12958
[4] https://github.com/apache/pulsar/pull/13147

Thanks,
Haiting Jiang
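The proposed check, simplified: since each chunk is already sized against the
broker-level maxMessageSize, the topic-level check is skipped for chunked
publishes rather than rejecting them. The sketch below uses abridged field and
method names, not the actual broker code:

```java
// Simplified sketch of the proposed behavior in
// AbstractTopic#isExceedMaximumMessageSize (names abridged, not actual code).
final class MaxMessageSizeCheck {
    private final int topicMaxMessageSize; // 0 means "no topic-level limit"

    MaxMessageSizeCheck(int topicMaxMessageSize) {
        this.topicMaxMessageSize = topicMaxMessageSize;
    }

    boolean isExceedMaximumMessageSize(int size, boolean isChunked) {
        if (isChunked) {
            // Chunk size was already bounded by the broker-level limit when
            // the client split the message, so skip the topic-level check.
            return false;
        }
        return topicMaxMessageSize > 0 && size > topicMaxMessageSize;
    }
}
```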
[DISCUSSION] PIP-128: Add new command STOP_PRODUCER and STOP_CONSUMER
## Motivation

The broker sends `CLOSE_PRODUCER`/`CLOSE_CONSUMER` to the client when a topic
is deleted, but the client will reconnect. If `allowAutoTopicCreation=true` is
configured, this triggers creating the topic again.

https://github.com/apache/pulsar/blob/9f599c9572e5d9b1f15efa6e895e7eb29b284e57/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L130-L133

## Goal

Add new commands `STOP_PRODUCER`/`STOP_CONSUMER`. When the client receives one
of these commands, it only closes, without reconnecting.

## API Changes

1. Add the commands to `BaseCommand`:
```proto
message BaseCommand {
    enum Type {
        // ...
        STOP_PRODUCER = 64;
        STOP_CONSUMER = 65;
    }
}
```
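A hypothetical sketch of the client-side handling this proposal implies:
unlike `CLOSE_PRODUCER`, a `STOP_PRODUCER` would close the producer terminally
instead of letting the connection handler schedule a reconnection. All class
and method names below are illustrative, not actual Pulsar client code:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: on STOP_PRODUCER the producer is removed and
// marked terminally closed, so the reconnect-on-close path is never taken.
final class StopCommandHandler {
    interface ProducerHandle {
        void markTerminallyClosed(); // flips state so close does not reconnect
        void closeAsync();
    }

    private final Map<Long, ProducerHandle> producers = new ConcurrentHashMap<>();

    void handleStopProducer(long producerId) {
        ProducerHandle producer = producers.remove(producerId);
        if (producer != null) {
            producer.markTerminallyClosed();
            producer.closeAsync();
        }
    }
}
```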
Re: [DISCUSSION] PIP-128: Add new command STOP_PRODUCER and STOP_CONSUMER
Issue: https://github.com/apache/pulsar/issues/13488

> On 29 Dec 2021, at 10:38:12, 包子 wrote:
>
> ## Motivation
>
> The broker sends `CLOSE_PRODUCER`/`CLOSE_CONSUMER` to the client when a
> topic is deleted, but the client will reconnect. If
> `allowAutoTopicCreation=true` is configured, this triggers creating the
> topic again.
>
> https://github.com/apache/pulsar/blob/9f599c9572e5d9b1f15efa6e895e7eb29b284e57/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L130-L133
>
> ## Goal
>
> Add new commands `STOP_PRODUCER`/`STOP_CONSUMER`. When the client receives
> one of these commands, it only closes, without reconnecting.
>
> ## API Changes
>
> 1. Add the commands to `BaseCommand`:
> ```proto
> message BaseCommand {
>     enum Type {
>         // ...
>         STOP_PRODUCER = 64;
>         STOP_CONSUMER = 65;
>     }
> }
> ```
Re: [VOTE] PIP 131: Resolve produce chunk messages failed when topic level maxMessageSize is set
+1

Thanks,
Penghui

On Wed, Dec 29, 2021 at 10:29 AM Haiting Jiang wrote:

> This is the voting thread for PIP-131. It will stay open for at least 48h.
>
> https://github.com/apache/pulsar/issues/13544
>
> The discussion thread is
> https://lists.apache.org/thread/c63d9s73j9x1m3dkqr3r38gyp8s7cwzf
>
> ## Motivation
>
> Currently, producing chunked messages fails if a topic-level maxMessageSize
> is set [1]. The root cause of this issue is that chunked messages use the
> broker-level maxMessageSize as the chunk size, and the topic-level
> maxMessageSize is always <= the broker-level maxMessageSize. So once it is
> set, any in-flight chunked message produce fails.
>
> ## Goal
>
> Resolve the topic-level maxMessageSize compatibility issue with chunked
> messages.
>
> ## Implementation
>
> The current best solution is to simply skip the topic-level maxMessageSize
> check in
> org.apache.pulsar.broker.service.AbstractTopic#isExceedMaximumMessageSize.
> The topic-level maxMessageSize was introduced in [2] for the purpose of
> making it "easier to plan resource quotas for client allocation", and IMO
> this change does not add further complexity to that.
>
> ## Rejected Alternatives
>
> Add a client-side topic-level maxMessageSize and keep it synced with the
> broker.
>
> Required changes:
> - [client] Add a new field
> org.apache.pulsar.client.impl.ProducerBase#maxMessageSize to store the
> client-side topic-level maxMessageSize.
> - [PulsarApi.proto] Add a new field maxMessageSize in
> CommandProducerSuccess for the initial value of ProducerBase#maxMessageSize.
> - [PulsarApi.proto] Add a new command like
> CommandUpdateClientPolicy{producerId, maxMessageSize} to update
> ProducerBase#maxMessageSize when the topic-level maxMessageSize is updated.
>
> Furthermore, some other data-consistency issues would need to be handled
> very carefully when maxMessageSize is updated. This alternative is complex,
> but it can also solve another topic-level maxMessageSize issue [3] when
> batching is enabled (the non-batching case is solved by PR [4]).
>
> [1] https://github.com/apache/pulsar/issues/13360
> [2] https://github.com/apache/pulsar/pull/8732
> [3] https://github.com/apache/pulsar/issues/12958
> [4] https://github.com/apache/pulsar/pull/13147
>
> Thanks,
> Haiting Jiang
Re: [DISCUSSION] PIP-128: Add new command STOP_PRODUCER and STOP_CONSUMER
What is the background of this requirement?

Usually, we do not force-close producers and consumers on the server side,
because we don't know if the client side can handle this case well.

Alternatively, if the topic is being retired and you don't want the clients to
connect to it, you can just terminate the topic.

Thanks,
Penghui

On Wed, Dec 29, 2021 at 10:39 AM 包子 wrote:

> Issue: https://github.com/apache/pulsar/issues/13488
>
> > On 29 Dec 2021, at 10:38:12, 包子 wrote:
> >
> > ## Motivation
> >
> > The broker sends `CLOSE_PRODUCER`/`CLOSE_CONSUMER` to the client when a
> > topic is deleted, but the client will reconnect. If
> > `allowAutoTopicCreation=true` is configured, this triggers creating the
> > topic again.
> >
> > https://github.com/apache/pulsar/blob/9f599c9572e5d9b1f15efa6e895e7eb29b284e57/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L130-L133
> >
> > ## Goal
> >
> > Add new commands `STOP_PRODUCER`/`STOP_CONSUMER`. When the client
> > receives one of these commands, it only closes, without reconnecting.
> >
> > ## API Changes
> >
> > 1. Add the commands to `BaseCommand`:
> > ```proto
> > message BaseCommand {
> >     enum Type {
> >         // ...
> >         STOP_PRODUCER = 64;
> >         STOP_CONSUMER = 65;
> >     }
> > }
> > ```
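For reference, terminating a topic as suggested above can be done with the
existing admin API; a terminated topic rejects further publishes, so producers
stop without any new protocol command. A small example (service URL and topic
name are placeholders):

```java
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;

// Terminate a topic via the admin API: no more messages can be published to
// it afterwards, and the last message id is returned.
public class TerminateTopicExample {
    public static void main(String[] args) throws Exception {
        try (PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080").build()) {
            MessageId lastMessageId = admin.topics()
                    .terminateTopicAsync("persistent://public/default/retiring-topic")
                    .get();
            System.out.println("Topic terminated at " + lastMessageId);
        }
    }
}
```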
[GitHub] [pulsar-manager] sourabhaggrawal opened a new pull request #437: Set read/write access to resource for each role
sourabhaggrawal opened a new pull request #437:
URL: https://github.com/apache/pulsar-manager/pull/437

### Motivation

Assign resource-level read/write access to each role of a user. This way we
can assign multiple resources to a user with read or write permission. A user
may have access to multiple resources, where he/she is allowed only to read
one resource while being able to make changes to another.

### Modifications

1. Added an access field at the role level, which can have read or write as
its value.
2. Added a tenants field to the resourceType list; available tenants will be
listed in the resource field when creating a role.
3. Added an access map (`Map<String, Map<String, String>>`) to the /usersInfo
response: each key is the resourceType, and the value holds another map with
the resource name as key and read/write as value.
4. Added a hasPermissionToResource method in module/users.js, which is called
for each actionable element when the UI is rendered, to decide whether to show
or hide the resource based on access.
5. Changes in the .vue files to call the hasPermissionToResource method to
decide if the user has access to a particular element.

### Verifying this change

- [ ] Make sure that the change passes the `./gradlew build` checks.
Re: [VOTE] PIP 131: Resolve produce chunk messages failed when topic level maxMessageSize is set
+1

Thanks,
Zike

On Wed, Dec 29, 2021 at 11:26 AM PengHui Li wrote:
>
> +1
>
> Thanks,
> Penghui
>
> On Wed, Dec 29, 2021 at 10:29 AM Haiting Jiang wrote:
>
> > This is the voting thread for PIP-131. It will stay open for at least 48h.
> >
> > https://github.com/apache/pulsar/issues/13544
> >
> > The discussion thread is
> > https://lists.apache.org/thread/c63d9s73j9x1m3dkqr3r38gyp8s7cwzf
> >
> > ## Motivation
> >
> > Currently, producing chunked messages fails if a topic-level
> > maxMessageSize is set [1]. The root cause of this issue is that chunked
> > messages use the broker-level maxMessageSize as the chunk size, and the
> > topic-level maxMessageSize is always <= the broker-level maxMessageSize.
> > So once it is set, any in-flight chunked message produce fails.
> >
> > ## Goal
> >
> > Resolve the topic-level maxMessageSize compatibility issue with chunked
> > messages.
> >
> > ## Implementation
> >
> > The current best solution is to simply skip the topic-level
> > maxMessageSize check in
> > org.apache.pulsar.broker.service.AbstractTopic#isExceedMaximumMessageSize.
> > The topic-level maxMessageSize was introduced in [2] for the purpose of
> > making it "easier to plan resource quotas for client allocation", and IMO
> > this change does not add further complexity to that.
> >
> > ## Rejected Alternatives
> >
> > Add a client-side topic-level maxMessageSize and keep it synced with the
> > broker.
> >
> > Required changes:
> > - [client] Add a new field
> > org.apache.pulsar.client.impl.ProducerBase#maxMessageSize to store the
> > client-side topic-level maxMessageSize.
> > - [PulsarApi.proto] Add a new field maxMessageSize in
> > CommandProducerSuccess for the initial value of
> > ProducerBase#maxMessageSize.
> > - [PulsarApi.proto] Add a new command like
> > CommandUpdateClientPolicy{producerId, maxMessageSize} to update
> > ProducerBase#maxMessageSize when the topic-level maxMessageSize is
> > updated.
> >
> > Furthermore, some other data-consistency issues would need to be handled
> > very carefully when maxMessageSize is updated. This alternative is
> > complex, but it can also solve another topic-level maxMessageSize issue
> > [3] when batching is enabled (the non-batching case is solved by PR [4]).
> >
> > [1] https://github.com/apache/pulsar/issues/13360
> > [2] https://github.com/apache/pulsar/pull/8732
> > [3] https://github.com/apache/pulsar/issues/12958
> > [4] https://github.com/apache/pulsar/pull/13147
> >
> > Thanks,
> > Haiting Jiang

--
Zike Yang
Re: [DISCUSS] PIP-124: Pulsar Client Shared State API
Matteo,

On Wed, 29 Dec 2021 at 02:57, Matteo Merli wrote:

> > * Add an API to the Java client that makes it easier to maintain a
> > consistent Shared State between instances of an application.
> > * Provide some ready-to-use recipes, like a simple key-value store
> >
> > It is not a goal to implement a Pulsar-backed database system
>
> While the first use case for Pulsar was indeed to be the
> messaging/replication platform for a distributed database, and it has
> been working in production for many years, I'm not convinced we should
> add this level of API as part of the Pulsar client API.
>
> The Pulsar API has been designed to be high-level and easy to use (and
> reason about), with the use cases of application developers in mind. I
> don't think that a "storage"-level API fits well with the rest of the
> abstractions.
>
> > public interface PulsarMap<K, V> extends AutoCloseable {
> >     ..
> >     CompletableFuture<?> put(K key, V value)
>
> If all the logic is implemented on the client side, when there are
> multiple clients sharing the same map, how can any of them mutate the
> state, since we actually enforce that there is a single exclusive
> producer? Would a user get an error if there's already a different
> client writing?
>
> My impression is that, while looking convenient, a shared Map
> interface is not the best abstraction for either case:
> * If you're actually building a DB, you will definitely need access
> to the log itself rather than a Map interface
> * If you want to share some state across multiple processes without
> using a DB, there are many tricky API, consistency, and semantic
> problems to solve, many of which are just pushed down to the
> application, which will need to be aware of and understand them. At
> that point, I would seriously recommend using a DB, or, if the
> question is "I don't want to use an additional external system", then
> using the BK TableService component.

This is usually not an option, because the BK TableService does not support
multi-tenancy well, and the application would also need to connect directly
to the bookies (think about configuration, security...).

> I think this feature is best viewed as a recipe, as it doesn't depend
> on or benefit from any internal broker support. If there is enough
> interest and there are concrete use cases, it can then be included
> later.

My initial proposal was to push this to pulsar-adapters. I changed the
proposal before sending the PIP because I think it is very useful for
protocol handlers and Pulsar IO connectors.

I am totally fine with adding this to pulsar-adapters, but I want to see it in
the Pulsar repo and released as part of an official Pulsar recipe.

@Matteo, does this sound like a good option to you?

Otherwise we miss the possibility of making it easier for Pulsar users to
leverage this power of Pulsar. In Pravega you have State Synchronizers, and
they are a great foundational API; we are missing something like that in
Pulsar.
Enrico

> --
> Matteo Merli
>
> On Fri, Dec 24, 2021 at 1:53 AM Enrico Olivelli wrote:
> >
> > Hello everyone,
> > I want to start a discussion about PIP-124 Pulsar Client Shared State API
> >
> > This is the PIP document
> > https://github.com/apache/pulsar/issues/13490
> >
> > This is a demo implementation (a proof of concept):
> > https://github.com/eolivelli/pulsar-shared-state-manager
> >
> > Please take a look and share your thoughts
> >
> > I believe that this will unlock the potential of the Exclusive
> > Producer, and it will also make life easier for many developers who
> > are using Pulsar and need an API to share configuration, metadata,
> > or any simple key-value data structure without adding a database or
> > other components to their library, Pulsar IO connector, or Pulsar
> > protocol handler.
> >
> > Thanks
> > Enrico