[DISCUSSION] Support custom and pluggable consumer selector

2021-12-28 Thread zhangao
Hi Pulsar Community, 

When we try to introduce Pulsar to our existing system, compatibility is our 
first consideration.

Firstly, our production system uses the jump consistent hash algorithm to select 
consumers, but Pulsar natively uses a range-based consistent hash algorithm. As 
a result, we cannot guarantee that the same node consumes data with the same 
routing key from both Pulsar and our existing system at the same time.
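
For reference, here is a minimal Java sketch of the jump consistent hash
algorithm (after Lamping & Veach) that our system uses to map a routing key to
a consumer index:

```java
// Jump consistent hash: maps a 64-bit key to one of `buckets` consumers,
// moving only about 1/(buckets+1) of the keys when a consumer is added.
static int jumpConsistentHash(long key, int buckets) {
    long b = -1, j = 0;
    while (j < buckets) {
        b = j;
        key = key * 2862933555777941757L + 1;
        j = (long) ((b + 1) * (2147483648.0 / ((key >>> 33) + 1)));
    }
    return (int) b;
}
```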

It would be better if Pulsar supported a custom consumer selector when using 
Key_Shared subscriptions.
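
For illustration, such a plug-in point could look like the following sketch
(hypothetical interface and names, not an existing Pulsar API):

```java
import java.util.List;

// Hypothetical SPI: the broker would delegate the routing-key -> consumer
// mapping for Key_Shared subscriptions to a pluggable selector, so an
// implementation could use jump consistent hash instead of range hashing.
public interface CustomConsumerSelector<C> {
    // Return the consumer that should receive messages with this key.
    C select(byte[] routingKey, List<C> consumers);
}
```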



Thanks
Zhangao




[1] https://github.com/apache/pulsar/issues/13473 [2] 
https://github.com/apache/pulsar/pull/13470

Re: [DISCUSSION] PIP-130: Apply redelivery backoff policy for ack timeout

2021-12-28 Thread PengHui Li
Enrico

> What do you mean with 'redelivery backorder has not been released yet'?

I think you mean "redelivery backoff has not been released yet"?

This proposal changes public APIs, but the affected APIs were introduced by
https://github.com/apache/pulsar/pull/12566. We don't have a release that
contains the APIs introduced in #12566, so we are safe to change them.

The behavior will not change with this proposal; we just add an API that
gives users flexible control over message redelivery. The existing behavior
stays the same.

> Pulsar client and Consumer are configurable using a map of key value
> pairs. So we must take care of not changing the behaviour.

The proposal will not add more fields to the consumer configuration data;
it just changes the class name here:
https://github.com/apache/pulsar/blob/f0da648ae1c02248c015d26b93e08b2a9a78c1d3/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L78

Regards,
Penghui


On Tue, Dec 28, 2021 at 3:19 PM Enrico Olivelli  wrote:

> Penghui
>
> I am overall +1 to this proposal but I am afraid about compatibility. And I
> want to be sure that we are not breaking anything.
> Pulsar client and Consumer are configurable using a map of key value pairs.
> So we must take care of not changing the behaviour.
>
> What do you mean with 'redelivery backorder has not been released yet'?
>
>
> Enrico
>
> On Mon, Dec 27, 2021, 14:25 PengHui Li wrote:
>
> > https://github.com/apache/pulsar/issues/13528
> >
> > Pasted below for quoting convenience.
> >
> > -
> >
> > PIP 130: Apply redelivery backoff policy for ack timeout
> >
> > ## Motivation
> >
> > PIP 106
> > https://github.com/apache/pulsar/wiki/PIP-106%3A-Negative-acknowledgment-backoff
> > introduced negative acknowledgment message redelivery backoff, which allows
> > users to achieve more flexible control over the message redelivery delay.
> > But the redelivery backoff policy only applies to the negative
> > acknowledgment API, so users who use the ack timeout to trigger message
> > redelivery, rather than the negative acknowledgment API, can't use the new
> > features introduced by PIP 106.
> >
> > So the proposal is to apply the message redelivery policy to the ack
> > timeout mechanism. Users can specify an ack timeout redelivery backoff,
> > for example, an exponential backoff with a 10-second ack timeout:
> >
> > ```java
> > client.newConsumer()
> >         .ackTimeout(10, TimeUnit.SECONDS)
> >         .ackTimeoutRedeliveryBackoff(
> >                 ExponentialRedeliveryBackoff.builder()
> >                         .minDelayMs(1000)
> >                         .maxDelayMs(60000)
> >                         .build())
> >         .subscribe();
> > ```
> >
> > The message redelivery behavior should be:
> >
> > | Redelivery count | Redelivery delay |
> > | --- | --- |
> > | 1 | 10 + 1 seconds |
> > | 2 | 10 + 2 seconds |
> > | 3 | 10 + 4 seconds |
> > | 4 | 10 + 8 seconds |
> > | 5 | 10 + 16 seconds |
> > | 6 | 10 + 32 seconds |
> > | 7 | 10 + 60 seconds |
> > | 8 | 10 + 60 seconds |
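> >
> > For illustration, the delays above can be derived with a small helper like
> > the following sketch (hypothetical code, not the actual client
> > implementation), assuming multiplier 2, minDelayMs=1000, and
> > maxDelayMs=60000:
> >
> > ```java
> > // The effective delay is the 10s ack timeout plus an exponentially
> > // growing backoff, capped at maxDelayMs.
> > static long backoffDelayMs(int redeliveryCount, long minDelayMs,
> >                            long maxDelayMs, double multiplier) {
> >     double delay = minDelayMs * Math.pow(multiplier, redeliveryCount - 1);
> >     return (long) Math.min(delay, maxDelayMs);
> > }
> > ```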
> >
> > ## Goal
> >
> > Add an API to the Java client to provide the ability to specify the ack
> > timeout message redelivery backoff; the message redelivery behavior should
> > abide by the redelivery backoff policy.
> >
> >
> > ## API Changes
> >
> > 1. Change `NegativeAckRedeliveryBackoff` to `RedeliveryBackoff`, so that we
> > can use the `RedeliveryBackoff` for both the negative acknowledgment API
> > and ack timeout message redelivery.
> >
> > 2. Change `NegativeAckRedeliveryExponentialBackoff` to
> > `ExponentialRedeliveryBackoff`, and add a `multiplier` option to
> > `ExponentialRedeliveryBackoff` with default value 2:
> >
> > ```java
> > ExponentialRedeliveryBackoff.builder()
> >         .minDelayMs(1000)
> >         .maxDelayMs(60000)
> >         .multiplier(5)
> >         .build()
> > ```
> >
> > 3. Add `ackTimeoutRedeliveryBackoff` method for the `ConsumerBuilder`:
> >
> > ```java
> > client.newConsumer()
> >         .ackTimeout(10, TimeUnit.SECONDS)
> >         .ackTimeoutRedeliveryBackoff(
> >                 ExponentialRedeliveryBackoff.builder()
> >                         .minDelayMs(1000)
> >                         .maxDelayMs(60000)
> >                         .build())
> >         .subscribe();
> > ```
> >
> > ## Compatibility and migration plan
> >
> > Since the negative acknowledgment message redelivery backoff has not been
> > released yet, we can modify the API directly.
> >
> > ## Tests plan
> >
> > - Verify the introduced `multiplier` of `ExponentialRedeliveryBackoff`
> > - Verify that the ack timeout message redelivery works as expected
> >
> > Regards,
> > Penghui
> >
>


Re: [DISCUSSION] PIP-130: Apply redelivery backoff policy for ack timeout

2021-12-28 Thread Enrico Olivelli
+1

I missed that we haven't cherry-picked that change to 2.9.

Thanks for your confirmation


Enrico

On Tue, Dec 28, 2021, 12:44 PengHui Li wrote:

> Enrico
>
> > What do you mean with 'redelivery backorder has not been released yet'?
>
> I think you mean "redelivery backoff has not been released yet"?
>
> This proposal changes public APIs, but the affected APIs were introduced by
> https://github.com/apache/pulsar/pull/12566. We don't have a release that
> contains the APIs introduced in #12566, so we are safe to change them.
>
> The behavior will not change with this proposal; we just add an API that
> gives users flexible control over message redelivery. The existing behavior
> stays the same.
>
> > Pulsar client and Consumer are configurable using a map of key value
> > pairs. So we must take care of not changing the behaviour.
>
> The proposal does not add more fields to the consumer configuration data;
> it just changes the class name here:
>
> https://github.com/apache/pulsar/blob/f0da648ae1c02248c015d26b93e08b2a9a78c1d3/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L78
>
> Regards,
> Penghui

[GitHub] [pulsar-adapters] michaeljmarshall commented on issue #29: Pulsar - Spark adapter for scala 2.11

2021-12-28 Thread GitBox


michaeljmarshall commented on issue #29:
URL: https://github.com/apache/pulsar-adapters/issues/29#issuecomment-1002236797


   @aditiwari01 - try running `mvn initialize license:format` from the base 
directory of the project. Let me know if that doesn't work.






[GitHub] [pulsar-adapters] eolivelli commented on a change in pull request #31: [Issue #29] [pulsar-spark] Adding SparkPulsarReliableReceiver

2021-12-28 Thread GitBox


eolivelli commented on a change in pull request #31:
URL: https://github.com/apache/pulsar-adapters/pull/31#discussion_r776054579



##
File path: pulsar-spark/src/main/scala/org/apache/pulsar/spark/SparkStreamingReliablePulsarReceiver.scala
##
@@ -0,0 +1,158 @@
+package org.apache.pulsar.spark
+
+import com.google.common.util.concurrent.RateLimiter
+import org.apache.pulsar.client.api._
+import org.apache.pulsar.client.impl.PulsarClientImpl
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver.Receiver
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+/**
+  * Custom spark receiver to pull messages from Pubsub topic and push into reliable store.
+  * If backpressure is enabled, the message ingestion rate for this receiver will be managed by Spark.
+  *
+  * Following spark configurations can be used to control rates and block size
+  * spark.streaming.backpressure.enabled
+  * spark.streaming.backpressure.initialRate
+  * spark.streaming.receiver.maxRate
+  * spark.streaming.blockQueueSize: Controlling block size
+  * spark.streaming.backpressure.pid.minRate
+  *
+  * See Spark streaming configurations doc
+  * https://spark.apache.org/docs/latest/configuration.html#spark-streaming
+  *
+  * @param storageLevel         Storage level to be used
+  * @param serviceUrl           Service URL to the Pubsub Cluster
+  * @param pulsarConf           Map of PulsarConf containing keys from `org.apache.pulsar.client.impl.conf.ConsumerConfigurationData`
+  * @param authentication       Authentication object for authenticating consumer to pulsar. Use `AuthenticationDisabled` if authentication is disabled.
+  * @param sparkConf            Spark configs
+  * @param maxPollSize          Max number of records to read by consumer in one batch
+  * @param consumerName         Consumer name to be used by receiver
+  * @param autoAcknowledge      Acknowledge pubsub message or not
+  * @param rateMultiplierFactor Rate multiplier factor in case you want to have more rate than what PIDRateEstimator suggests. > 1.0 is useful in case
+  *                             of spark dynamic allocation to utilise extra resources. Keep it 1.0 if spark dynamic allocation is disabled.
+  */
+
+class SparkStreamingReliablePulsarReceiver(override val storageLevel: StorageLevel,
+                                           val serviceUrl: String,
+                                           val pulsarConf: Map[String, Any],
+                                           val authentication: Authentication,
+                                           val sparkConf: SparkConf,
+                                           val maxPollSize: Int,
+                                           val consumerName: String,
+                                           val autoAcknowledge: Boolean = true,
+                                           val rateMultiplierFactor: Double = 1.0) extends Receiver[SparkPulsarMessage](storageLevel) {
+
+  private val maxRateLimit: Long = sparkConf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
+  private val blockSize: Int = sparkConf.getInt("spark.streaming.blockQueueSize", maxPollSize)
+  private val blockIntervalMs: Long = sparkConf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
+
+  lazy val rateLimiter: RateLimiter = RateLimiter.create(math.min(
+    sparkConf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit),
+    maxRateLimit
+  ).toDouble)
+
+  val buffer: ListBuffer[SparkPulsarMessage] = new ListBuffer[SparkPulsarMessage]()
+
+  private var pulsarClient: PulsarClient = null
+  private var consumer: Consumer[Array[Byte]] = null
+
+  var latestStorePushTime: Long = System.currentTimeMillis()
+
+  override def onStart(): Unit = {
+    try {
+      pulsarClient = PulsarClient.builder.serviceUrl(serviceUrl).authentication(authentication).build
+      val topics = pulsarConf.get("topicNames").get.asInstanceOf[List[String]]
+      consumer = pulsarClient.asInstanceOf[PulsarClientImpl].newConsumer()
+        .loadConf(pulsarConf.filter(_._1 != "topicNames").map(pay => pay._1 -> pay._2.asInstanceOf[AnyRef]))
+        .topics(topics).receiverQueueSize(maxPollSize).consumerName(consumerName)
+        .subscribe()
+    } catch {
+      case e: Exception =>
+        SparkStreamingReliablePulsarReceiver.LOG.error("Failed to start subscription : {}", e.getMessage)
+        e.printStackTrace()
+        restart("Restart a consumer")
+    }
+    latestStorePushTime = System.currentTimeMillis()
+    new Thread() {

Review comment:
   Can we give a name to this thread?
   
   Also, can we keep a reference to the Thread as a field?
   Usually this helps in shutting down the system as you can wait for the Thread to exit in order to n

Re: [DISCUSS] PIP-124: Pulsar Client Shared State API

2021-12-28 Thread Matteo Merli
> * Add an API to the Java client that makes it easier to maintain a consistent 
> Share State between instances of an application.
> * Provide some ready to use recipes, like a simple key-value store
>
> It is not a goal to implement a Pulsar backed Database system

While the first use case for Pulsar was indeed to be the
messaging/replication platform for a distributed database, and it has
been working in production for many years, I'm not convinced to add
this level of API as part of the Pulsar client API.

Pulsar API has been designed to be high-level and easy to use (and
reason about), with in mind the use cases of application developers. I
don't think that a "storage" level API fits well with the rest of
abstractions.

> public interface PulsarMap<K, V> extends AutoCloseable {
> ..
>  CompletableFuture put(K key, V value)

If all the logic is implemented on the client side, when there are
multiple clients sharing the same map, how can any of them mutate the
state, given that we actually enforce a single exclusive
producer? Would a user get an error if there's already a different
client writing?

My impression is that, while looking convenient, a shared Map
interface is not the best abstraction for either case:
 * If you're actually building a DB, you will definitely need access
to the log itself rather than a Map interface
 * If you want to share some state across multiple processes without
using a DB, there are many tricky API, consistency and semantic
problems to solve, many of which are just pushed down to the
application which will need to be aware and understand them. At that
point, I would seriously recommend using a DB, or if the question is:
"I don't want to use an additional external system", then to use the
BK TableService component.


I think this feature is best viewed as a recipe, as it doesn't
depend on or benefit from any internal broker support. If there is
enough interest and there are concrete use cases, it can be included later.

--
Matteo Merli


On Fri, Dec 24, 2021 at 1:53 AM Enrico Olivelli  wrote:
>
> Hello everyone,
> I want to start a discussion about PIP-124 Pulsar Client Shared State API
>
> This is the PIP document
> https://github.com/apache/pulsar/issues/13490
>
> This is a demo implementation (a proof-of-concept):
> https://github.com/eolivelli/pulsar-shared-state-manager
>
> Please take a look and share your thoughts
>
> I believe that this will unlock the potential of the Exclusive
> Producer, and it will also make life easier for many developers who
> are using Pulsar and need an API to share configuration, metadata,
> or any simple key-value data structure without adding a database or
> other components to their library, Pulsar IO connector, or Pulsar
> protocol handler.
>
> Thanks
> Enrico


Re: [DISCUSSION] PIP-124: Create init subscription before sending message to DLQ

2021-12-28 Thread Zike Yang
> Oh, that's a very interesting point. I think it'd be easy to add that
> as "internal" feature, though I'm a bit puzzled on how to add that to
> the producer API

I think we can add a field `String initialSubscriptionName` to the
producer configuration, and add a new field `optional string
initial_subscription_name` to the `CommandProducer` command.
When the broker handles the `CommandProducer`, if it finds that the
initialSubscriptionName is not empty or null, it will use it to create a
subscription on that topic. When creating the deadLetterProducer or
retryLetterProducer, we can then specify and create the initial subscription
directly through the producer.
What do you think?
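
For illustration, the wire-protocol side could look like the following sketch
(the field number is hypothetical; the real one must be the next free tag in
`CommandProducer`):

```java
message CommandProducer {
    // ... existing fields ...

    // Hypothetical tag number. If set when the topic is auto-created,
    // the broker creates this subscription before the producer is ready.
    optional string initial_subscription_name = 100;
}
```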

On Thu, Dec 23, 2021 at 7:42 AM Matteo Merli  wrote:
>
> > What if we extended the `CommandProducer` command to add a
> > `create_subscription` field? Then, any time a topic is auto
> > created and this field is true, the broker would auto create a
> > subscription. There are some details to work out, but I think this
> > feature would fulfill the needs of this PIP and would also be broadly
> > useful for many client applications that dynamically create topics.
>
> Oh, that's a very interesting point. I think it'd be easy to add that
> as "internal" feature, though I'm a bit puzzled on how to add that to
> the producer API



-- 
Zike Yang


[VOTE] PIP 131: Resolve produce chunk messages failed when topic level maxMessageSize is set

2021-12-28 Thread Haiting Jiang
This is the voting thread for PIP-131. It will stay open for at least 48h.

https://github.com/apache/pulsar/issues/13544

The discussion thread is 
https://lists.apache.org/thread/c63d9s73j9x1m3dkqr3r38gyp8s7cwzf

## Motivation

Currently, producing chunked messages fails if the topic level maxMessageSize is 
set [1]. The root cause is that chunked messages use the broker level 
maxMessageSize as the chunk size, and the topic level maxMessageSize is always <= 
the broker level maxMessageSize. So once it is set, in-flight chunked message 
producing fails.

## Goal

Resolve the topic level maxMessageSize compatibility issue with chunked messages.

## Implementation

The current best solution would be to simply skip the topic level maxMessageSize 
check for chunked messages in 
org.apache.pulsar.broker.service.AbstractTopic#isExceedMaximumMessageSize. 
Topic level maxMessageSize was introduced in [2] for the purpose of making it 
"easier to plan resource quotas for client allocation", and IMO this change will 
not add further complexity to that.
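
For illustration, a minimal sketch of the idea (names and signature are
illustrative, not the actual broker code), assuming the check can tell whether
the message is a chunk of a chunked message:

```java
// Skip the topic level limit for chunked messages: chunks are already
// sized against the broker level maxMessageSize.
static boolean exceedsTopicMaxMessageSize(int size, int topicMaxMessageSize,
                                          boolean isChunked) {
    if (isChunked) {
        return false; // topic level maxMessageSize does not apply to chunks
    }
    return topicMaxMessageSize > 0 && size > topicMaxMessageSize;
}
```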

## Rejected Alternatives

Add a client side topic level maxMessageSize and keep it synced with broker.

Required changes:
- [client] Add a new field 
org.apache.pulsar.client.impl.ProducerBase#maxMessageSize to store this client 
side topic level maxMessageSize.
- [PulsarApi.proto] Add a new field maxMessageSize in the 
CommandProducerSuccess for the initial value of ProducerBase#maxMessageSize
- [PulsarApi.proto] Add a new Command like 
CommandUpdateClientPolicy{producerId, maxMessageSize} to update 
ProducerBase#maxMessageSize when topic level maxMessageSize is updated.
Furthermore, some other data consistency issues need to be handled very carefully 
when maxMessageSize is updated.
This alternative is complex, but it can also solve another topic level 
maxMessageSize issue [3] when batching is enabled (the non-batching case is 
solved with PR [4]).

[1] https://github.com/apache/pulsar/issues/13360
[2] https://github.com/apache/pulsar/pull/8732
[3] https://github.com/apache/pulsar/issues/12958
[4] https://github.com/apache/pulsar/pull/13147

Thanks,
Haiting Jiang


[DISCUSSION] PIP-128: Add new command STOP_PRODUCER and STOP_CONSUMER

2021-12-28 Thread 包子
## Motivation

The broker sends `CLOSE_PRODUCER`/`CLOSE_CONSUMER` to the client when a topic is 
deleted, but the client will reconnect. If `allowAutoTopicCreation=true` is 
configured, this triggers creating the topic again.

https://github.com/apache/pulsar/blob/9f599c9572e5d9b1f15efa6e895e7eb29b284e57/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L130-L133

## Goal

Add new commands `STOP_PRODUCER`/`STOP_CONSUMER`. When the client receives one 
of these commands, it only closes, without reconnecting.

## API Changes

1. Add the new command types to `BaseCommand`:
```java
message BaseCommand {
    enum Type {
        // ...
        STOP_PRODUCER = 64;
        STOP_CONSUMER = 65;
    }
}
```
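
For illustration, the intended client-side handling could look like the
following sketch (`CommandStopProducer` and the surrounding names are
hypothetical, since the command does not exist yet):

```java
// Unlike CLOSE_PRODUCER, which triggers the reconnection logic,
// STOP_PRODUCER would close the producer terminally.
void handleStopProducer(CommandStopProducer stop) {
    ProducerImpl<?> producer = producers.remove(stop.getProducerId());
    if (producer != null) {
        producer.closeAsync(); // no reconnect is scheduled afterwards
    }
}
```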



Re: [DISCUSSION] PIP-128: Add new command STOP_PRODUCER and STOP_CONSUMER

2021-12-28 Thread 包子
Issue: https://github.com/apache/pulsar/issues/13488

> On 2021-12-29 at 10:38, 包子 wrote:
> 
> ## Motivation
> 
> The broker sends `CLOSE_PRODUCER`/`CLOSE_CONSUMER` to the client when a topic
> is deleted, but the client will reconnect. If `allowAutoTopicCreation=true` is
> configured, this triggers creating the topic again.
> 
> https://github.com/apache/pulsar/blob/9f599c9572e5d9b1f15efa6e895e7eb29b284e57/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L130-L133
> 
> ## Goal
> 
> Add new commands `STOP_PRODUCER`/`STOP_CONSUMER`. When the client receives
> one of these commands, it only closes, without reconnecting.
> 
> ## API Changes
> 
> 1. Add the new command types to `BaseCommand`:
> ```java
> message BaseCommand {
>     enum Type {
>         // ...
>         STOP_PRODUCER = 64;
>         STOP_CONSUMER = 65;
>     }
> }
> ```
> 



Re: [VOTE] PIP 131: Resolve produce chunk messages failed when topic level maxMessageSize is set

2021-12-28 Thread PengHui Li
+1

Thanks,
Penghui

On Wed, Dec 29, 2021 at 10:29 AM Haiting Jiang 
wrote:

> This is the voting thread for PIP-131. It will stay open for at least 48h.
>
> https://github.com/apache/pulsar/issues/13544
>
> The discussion thread is
> https://lists.apache.org/thread/c63d9s73j9x1m3dkqr3r38gyp8s7cwzf


Re: [DISCUSSION] PIP-128: Add new command STOP_PRODUCER and STOP_CONSUMER

2021-12-28 Thread PengHui Li
What is the background of this requirement?
Usually, we will not force-close the producer and consumer on the server side,
because we don't know if the client side can handle this case well.

Or, if the topic is being retired and you don't want the clients to connect to
it, you can just terminate the topic.
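
For example, terminating a topic with the admin client (assuming a standard
`PulsarAdmin` setup) prevents any further publishing to it:

```java
PulsarAdmin admin = PulsarAdmin.builder()
        .serviceHttpUrl("http://localhost:8080")
        .build();
// Returns the ID of the last message in the topic; producers can no
// longer publish to it afterwards.
admin.topics().terminateTopicAsync("persistent://public/default/my-topic")
        .join();
```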

Thanks
Penghui

On Wed, Dec 29, 2021 at 10:39 AM 包子 
wrote:

> Issue: https://github.com/apache/pulsar/issues/13488
>
> > On 2021-12-29 at 10:38, 包子 wrote:
> >
> > ## Motivation
> >
> > The broker sends `CLOSE_PRODUCER`/`CLOSE_CONSUMER` to the client when a
> > topic is deleted, but the client will reconnect. If
> > `allowAutoTopicCreation=true` is configured, this triggers creating the
> > topic again.
> >
> > https://github.com/apache/pulsar/blob/9f599c9572e5d9b1f15efa6e895e7eb29b284e57/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L130-L133
> >
> > ## Goal
> >
> > Add new commands `STOP_PRODUCER`/`STOP_CONSUMER`. When the client receives
> > one of these commands, it only closes, without reconnecting.
> >
> > ## API Changes
> >
> > 1. Add the new command types to `BaseCommand`:
> > ```java
> > message BaseCommand {
> >     enum Type {
> >         // ...
> >         STOP_PRODUCER = 64;
> >         STOP_CONSUMER = 65;
> >     }
> > }
> > ```
> >
>
>


[GitHub] [pulsar-manager] sourabhaggrawal opened a new pull request #437: Set read/write access to resource for each role

2021-12-28 Thread GitBox


sourabhaggrawal opened a new pull request #437:
URL: https://github.com/apache/pulsar-manager/pull/437


   ### Motivation
   Assign resource-level read/write access to each role of a user. This way we 
can assign multiple resources to a user, each with read or write permission.
   
   A user may have access to multiple resources, being allowed only to read one 
resource while being able to make changes to another.
   
   ### Modifications
   
   1. Added an access field at the role level, which can have read or write as 
its value.
   2. Added a tenants field in the resourceType list; the available tenants will 
be listed in the resource field when creating a role.
   3. Added an access map (Map<String, Map<String, String>>) in the /usersInfo 
response; each key is the resourceType, and the value holds another map with the 
resource name as key and read/write as value.
   4. Added a hasPermissionToResource method in module/users.js, which is called 
for each actionable element when the UI is rendered, to decide whether to 
show/hide the resource based on access.
   5. Changes in the .vue files to call the hasPermissionToResource method to 
decide if the user has access to a particular element.
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the `./gradlew build` checks.
   
   
   






Re: [VOTE] PIP 131: Resolve produce chunk messages failed when topic level maxMessageSize is set

2021-12-28 Thread Zike Yang
+1

Thanks,
Zike

On Wed, Dec 29, 2021 at 11:26 AM PengHui Li  wrote:
>
> +1
>
> Thanks,
> Penghui
>
> On Wed, Dec 29, 2021 at 10:29 AM Haiting Jiang 
> wrote:
>
> > This is the voting thread for PIP-131. It will stay open for at least 48h.
> >
> > https://github.com/apache/pulsar/issues/13544
> >
> > The discussion thread is
> > https://lists.apache.org/thread/c63d9s73j9x1m3dkqr3r38gyp8s7cwzf



-- 
Zike Yang


Re: [DISCUSS] PIP-124: Pulsar Client Shared State API

2021-12-28 Thread Enrico Olivelli
Matteo,

On Wed, Dec 29, 2021, 02:57 Matteo Merli wrote:

> > * Add an API to the Java client that makes it easier to maintain a
> consistent Share State between instances of an application.
> > * Provide some ready to use recipes, like a simple key-value store
> >
> > It is not a goal to implement a Pulsar backed Database system
>
> While the first use case for Pulsar was indeed to be the
> messaging/replication platform for a distributed database, and it has
> been working in production for many years, I'm not convinced to add
> this level of API as part of the Pulsar client API.
>
> Pulsar API has been designed to be high-level and easy to use (and
> reason about), with in mind the use cases of application developers. I
> don't think that a "storage" level API fits well with the rest of
> abstractions.
>
> > public interface PulsarMap<K, V> extends AutoCloseable {
> > ..
> >  CompletableFuture put(K key, V value)
>
> If all the logic is implemented on the client side, when there are
> multiple clients sharing the same map, how can any of them mutate the
> state, given that we actually enforce a single exclusive
> producer? Would a user get an error if there's already a different
> client writing?
>
> My impression is that, while looking convenient, a shared Map
> interface is not the best abstraction for either case:
>  * If you're actually building a DB, you will definitely need access
> to the log itself rather than a Map interface
>  * If you want to share some state across multiple processes without
> using a DB, there are many tricky API, consistency and semantic
> problems to solve, many of which are just pushed down to the
> application which will need to be aware and understand them. At that
> point, I would seriously recommend using a DB, or if the question is:
> "I don't want to use an additional external system", then to use the
> BK TableService component.
>

This is usually not an option, because the BK TableService does not support
multi-tenancy well, and the application would also need to connect directly to
the bookies (think about configuration, security...).


>
> I think this feature is best viewed as a recipe, as it doesn't
> depend on or benefit from any internal broker support. If there is
> enough interest and there are concrete use cases, it can be included later.
>

My initial proposal was to push this to pulsar-adapters.
I changed the proposal before sending the PIP because I think it is very
useful for protocol handlers and Pulsar IO connectors.

I am totally fine with adding this to pulsar-adapters, but I want to see it in
the Pulsar repo, released as part of an official Pulsar recipe.

@Matteo does this sound like a good option to you?

Otherwise we miss the opportunity to make it easier for Pulsar users to
leverage this power of Pulsar.

In Pravega there are State Synchronizers, which are a great foundational
API; we are missing something like that in Pulsar.

Enrico



> --
> Matteo Merli
> 
>
> On Fri, Dec 24, 2021 at 1:53 AM Enrico Olivelli 
> wrote:
> >
> > Hello everyone,
> > I want to start a discussion about PIP-124 Pulsar Client Shared State
> API
> >
> > This is the PIP document
> > https://github.com/apache/pulsar/issues/13490
> >
> > This is a demo implementation (a proof-of-concept):
> > https://github.com/eolivelli/pulsar-shared-state-manager
> >
> > Please take a look and share your thoughts
> >
> > I believe that this will unlock the potential of the Exclusive
> > Producer, and it will also make life easier for many developers who
> > are using Pulsar and need an API to share configuration, metadata,
> > or any simple key-value data structure without adding a database or
> > other components to their library, Pulsar IO connector, or Pulsar
> > protocol handler.
> >
> > Thanks
> > Enrico
>