Re: [VOTE] PIP-259: Make the config httpMaxRequestHeaderSize of the pulsar web server configurable

2023-03-21 Thread Yunze Xu
+1 (binding)

Thanks,
Yunze

On Tue, Mar 21, 2023 at 2:59 PM 丛搏  wrote:
>
> +1 (binding)
>
> Thanks,
> Bo
>
> On Mon, Mar 20, 2023 at 5:11 PM Yubiao Feng wrote:
> >
> > Hi Pulsar Community
> >
> > This thread is to start the vote for PIP 259.
> >
> > Discussion: https://lists.apache.org/thread/f11cld5cbc8sodhgvs5s28lw8nxsr9dc
> > Issue: https://github.com/apache/pulsar/issues/19826
> > Implementation: https://github.com/apache/pulsar/pull/19514
> >
> >
> > Voting will stay open for at least 48h.
> >
> > Thanks,
> > Yubiao Feng


Re: [DISCUSS] PIP-259: Make the config httpMaxRequestHeaderSize of the pulsar web server configurable

2023-03-21 Thread Zike Yang
Based on the private discussion with Yubiao, here is the current update:

Currently, we can get the init parameter `requestBufferSize` from the
ServletConfig. We need to investigate this way of configuring it to avoid
a potential conflict. We also may not need to expose
`httpClientRequestBufferSize` in the proxy configuration.

@Yubiao, please correct me or provide more context if I am wrong.

Thanks,
Zike Yang




On Tue, Mar 21, 2023 at 2:50 PM Yubiao Feng wrote:
>
> Hi Zike
>
> > Exposing `httpClientRequestBufferSize` to help users improve the
> > forwarding performance.
>
> It's just an added benefit.
>
> The primary goal is to handle long HTTP URLs.
> Since Pulsar Proxy forwards the request to the Pulsar server, it will throw
> a BufferOverflowException if the length of the URL is greater than 4096 (the
> default value of `httpClientRequestBufferSize`), because the URI and all the
> HTTP headers share the same ByteBuf.
>
> Suppose there is an HTTP request like this:
>
> ```
> GET /admin/v2/public/default/tp/stats HTTP/1.1
> Host: 127.0.0.1
> Accept: application/json
> 
> ```
>
> The internal client of Pulsar Proxy will work like this:
>
> ```
> ByteBuf buf = allocate(config.httpClientRequestBufferSize);
> buf.write("GET /admin/v2/public/default/tp/stats HTTP/1.1");
> buf.write("Host: 127.0.0.1");
> buf.write("Accept: application/json");
> ...
> buf.flush();
> ```
>
> If the data is larger than the ByteBuf, we will get a
> BufferOverflowException, and users will get an error with the reason
> "Request header too large."
>
> https://github.com/eclipse/jetty.project/blob/574ad3b4daacf0d992f40ab780f2425ecbbac7bb/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpSenderOverHTTP.java#L178-L183
>
>
> Thanks
> Yubiao Feng
>
> On Tue, Mar 21, 2023 at 11:16 AM Zike Yang  wrote:
>
> > Hi, Yubiao
> >
> > Thanks for your explanation. So from my understanding, there are
> > actually two different goals in this PIP:
> > * Exposing `httpMaxRequestHeaderSize` to solve the long name issue:
> > https://lists.apache.org/thread/q1m23ckyy10wvtzy65v8bwqwnh7r0gc8
> > * Exposing `httpClientRequestBufferSize` to help users improve the
> > forwarding performance.
> >
> > I'm +1 if my understanding is correct.
> >
> > Thanks,
> > Zike Yang
> >
> > On Mon, Mar 20, 2023 at 10:07 PM Yubiao Feng
> >  wrote:
> > >
> > > Hi Zike
> > >
> > > > Is it worth exposing `httpClientRequestBufferSize` to the proxy user?
> > > > Seems exposing `httpMaxRequestHeaderSize ` is enough to solve this
> > > > issue. We can set the buffer size in the proxy code based on the value
> > > > of `httpMaxRequestHeaderSize `. Is there any case that we need to
> > > > expose `httpClientRequestBufferSize`?
> > >
> > > > 1. These two configurations work on two very different components: the
> > > > config "httpMaxRequestHeaderSize" is used for the internal server of
> > > > Pulsar-Proxy, and the config "httpClientRequestBufferSize" is used for
> > > > the internal HTTP client of the Pulsar-Proxy.
> > > >
> > > > 2. Users can modify the configuration `httpClientRequestBufferSize` to
> > > > adjust the buffer used for forwarding proxy requests and improve
> > > > forwarding performance.
> > > >
> > > > So it's better to have two separate configurations.
> > >
> > > Thanks
> > > Yubiao Feng
> > >
> > >
> > >
> > > On Mon, Mar 20, 2023 at 6:36 PM Zike Yang  wrote:
> > >
> > > > > > private int httpClientRequestBufferSize = httpMaxRequestHeaderSize;
> > > >
> > > > Is it worth exposing `httpClientRequestBufferSize` to the proxy user?
> > > > Seems exposing `httpMaxRequestHeaderSize ` is enough to solve this
> > > > issue. We can set the buffer size in the proxy code based on the value
> > > > of `httpMaxRequestHeaderSize `. Is there any case that we need to
> > > > expose `httpClientRequestBufferSize`?
> > > >
> > > > Thanks,
> > > > Zike Yang
> > > >
> > > > On Fri, Mar 17, 2023 at 7:31 PM 丛搏  wrote:
> > > > >
> > > > > +1
> > > > >
> > > > > Hi, Yubiao :
> > > > >
> > > > > Thanks for your explanation. That makes sense to me.
> > > > >
> > > > > Thanks,
> > > > > Bo
> > > > >
> > > > >
> > > > > > On Fri, Mar 17, 2023 at 4:29 PM Yubiao Feng wrote:
> > > > > >
> > > > > > Hi Bo
> > > > > >
> > > > > > > I have a question, why we need `httpClientRequestBufferSize ` in
> > > > > > > proxy, can you explain in detail?
> > > > > >
> > > > > > > Since the Pulsar-Proxy uses `jetty-client` to forward HTTP
> > > > > > > requests from users to the Pulsar-Broker, if the proxy receives a
> > > > > > > request like this:
> > > > > > >
> > > > > > > ```
> > > > > > > GET /admin/v2/public/default/tp.long..long/stats HTTP/1.1
> > > > > > > ```
> > > > > > >
> > > > > > > The internal client will forward this request like this:
> > > > > > >
> > > > > > ```
> > > > > > ByteBuf buf = allocate( config.httpClientRequestBufferSize )
> > > > > > > // (Highlight) we will get a BufferOverflowException if the request line is too long.
> > > > > > > buf.write(requestLine);
> > > > > > 

Re: [DISCUSS] PIP-259: Make the config httpMaxRequestHeaderSize of the pulsar web server configurable

2023-03-21 Thread Yubiao Feng
Hi Zike

> Thanks for your explanation. So from my understanding, there are
> actually two different goals in this PIP:
> * Exposing `httpMaxRequestHeaderSize` to solve the long name issue:
> https://lists.apache.org/thread/q1m23ckyy10wvtzy65v8bwqwnh7r0gc8
> * Exposing `httpClientRequestBufferSize` to help users improve the
> forwarding performance.

To keep the goal clear, we can set `httpClientRequestBufferSize` only
internally, without giving users a way to change it.
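
For illustration only, here is a minimal sketch of the failure mode discussed
in this thread and of sizing the client buffer internally. It uses a plain
`java.nio.ByteBuffer` as a stand-in for the buffer inside the internal HTTP
client; the class `RequestBufferSketch` and the helpers
`clientRequestBufferSize`, `fits`, and `requestHead` are hypothetical names,
not Pulsar or Jetty APIs, and taking the larger of 4096 and
`httpMaxRequestHeaderSize` is an assumption, not the PIP's decision.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class RequestBufferSketch {
    // Default value of httpClientRequestBufferSize mentioned in this thread.
    static final int DEFAULT_CLIENT_BUFFER_SIZE = 4096;

    /** Hypothetical helper: derive the internal client buffer size from the server-side limit. */
    static int clientRequestBufferSize(int httpMaxRequestHeaderSize) {
        return Math.max(DEFAULT_CLIENT_BUFFER_SIZE, httpMaxRequestHeaderSize);
    }

    /** Returns true if the request line plus headers fit into one buffer of the given size. */
    static boolean fits(String requestHead, int bufferSize) {
        ByteBuffer buf = ByteBuffer.allocate(bufferSize);
        try {
            // The request line and all headers share the same buffer.
            buf.put(requestHead.getBytes(StandardCharsets.US_ASCII));
            return true;
        } catch (BufferOverflowException e) {
            return false;
        }
    }

    /** Builds a request head whose topic name has the given length. */
    static String requestHead(int topicNameLength) {
        char[] name = new char[topicNameLength];
        Arrays.fill(name, 't');
        return "GET /admin/v2/public/default/" + new String(name) + "/stats HTTP/1.1\r\n"
                + "Host: 127.0.0.1\r\n"
                + "Accept: application/json\r\n\r\n";
    }

    public static void main(String[] args) {
        String head = requestHead(5000);
        System.out.println(fits(head, DEFAULT_CLIENT_BUFFER_SIZE));    // false
        System.out.println(fits(head, clientRequestBufferSize(8192))); // true
    }
}
```

With a 5000-character topic name the head does not fit into the 4096-byte
default, but it fits once the buffer follows a larger
`httpMaxRequestHeaderSize` such as 8192.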


Thanks
Yubiao Feng


On Tue, Mar 21, 2023 at 11:16 AM Zike Yang  wrote:

> Hi, Yubiao
>
> Thanks for your explanation. So from my understanding, there are
> actually two different goals in this PIP:
> * Exposing `httpMaxRequestHeaderSize` to solve the long name issue:
> https://lists.apache.org/thread/q1m23ckyy10wvtzy65v8bwqwnh7r0gc8
> * Exposing `httpClientRequestBufferSize` to help users improve the
> forwarding performance.
>
> I'm +1 if my understanding is correct.
>
> Thanks,
> Zike Yang
>
> On Mon, Mar 20, 2023 at 10:07 PM Yubiao Feng
>  wrote:
> >
> > Hi Zike
> >
> > > Is it worth exposing `httpClientRequestBufferSize` to the proxy user?
> > > Seems exposing `httpMaxRequestHeaderSize ` is enough to solve this
> > > issue. We can set the buffer size in the proxy code based on the value
> > > of `httpMaxRequestHeaderSize `. Is there any case that we need to
> > > expose `httpClientRequestBufferSize`?
> >
> > 1. These two configurations work on two very different components: the
> > config "httpMaxRequestHeaderSize" is used for the internal server of
> > Pulsar-Proxy, and the config "httpClientRequestBufferSize" is used for
> > the internal HTTP client of the Pulsar-Proxy.
> >
> > 2. Users can modify the configuration `httpClientRequestBufferSize` to
> > adjust the buffer used for forwarding proxy requests and improve
> > forwarding performance.
> >
> > So it's better to have two separate configurations.
> >
> > Thanks
> > Yubiao Feng
> >
> >
> >
> > On Mon, Mar 20, 2023 at 6:36 PM Zike Yang  wrote:
> >
> > > > private int httpClientRequestBufferSize = httpMaxRequestHeaderSize;
> > >
> > > Is it worth exposing `httpClientRequestBufferSize` to the proxy user?
> > > Seems exposing `httpMaxRequestHeaderSize ` is enough to solve this
> > > issue. We can set the buffer size in the proxy code based on the value
> > > of `httpMaxRequestHeaderSize `. Is there any case that we need to
> > > expose `httpClientRequestBufferSize`?
> > >
> > > Thanks,
> > > Zike Yang
> > >
> > > On Fri, Mar 17, 2023 at 7:31 PM 丛搏  wrote:
> > > >
> > > > +1
> > > >
> > > > Hi, Yubiao :
> > > >
> > > > Thanks for your explanation. That makes sense to me.
> > > >
> > > > Thanks,
> > > > Bo
> > > >
> > > >
> > > > On Fri, Mar 17, 2023 at 4:29 PM Yubiao Feng wrote:
> > > > >
> > > > > Hi Bo
> > > > >
> > > > > > I have a question, why we need `httpClientRequestBufferSize ` in
> > > > > > proxy, can you explain in detail?
> > > > >
> > > > > Since the Pulsar-Proxy uses `jetty-client` to forward HTTP
> > > > > requests from users to the Pulsar-Broker, if the proxy receives a
> > > > > request like this:
> > > > >
> > > > > ```
> > > > > GET /admin/v2/public/default/tp.long..long/stats HTTP/1.1
> > > > > ```
> > > > >
> > > > > The internal client will forward this request like this:
> > > > >
> > > > > ```
> > > > > ByteBuf buf = allocate( config.httpClientRequestBufferSize )
> > > > > // (Highlight) we will get a BufferOverflowException if the request line is too long.
> > > > > buf.write(requestLine);
> > > > > ```
> > > > >
> > > > > Therefore, in addition to ensuring that the proxy server can receive a
> > > > > long request line, the internal client must also process a long request
> > > > > line. This problem can be solved by making
> > > > > `httpClientRequestBufferSize` configurable.
> > > > >
> > > > >
> > > > > Thanks
> > > > > Yubiao Feng
> > > > >
> > > > >
> > > > > On Thu, Mar 16, 2023 at 8:12 PM 丛搏  wrote:
> > > > >
> > > > > > hi yubiao :
> > > > > >
> > > > > > I have a question, why we need `httpClientRequestBufferSize ` in
> > > > > > proxy, can you explain in detail?
> > > > > >
> > > > > > Thanks,
> > > > > > Bo
> > > > > >
> > > > > > On Thu, Mar 16, 2023 at 12:11 AM Yubiao Feng wrote:
> > > > > >
> > > > > > >
> > > > > > > Hi community
> > > > > > >
> > > > > > > I am starting a DISCUSS for "PIP-259: Make the config
> > > > > > > httpMaxRequestHeaderSize of the pulsar web server configurable".
> > > > > > >
> > > > > > > ### Motivation
> > > > > > >
> > > > > > > We have two ways to manage pulsar's resources:
> > > > > > > - By client API (Can manage some resources, such as `create topic`,
> > > > > > > `create subscriber`, and so on)
> > > > > > > - By admin API (Can manage all the resources)
> > > > > > >
> > > > > > > The `client API` has no limit on the request length. And the `admin API`
> > > > > > > has a limit on the request length (such as HTTP request line and HTTP
> > > > > 

Re: [DISCUSS] PIP-259: Make the config httpMaxRequestHeaderSize of the pulsar web server configurable

2023-03-21 Thread Zike Yang
> To keep the goal clear, we can set `httpClientRequestBufferSize` only
> internally, without giving users a way to change it.

+1.

Thanks,
Zike Yang

On Tue, Mar 21, 2023 at 4:08 PM Yubiao Feng
 wrote:
>
> Hi Zike
>
> > Thanks for your explanation. So from my understanding, there are
> > actually two different goals in this PIP:
> > * Exposing `httpMaxRequestHeaderSize` to solve the long name issue:
> > https://lists.apache.org/thread/q1m23ckyy10wvtzy65v8bwqwnh7r0gc8
> > * Exposing `httpClientRequestBufferSize` to help users improve the
> > forwarding performance.
>
> To keep the goal clear, we can set `httpClientRequestBufferSize` only
> internally, without giving users a way to change it.
>
>
> Thanks
> Yubiao Feng
>
>
> On Tue, Mar 21, 2023 at 11:16 AM Zike Yang  wrote:
>
> > Hi, Yubiao
> >
> > Thanks for your explanation. So from my understanding, there are
> > actually two different goals in this PIP:
> > * Exposing `httpMaxRequestHeaderSize` to solve the long name issue:
> > https://lists.apache.org/thread/q1m23ckyy10wvtzy65v8bwqwnh7r0gc8
> > * Exposing `httpClientRequestBufferSize` to help users improve the
> > forwarding performance.
> >
> > I'm +1 if my understanding is correct.
> >
> > Thanks,
> > Zike Yang
> >
> > On Mon, Mar 20, 2023 at 10:07 PM Yubiao Feng
> >  wrote:
> > >
> > > Hi Zike
> > >
> > > > Is it worth exposing `httpClientRequestBufferSize` to the proxy user?
> > > > Seems exposing `httpMaxRequestHeaderSize ` is enough to solve this
> > > > issue. We can set the buffer size in the proxy code based on the value
> > > > of `httpMaxRequestHeaderSize `. Is there any case that we need to
> > > > expose `httpClientRequestBufferSize`?
> > >
> > > > 1. These two configurations work on two very different components: the
> > > > config "httpMaxRequestHeaderSize" is used for the internal server of
> > > > Pulsar-Proxy, and the config "httpClientRequestBufferSize" is used for
> > > > the internal HTTP client of the Pulsar-Proxy.
> > > >
> > > > 2. Users can modify the configuration `httpClientRequestBufferSize` to
> > > > adjust the buffer used for forwarding proxy requests and improve
> > > > forwarding performance.
> > > >
> > > > So it's better to have two separate configurations.
> > >
> > > Thanks
> > > Yubiao Feng
> > >
> > >
> > >
> > > On Mon, Mar 20, 2023 at 6:36 PM Zike Yang  wrote:
> > >
> > > > > > private int httpClientRequestBufferSize = httpMaxRequestHeaderSize;
> > > >
> > > > Is it worth exposing `httpClientRequestBufferSize` to the proxy user?
> > > > Seems exposing `httpMaxRequestHeaderSize ` is enough to solve this
> > > > issue. We can set the buffer size in the proxy code based on the value
> > > > of `httpMaxRequestHeaderSize `. Is there any case that we need to
> > > > expose `httpClientRequestBufferSize`?
> > > >
> > > > Thanks,
> > > > Zike Yang
> > > >
> > > > On Fri, Mar 17, 2023 at 7:31 PM 丛搏  wrote:
> > > > >
> > > > > +1
> > > > >
> > > > > Hi, Yubiao :
> > > > >
> > > > > Thanks for your explanation. That makes sense to me.
> > > > >
> > > > > Thanks,
> > > > > Bo
> > > > >
> > > > >
> > > > > > On Fri, Mar 17, 2023 at 4:29 PM Yubiao Feng wrote:
> > > > > >
> > > > > > Hi Bo
> > > > > >
> > > > > > > I have a question, why we need `httpClientRequestBufferSize ` in
> > > > > > > proxy, can you explain in detail?
> > > > > >
> > > > > > > Since the Pulsar-Proxy uses `jetty-client` to forward HTTP
> > > > > > > requests from users to the Pulsar-Broker, if the proxy receives a
> > > > > > > request like this:
> > > > > > >
> > > > > > > ```
> > > > > > > GET /admin/v2/public/default/tp.long..long/stats HTTP/1.1
> > > > > > > ```
> > > > > > >
> > > > > > > The internal client will forward this request like this:
> > > > > > >
> > > > > > ```
> > > > > > ByteBuf buf = allocate( config.httpClientRequestBufferSize )
> > > > > > > // (Highlight) we will get a BufferOverflowException if the request line is too long.
> > > > > > > buf.write(requestLine);
> > > > > > ```
> > > > > >
> > > > > > > Therefore, in addition to ensuring that the proxy server can receive a
> > > > > > > long request line, the internal client must also process a long request
> > > > > > > line. This problem can be solved by making
> > > > > > > `httpClientRequestBufferSize` configurable.
> > > > > >
> > > > > >
> > > > > > Thanks
> > > > > > Yubiao Feng
> > > > > >
> > > > > >
> > > > > > On Thu, Mar 16, 2023 at 8:12 PM 丛搏  wrote:
> > > > > >
> > > > > > > hi yubiao :
> > > > > > >
> > > > > > > I have a question, why we need `httpClientRequestBufferSize ` in
> > > > > > > proxy, can you explain in detail?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Bo
> > > > > > >
> > > > > > > > On Thu, Mar 16, 2023 at 12:11 AM Yubiao Feng wrote:
> > > > > > >
> > > > > > > >
> > > > > > > > Hi community
> > > > > > > >
> > > > > > > > > I am starting a DISCUSS for "PIP-259: Make the config
> > > > > > > > > httpMaxRequestHeaderSize of the pulsar web server configurable".
> > > > > > > >
> > > > > > > > ### M

Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread Yunze Xu
First, I agree with Yubiao that we can avoid calling the `isDuplicate`
method once this option is enabled.

Then, I'm wondering in which cases users would want to disable this
option. What's the disadvantage of disabling it? I think we can
just record the latest position (ledger id, entry id, batch index) of
the message received if the subscription type is Exclusive or
Failover.
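
To make the idea concrete, here is a minimal sketch of such a filter, under
the assumption that positions are compared lexicographically by (ledger id,
entry id, batch index). `ReceivedPositionFilter` and its nested `Position`
class are hypothetical names for illustration, not the Pulsar client's
actual types.

```java
final class ReceivedPositionFilter {
    /** Hypothetical stand-in for a message position: (ledger id, entry id, batch index). */
    static final class Position implements Comparable<Position> {
        final long ledgerId;
        final long entryId;
        final int batchIndex;

        Position(long ledgerId, long entryId, int batchIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.batchIndex = batchIndex;
        }

        @Override
        public int compareTo(Position other) {
            // Lexicographic order: ledger id first, then entry id, then batch index.
            int c = Long.compare(ledgerId, other.ledgerId);
            if (c != 0) {
                return c;
            }
            c = Long.compare(entryId, other.entryId);
            if (c != 0) {
                return c;
            }
            return Integer.compare(batchIndex, other.batchIndex);
        }
    }

    // Latest position accepted so far; null until the first message arrives.
    private Position latest;

    /** Returns true for a new message; anything at or before the latest position is dropped. */
    boolean accept(Position p) {
        if (latest != null && p.compareTo(latest) <= 0) {
            return false;
        }
        latest = p;
        return true;
    }

    public static void main(String[] args) {
        ReceivedPositionFilter filter = new ReceivedPositionFilter();
        System.out.println(filter.accept(new Position(1, 0, 0))); // true
        System.out.println(filter.accept(new Position(1, 1, 0))); // true
        System.out.println(filter.accept(new Position(1, 0, 0))); // false, already received
    }
}
```

This only works when messages arrive in order on a single consumer, which is
why the sketch assumes an Exclusive or Failover subscription.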

Is there any breaking change if we just apply this filter without
adding a configuration option?

Thanks,
Yunze

On Tue, Mar 21, 2023 at 2:26 PM 丛搏  wrote:
>
> Hi, Michael
>
> On Tue, Mar 21, 2023 at 1:03 PM Michael Marshall wrote:
> >
> > This is a great problem to improve.
> >
> > What if we instead expand the CommandSubscribe [0] protocol message
> > with a new field to represent the client's desired read position? This
> > way, the client can tell the second broker where to start sending
> > messages, and there is no need to send the messages twice.
> >
> > I like the protocol expansion because it saves on unnecessary network
> > transfer in several places and because it will be more straightforward
> > for clients in other languages to implement.
> >
> > What do you think?
> If we add the new field in CommandSubscribe, we must ensure
> synchronization between consumer reconnection and the user
> calling the receive and redeliverUnack methods. That will affect the
> performance of receive; exposing synchronization on hot paths is not a
> good idea. Although the messages are delivered twice, I don't think that
> will cause much performance loss.
>
> This filtering has to be rigorous, with no race conditions,
> because it involves transactions. I want it to be simple and efficient,
> and I don't want it to become complicated and difficult to maintain.
>
> Of course, if the failover and exclusive consumers are changed to pull mode,
> I believe that changing the protocol is a very good idea. But at present,
> there is no sufficient reason to do so.
>
> Thanks,
> Bo
>
> >
> > Thanks,
> > Michael
> >
> > [0] 
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-common/src/main/proto/PulsarApi.proto#L339-L400
> >
> >
> > > On Mon, Mar 20, 2023 at 10:56 AM Xiangying Meng wrote:
> > >
> > > Hi Congbo,
> > > I think this is a great idea.
> > > This is more efficient in filtering duplicate messages for a single
> > > consumer.
> > > And maybe more details about implementation should be shown in the 
> > > proposal.
> > >
> > > Best regards,
> > > Xiangying
> > >
> > > > On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng wrote:
> > >
> > > > Hi Bo
> > > >
> > > > I think this is a good way to filter messages that the client has 
> > > > received.
> > > >
> > > > And I have two questions:
> > > >
> > > > > 1. This is more powerful than the original way
> > > > > (`acknowledgmentsGroupingTracker.isDuplicate(msgId)`) to filter out
> > > > > duplicated messages.
> > > > > Is it possible to turn off the original de-duplication logic to improve
> > > > > performance after enabling this new feature?
> > > > >
> > > > > 2. There seems to be a typo in the proposal:
> > > > >
> > > > > > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > > > > > If we redeliver individual messages, they will be filtered, because we
> > > > > > can't clear the recorded latest message in the consumer when
> > > > > > redelivering individual messages. It will make this config unclear,
> > > > > > and if every redeliver method changes, it will bring a lot of
> > > > > > redundant code, which is difficult to maintain. If there is a need in
> > > > > > the future, just support it.
> > > > >
> > > > > I suppose you meant to say "not support
> > > > > `redeliverUnacknowledgedMessages`", right?
> > > >
> > > >
> > > > Thanks
> > > > Yubiao Feng
> > > >
> > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏  wrote:
> > > >
> > > > > Hi, pulsar community:
> > > > >
> > > > > I started a PIP about `Client consumer filter received messages`.
> > > > >
> > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > >
> > > > > Thanks,
> > > > > Bo
> > > > >
> > > >


Unstable codecov action

2023-03-21 Thread tison
For example
https://github.com/apache/pulsar/actions/runs/4454158774/jobs/7867745340?pr=19842

I'm wondering whether anyone cares about the report, and whether it helps
you during the coding or reviewing process. Right now it generates quite a
bit of noise, and I just ignore the report it gives ;-)

As for the issue itself, it seems some artifacts are not retained properly.

Best,
tison.


Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread Aloys Zhang
Nice proposal.

I'm interested in one point:
> So when we need to reset the cursor, the client consumer should all be
> closed, and then reset the cursor then restart the consumer.

Does this requirement apply to `consumer.seek`?
In some scenarios, we need to create consumers first and then seek
to a position or timestamp.


On Tue, Mar 21, 2023 at 5:19 PM Yunze Xu wrote:

> First, I agree with Yubiao that we can avoid calling the `isDuplicate`
> method once this option is enabled.
>
> Then, I'm wondering in which cases users would want to disable this
> option. What's the disadvantage of disabling it? I think we can
> just record the latest position (ledger id, entry id, batch index) of
> the message received if the subscription type is Exclusive or
> Failover.
>
> Is there any breaking change if we just apply this filter without
> adding a configuration option?
>
> Thanks,
> Yunze
>
> On Tue, Mar 21, 2023 at 2:26 PM 丛搏  wrote:
> >
> > Hi, Michael
> >
> > On Tue, Mar 21, 2023 at 1:03 PM Michael Marshall wrote:
> > >
> > > This is a great problem to improve.
> > >
> > > What if we instead expand the CommandSubscribe [0] protocol message
> > > with a new field to represent the client's desired read position? This
> > > way, the client can tell the second broker where to start sending
> > > messages, and there is no need to send the messages twice.
> > >
> > > I like the protocol expansion because it saves on unnecessary network
> > > transfer in several places and because it will be more straightforward
> > > for clients in other languages to implement.
> > >
> > > What do you think?
> > If we add the new field in CommandSubscribe, we must ensure
> > synchronization between consumer reconnection and the user
> > calling the receive and redeliverUnack methods. That will affect the
> > performance of receive; exposing synchronization on hot paths is not a
> > good idea. Although the messages are delivered twice, I don't think that
> > will cause much performance loss.
> >
> > This filtering has to be rigorous, with no race conditions,
> > because it involves transactions. I want it to be simple and efficient,
> > and I don't want it to become complicated and difficult to maintain.
> >
> > Of course, if the failover and exclusive consumers are changed to pull mode,
> > I believe that changing the protocol is a very good idea. But at present,
> > there is no sufficient reason to do so.
> >
> > Thanks,
> > Bo
> >
> > >
> > > Thanks,
> > > Michael
> > >
> > > [0] https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-common/src/main/proto/PulsarApi.proto#L339-L400
> > >
> > >
> > > > On Mon, Mar 20, 2023 at 10:56 AM Xiangying Meng wrote:
> > > >
> > > > Hi Congbo,
> > > > I think this is a great idea.
> > > > This is more efficient in filtering duplicate messages for a single
> > > > consumer.
> > > > > And maybe more details about implementation should be shown in the
> > > > > proposal.
> > > >
> > > > Best regards,
> > > > Xiangying
> > > >
> > > > > On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng wrote:
> > > >
> > > > > Hi Bo
> > > > >
> > > > > > I think this is a good way to filter messages that the client has
> > > > > > received.
> > > > > >
> > > > > > And I have two questions:
> > > > > >
> > > > > > 1. This is more powerful than the original way
> > > > > > (`acknowledgmentsGroupingTracker.isDuplicate(msgId)`) to filter out
> > > > > > duplicated messages.
> > > > > > Is it possible to turn off the original de-duplication logic to improve
> > > > > > performance after enabling this new feature?
> > > > > >
> > > > > > 2. There seems to be a typo in the proposal:
> > > > > >
> > > > > > > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > > > > > > If we redeliver individual messages, they will be filtered, because we
> > > > > > > can't clear the recorded latest message in the consumer when
> > > > > > > redelivering individual messages. It will make this config unclear,
> > > > > > > and if every redeliver method changes, it will bring a lot of
> > > > > > > redundant code, which is difficult to maintain. If there is a need in
> > > > > > > the future, just support it.
> > > > > >
> > > > > > I suppose you meant to say "not support
> > > > > > `redeliverUnacknowledgedMessages`", right?
> > > > >
> > > > >
> > > > > Thanks
> > > > > Yubiao Feng
> > > > >
> > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏  wrote:
> > > > >
> > > > > > Hi, pulsar community:
> > > > > >
> > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > >
> > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > >
> > > > > > Thanks,
> > > > > > Bo
> > > > > >
> > > > >
>


Re: Implement canonical URLs in Pulsar docs for SEO

2023-03-21 Thread tison
I made a patch that adds a canonical link to the head metadata:
https://github.com/apache/pulsar-site/pull/481

Comments are welcome.

Best,
tison.


On Wed, Mar 15, 2023 at 5:11 PM Jun Ma wrote:

> Hi, everyone,
>
> When researching tagging metadata for Pulsar docs for SEO, I found that
> we don't have a strategy/implementation to always return the latest stable
> version of Pulsar docs in the Google search results [1], leading to
> confusion and bad UX.
>
> After searching existing issues, I found the same issue was previously
> reported by @visortelle last year, and he also provided some insights and
> references in this comment [2].
>
> I think this is the biggest gap we need to fix regarding SEO. So I'm
> writing this email to ask whether any SEO experts in our community know how
> to implement canonical URLs globally within the Docusaurus framework, or
> know of any alternatives. That would be much more efficient and
> maintainable than configuring each markdown file.
>
> I'm looking forward to hearing your thoughts and ideas.
>
>
> [1]: https://github.com/apache/pulsar/issues/18190#issuecomment-1469269560
> [2]: https://github.com/apache/pulsar/issues/18190#issuecomment-1295281487
>
>
> Cheers,
> Jun
>
>


Re: Moving PIP to status accepted - how?

2023-03-21 Thread Asaf Mesika
Here is my voting thread:

https://lists.apache.org/thread/ph25f3p405ky78w91r08hd8f0jmkdwh0

Thanks!

> On 21 Mar 2023, at 1:47, tison  wrote:
> 
> FWIW - Anyone can request edit permission on the ASF Confluence space, but
> the wiki page permission is trickier, and the wiki is less friendly for
> suggesting changes (pull requests). GitHub also gives this feature low
> priority.
>
> I would prefer to follow the https://github.com/rust-lang/rfcs pattern and
> simply integrate it with our site repo. But that needs a
> proposal and consensus.
> 
> Best,
> tison.
> 
> 
> On Tue, Mar 21, 2023 at 7:41 AM tison wrote:
> 
>> Hi Asaf,
>> 
>> Currently, you can ping me with a link to the vote result email, and I'm
>> glad to do you the favor.
>> 
>> Keeping the PIP list up-to-date is another topic; we could enhance the
>> process a bit, like
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals.
>> The issue here is that the wiki page is rarely read, so it
>> easily becomes outdated.
>> 
>> Best,
>> tison.
>> 
>> 
>> On Tue, Mar 21, 2023 at 5:08 AM Asaf Mesika wrote:
>> 
>>> Hi,
>>> 
>>> Once the vote on my PIP has concluded, I don't understand where
>>> PMC members register that the status of the PIP is accepted.
>>> 
>>> This wiki page does not seem to be updated:
>>> https://github.com/apache/pulsar/wiki#accepted
>>> 
>>> The PIP process says:
>>> 
>>> When the vote is closed, if the outcome is positive, the state of the
>>> proposal is updated,
>>> 
>>> Who can help with this?
>>> 
>>> Thanks,
>>> 
>>> Asaf
>>> 
>> 



Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread 丛搏
Hi, Yunze:

On Tue, Mar 21, 2023 at 5:19 PM Yunze Xu wrote:
>
>
> Is there any breaking change if we just apply this filter without
> adding a configuration option?

If we don't add this configuration, resetting the cursor through Pulsar
Admin will cause wrong behavior: the consumer will filter out the messages
that the reset makes available again.

As described in the Compatibility section of the PIP, the client consumer
doesn't know that Pulsar Admin has reset the cursor.
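
A toy model of this compatibility problem, for illustration only: positions
are reduced to a single `long`, and `ResetCursorSketch` with its `deliver`
method is a hypothetical name rather than the consumer's real code. After
the consumer has received positions 0 to 9 and an admin resets the cursor
back to 5, a consumer with the filter enabled drops everything the broker
sends again.

```java
import java.util.ArrayList;
import java.util.List;

final class ResetCursorSketch {
    private final boolean filterEnabled;
    // Latest position handed to the application; -1 means nothing received yet.
    private long latestReceived = -1;

    ResetCursorSketch(boolean filterEnabled) {
        this.filterEnabled = filterEnabled;
    }

    /** Simulates the broker sending positions from..to (inclusive); returns what the application sees. */
    List<Long> deliver(long from, long to) {
        List<Long> seen = new ArrayList<>();
        for (long pos = from; pos <= to; pos++) {
            if (filterEnabled && pos <= latestReceived) {
                continue; // treated as a duplicate and dropped
            }
            latestReceived = Math.max(latestReceived, pos);
            seen.add(pos);
        }
        return seen;
    }

    public static void main(String[] args) {
        ResetCursorSketch filtered = new ResetCursorSketch(true);
        filtered.deliver(0, 9);
        // The cursor is reset to 5 on the broker side; the consumer does not know.
        System.out.println(filtered.deliver(5, 9).size());   // 0

        ResetCursorSketch unfiltered = new ResetCursorSketch(false);
        unfiltered.deliver(0, 9);
        System.out.println(unfiltered.deliver(5, 9).size()); // 5
    }
}
```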
>
> Thanks,
> Yunze
>
> On Tue, Mar 21, 2023 at 2:26 PM 丛搏  wrote:
> >
> > Hi, Michael
> >
> > On Tue, Mar 21, 2023 at 1:03 PM Michael Marshall wrote:
> > >
> > > This is a great problem to improve.
> > >
> > > What if we instead expand the CommandSubscribe [0] protocol message
> > > with a new field to represent the client's desired read position? This
> > > way, the client can tell the second broker where to start sending
> > > messages, and there is no need to send the messages twice.
> > >
> > > I like the protocol expansion because it saves on unnecessary network
> > > transfer in several places and because it will be more straightforward
> > > for clients in other languages to implement.
> > >
> > > What do you think?
> > If we add the new field in CommandSubscribe, we must ensure
> > synchronization between consumer reconnection and the user
> > calling the receive and redeliverUnack methods. That will affect the
> > performance of receive; exposing synchronization on hot paths is not a
> > good idea. Although the messages are delivered twice, I don't think that
> > will cause much performance loss.
> >
> > This filtering has to be rigorous, with no race conditions,
> > because it involves transactions. I want it to be simple and efficient,
> > and I don't want it to become complicated and difficult to maintain.
> >
> > Of course, if the failover and exclusive consumers are changed to pull mode,
> > I believe that changing the protocol is a very good idea. But at present,
> > there is no sufficient reason to do so.
> >
> > Thanks,
> > Bo
> >
> > >
> > > Thanks,
> > > Michael
> > >
> > > [0] 
> > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-common/src/main/proto/PulsarApi.proto#L339-L400
> > >
> > >
> > > > On Mon, Mar 20, 2023 at 10:56 AM Xiangying Meng wrote:
> > > >
> > > > Hi Congbo,
> > > > I think this is a great idea.
> > > > This is more efficient in filtering duplicate messages for a single
> > > > consumer.
> > > > And maybe more details about implementation should be shown in the 
> > > > proposal.
> > > >
> > > > Best regards,
> > > > Xiangying
> > > >
> > > > > On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng wrote:
> > > >
> > > > > Hi Bo
> > > > >
> > > > > I think this is a good way to filter messages that the client has 
> > > > > received.
> > > > >
> > > > > And I have two questions:
> > > > >
> > > > > > 1. This is more powerful than the original way
> > > > > > (`acknowledgmentsGroupingTracker.isDuplicate(msgId)`) to filter out
> > > > > > duplicated messages.
> > > > > > Is it possible to turn off the original de-duplication logic to improve
> > > > > > performance after enabling this new feature?
> > > > > >
> > > > > > 2. There seems to be a typo in the proposal:
> > > > > >
> > > > > > > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > > > > > > If we redeliver individual messages, they will be filtered, because we
> > > > > > > can't clear the recorded latest message in the consumer when
> > > > > > > redelivering individual messages. It will make this config unclear,
> > > > > > > and if every redeliver method changes, it will bring a lot of
> > > > > > > redundant code, which is difficult to maintain. If there is a need in
> > > > > > > the future, just support it.
> > > > > >
> > > > > > I suppose you meant to say "not support
> > > > > > `redeliverUnacknowledgedMessages`", right?
> > > > >
> > > > >
> > > > > Thanks
> > > > > Yubiao Feng
> > > > >
> > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏  wrote:
> > > > >
> > > > > > Hi, pulsar community:
> > > > > >
> > > > > > I started a PIP about `Client consumer filter received messages`.
> > > > > >
> > > > > > PIP: https://github.com/apache/pulsar/issues/19864
> > > > > >
> > > > > > Thanks,
> > > > > > Bo
> > > > > >
> > > > >


Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread 丛搏
Hi, Aloys:

Yes, it will work with `consumer.seek()`.
Sorry, I missed this, I will add this description to the PIP.

But the current seek method has some problems, described in
https://lists.apache.org/thread/97o9t4ltkds5pfq41l9xbbd31t41qm8w
so I am not sure whether it makes sense to support the seek method in this PIP.

Thanks,
Bo

Aloys Zhang wrote on Tue, Mar 21, 2023 at 19:08:
>
> Nice proposal.
>
> I'm interested in a point
> >  So when we need to reset the cursor, the client consumer should all be
> closed, and then reset the cursor then restart the consumer.
>
> Does this requirement apply to `consumer.seek`?
> Because in some scenarios, we need to create consumers first and then seek
> a position or timestamp.
>
>
> Yunze Xu wrote on Tue, Mar 21, 2023 at 17:19:
>
> > First, I agree with Yubiao that we can avoid calling the `isDuplicate`
> > method once this option is enabled.
> >
> > Then, I'm wondering in which case would users want to disable this
> > option? What's the disadvantage to disable the option? I think we can
> > just record the latest position (ledger id, entry id, batch index) of
> > the message received if the subscription type is Exclusive or
> > Failover.
> >
> > Is there any breaking change if we just apply this filter without
> > adding a configuration option?
> >
> > Thanks,
> > Yunze
> >
> > On Tue, Mar 21, 2023 at 2:26 PM 丛搏  wrote:
> > >
> > > Hi, Michael
> > >
> > > Michael Marshall wrote on Tue, Mar 21, 2023 at 13:03:
> > > >
> > > > This is a great problem to improve.
> > > >
> > > > What if we instead expand the CommandSubscribe [0] protocol message
> > > > with a new field to represent the client's desired read position? This
> > > > way, the client can tell the second broker where to start sending
> > > > messages, and there is no need to send the messages twice.
> > > >
> > > > I like the protocol expansion because it saves on unnecessary network
> > > > transfer in several places and because it will be more straightforward
> > > > for clients in other languages to implement.
> > > >
> > > > What do you think?
> > > if we add the new field in CommandSubscribe, we should ensure
> > > the synchronization between consumer reconnection and user
> > > calling receive and redeliverUnack method. it will affect the performance
> > > of receive. expose synchronization to hot paths it not a good idea.
> > > Although the message is re-delivered twice, I don't think it
> > > will cause too much performance loss.
> > >
> > > This filtering is rigorous, and there cannot be some race condition
> > problems
> > > because it involves transactions. I want it to be simple and efficient,
> > > and I don't want it to become complicated and difficult to maintain.
> > >
> > > Of course, if the failover and exclusive consumers are changed to pull
> > mode,
> > > I believe that the change protocol is a very good idea. But at present,
> > > there is obviously no sufficient reason to do so.
> > >
> > > Thanks,
> > > Bo
> > >
> > > >
> > > > Thanks,
> > > > Michael
> > > >
> > > > [0]
> > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-common/src/main/proto/PulsarApi.proto#L339-L400
> > > >
> > > >
> > > > On Mon, Mar 20, 2023 at 10:56 AM Xiangying Meng 
> > wrote:
> > > > >
> > > > > Hi Congbo,
> > > > > I think this is a great idea.
> > > > > This is more efficient in filtering duplicate messages for a single
> > > > > consumer.
> > > > > And maybe more details about implementation should be shown in the
> > proposal.
> > > > >
> > > > > Best regards,
> > > > > Xiangying
> > > > >
> > > > > On Mon, Mar 20, 2023 at 10:53 PM Yubiao Feng
> > > > >  wrote:
> > > > >
> > > > > > Hi Bo
> > > > > >
> > > > > > I think this is a good way to filter messages that the client has
> > received.
> > > > > >
> > > > > > And I have two questions:
> > > > > >
> > > > > > 1. This is more powerful than the original way
> > > > > > (`acknowledgmentsGroupingTracker.isDuplicate(msgId)) to filter out
> > > > > > duplicated messages.
> > > > > >  Is it possible to turn off the original de-replay logic to improve
> > > > > > performance after enabling this new feature?
> > > > > >
> > > > > > 2. There should be a typo in the article
> > > > > >
> > > > > > > ## Only support Consumer#redeliverUnacknowledgedMessages()
> > > > > > > If we redeliver individual messages, they will be filtered.
> > Because we
> > > > > > can't clear the record latest message
> > > > > > >in the consumer when redelivering individual messages. It will
> > make this
> > > > > > config unclear, and if every redeliver
> > > > > > > method changes, it will bring a lot of redundant code, which is
> > difficult
> > > > > > to maintain. If there is a need in the
> > > > > > > future, just support it.
> > > > > >
> > > > > > I suppose you want to say not support
> > `redeliverUnacknowledgedMessages`,
> > > > > > right?
> > > > > >
> > > > > >
> > > > > > Thanks
> > > > > > Yubiao Feng
> > > > > >
> > > > > > On Mon, Mar 20, 2023 at 10:21 PM 丛搏  wrote:
> > > >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread Yubiao Feng
+1

Hi, Bo :

Thanks for your explanation. That makes sense to me.

Thanks,
Yubiao Feng

On Mon, Mar 20, 2023 at 10:21 PM 丛搏  wrote:

> Hi, pulsar community:
>
> I started a PIP about `Client consumer filter received messages`.
>
> PIP: https://github.com/apache/pulsar/issues/19864
>
> Thanks,
> Bo
>


RE: [DISCUSS] PIP-255: Assign topic partitions to bundle by round robin

2023-03-21 Thread Lin Lin



Thanks for joining this discussion

> 1. where is the partition to bundle mapping stored?

We don't need to store the mapping; it can be calculated dynamically.
The starting bundle is determined first: partition-0 is mapped to it
directly through consistent hashing. Subsequent partitions are assigned to
subsequent bundles by round robin.

> 2. when upgrade origin logic to the new round robin logic. how the current 
> code distinguish partition assigned by origin logic and the new created topic 
> assign by round robin logic.

This is an incompatible change, so the entire cluster needs to be
upgraded, not just some of the nodes.

> 2. can you explain how the re-assignment works (when bundle number change). 
> which component will trigger and do the work ?

When a bundle split occurs, a bundle unload at the namespace level will be
triggered. Within that namespace, the binding between every partition and its
bundle will be recomputed. The steps are the same as stated in the issue:
1) partition-0 finds the starting bundle through consistent hashing
2) Subsequent partitions are assigned to subsequent bundles by round robin
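
The two steps above can be sketched roughly as follows. This is only an
illustration under stated assumptions: the class and method names are
hypothetical, bundles are modeled as plain indexes 0..bundleCount-1, and a
CRC32 modulo stands in for the consistent hashing over bundle ranges that the
broker actually uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical sketch; not the NamespaceBundles implementation. */
final class RoundRobinBundleSketch {

    /** Step 1: pick the starting bundle for partition-0 from a hash of the topic name. */
    static int startingBundle(String topicName, int bundleCount) {
        CRC32 crc = new CRC32();
        crc.update(topicName.getBytes(StandardCharsets.UTF_8));
        // CRC32#getValue is a non-negative long, so the remainder is in [0, bundleCount).
        return (int) (crc.getValue() % bundleCount);
    }

    /** Step 2: later partitions walk the bundles in order, wrapping around at the end. */
    static int bundleFor(String topicName, int partitionIndex, int bundleCount) {
        return Math.floorMod(startingBundle(topicName, bundleCount) + partitionIndex, bundleCount);
    }
}
```

Because the result depends on bundleCount, every partition in the namespace
may land on a different bundle after a split, which is why the mapping has to
be recomputed for the whole namespace.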


> 3. If bundle-split is not expected. how many bundle should user set. and do 
> we need disable bundle split we the round robin logic applied.

This approach does not restrict the use of bundle split, but a split will
trigger the rebinding of all partitions in the namespace, and
the reassignment will take some time.


Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread Michael Marshall
> if we add the new field in CommandSubscribe, we should ensure
> the synchronization between consumer reconnection and user
> calling receive and redeliverUnack method. it will affect the performance
> of receive. expose synchronization to hot paths it not a good idea.

I don't think this is a valid objection. I am pretty sure we already
synchronize in the relevant places in the consumer to solve the exact
race condition you're concerned about: [0] [1].

My proposed operation is to keep track of the latest message id that
the application has seen, and then tell the broker that id when
sending the Subscribe command. We already do similar logic here [2]
[3], but instead of getting the first message id the consumer hasn't
seen, we'll get the latest message id seen.

Regarding performance, the PIP doesn't touch on how it will filter out
messages. What is the planned approach? In my understanding, the
client will keep track of the latest message id that the application
has seen and then will need to compare that message id against every
new message. As such, it seems like telling the broker where to start
instead of naively checking a filter on every message would be
cheaper.

> As described in Compatibility in PIP. Client consumer doesn't know
> Pulsar Admin reset cursor.

The problem of "the consumer doesn't know" seems like something that
is reasonably within the protocol's responsibilities. In this case, an
event happens on the broker, and the broker can tell the consumer.

> * Consumers should close when the server resets the cursor,
> * when the cursor reset success, and then restart. Otherwise,
> * the consumer will not receive the history messages.

This is introducing a confusing edge case that requires reading a
Javadoc in order to understand. That seems risky to me, and I do not
think we should add such an edge case. A new protocol message would
easily handle it and make it transparent to the application.

Thanks,
Michael

[0] 
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L826-L912
[1] 
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L1870-L1876
[2] 
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
[3] 
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L922-L960



Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread Michael Marshall
One more point. Instead of keeping track of the latest message seen by
the application, the logic in my solution would actually just check
the last message in the `incomingMessages` queue (as in the most
recently added), and use that as the read position in the subscribe
command. If we made this change, we would have to change this code [0]
to not drop the `incomingMessages` queue.

Thanks,
Michael

[0] 
https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795



Re: [DISCUSS] PIP-255: Assign topic partitions to bundle by round robin

2023-03-21 Thread Heesung Sohn
Hi, I see. I have follow-up questions below.

- This appears to be the "round-robin topic-to-bundle mapping" option in
the `findBundle` function. Is this the only place that needs an update? Can
you list what changes are required?

- How do we enable this "round-robin topic-to-bundle mapping option" (by
namespace policy and broker.conf)?

- Can we apply this option to existing namespaces? (what's the admin
operation to enable this option)?

- I assume the "round-robin topic-to-bundle mapping option" works with a
single partitioned topic, because other topics might show different load
per partition. Is this the intention? (If so, users need to make sure not to
put other topics in the namespace when this option is configured.)

- Some brokers might have more bundles than other brokers. Do we have
different logic for bundle balancing across brokers? or do we rely on the
existing assign/unload/split logic to balance bundles among brokers?

Thanks,
Heesung





RE: [DISCUSS] PIP-255: Assign topic partitions to bundle by round robin

2023-03-21 Thread Lin Lin
The namespace-level bundle unload can be performed in
NamespaceService#splitAndOwnBundleOnceAndRetry.
A new check will be added there:
after splitting the bundle, it should determine whether to unload at the
namespace level.


On 2023/03/19 09:53:07 lifepuzzlefun wrote:
> I'm interest on the implementation details.
> 
> 
> 1. where is the partition to bundle mapping stored?when upgrade origin logic 
> to the new round robin logic. how the current code distinguish partition 
> assigned by origin logic and the new created topic assign by round robin 
> logic.
> 
> 
> 2. can you explain how the re-assignment works (when bundle number change).  
> which component will trigger and do the work ? 
> 
> 
> 3. If bundle-split is not expected. how many bundle should user set. and do 
> we need disable bundle split we the round robin logic applied.
> 
> 
> 
> 


Re: [DISCUSS] PIP-255: Assign topic partitions to bundle by round robin

2023-03-21 Thread Lin Lin


> This appears to be the "round-robin topic-to-bundle mapping" option in
> the `findBundle` function. Is this the only place that needs an update? Can
> you list what change is required?

In this PIP, we only discuss the topic-to-bundle mapping.
The required changes are:
1)
During lookup, partitions are assigned to bundles along this path:
Lookup -> NamespaceService#getBrokerServiceUrlAsync ->
NamespaceService#getBundleAsync ->
NamespaceBundles#findBundle
Currently, NamespaceBundles#findBundle uses consistent hashing to assign
partitions to bundles.
We should add a configuration item partitionAssignerClassName, so that
different partition assignment algorithms can be configured dynamically.
The existing algorithm will be the default
(partitionAssignerClassName=ConsistentHashingPartitionAssigner).
2)
Implement a new partition assignment class, RoundRobinPartitionAssigner.
The new partition assignment logic will be implemented in this class.
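
A minimal sketch of how such a pluggable assigner might be selected from the
configured name. Only the option name partitionAssignerClassName and the two
class names come from this thread; the interface, its method signature, the
`...Sketch` classes and the factory below are assumptions made for
illustration, and `String#hashCode` stands in for the real hash function.

```java
/** Hypothetical interface; the PIP only names the config option and the two classes. */
interface PartitionAssignerSketch {
    /** Returns the index of the bundle that should own the given partition. */
    int assign(String topicName, int partitionIndex, int bundleCount);
}

/** Stand-in for the existing behavior: every partition name is hashed independently. */
final class ConsistentHashingAssignerSketch implements PartitionAssignerSketch {
    @Override
    public int assign(String topicName, int partitionIndex, int bundleCount) {
        String partitionName = topicName + "-partition-" + partitionIndex;
        return Math.floorMod(partitionName.hashCode(), bundleCount);
    }
}

/** Stand-in for the proposed behavior: hash once for partition-0, then round robin. */
final class RoundRobinAssignerSketch implements PartitionAssignerSketch {
    @Override
    public int assign(String topicName, int partitionIndex, int bundleCount) {
        int start = Math.floorMod(topicName.hashCode(), bundleCount);
        return Math.floorMod(start + partitionIndex, bundleCount);
    }
}

/** Hypothetical factory keyed by the value of partitionAssignerClassName. */
final class PartitionAssigners {
    static PartitionAssignerSketch fromConfig(String partitionAssignerClassName) {
        switch (partitionAssignerClassName) {
            case "ConsistentHashingPartitionAssigner":
                return new ConsistentHashingAssignerSketch();
            case "RoundRobinPartitionAssigner":
                return new RoundRobinAssignerSketch();
            default:
                throw new IllegalArgumentException(
                        "Unknown partition assigner: " + partitionAssignerClassName);
        }
    }
}
```

Every broker has to resolve the same implementation, otherwise two brokers
could compute different owners for the same partition.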


> How do we enable this "round-robin topic-to-bundle mapping option" (by
> namespace policy and broker.conf)?

In broker.conf, through a new option called `partitionAssignerClassName`.

> Can we apply this option to existing namespaces? (what's the admin
> operation to enable this option)?

All nodes in the cluster must use the same algorithm.
The broker-level configuration takes effect after a restart or through the admin API
BrokersBase#updateDynamicConfiguration.

> I assume the "round-robin topic-to-bundle mapping option" works with a
> single partitioned topic, because other topics might show different load
> per partition. Is this intention? (so users need to ensure not to put other
> topics in the namespace, if this option is configured)

For single-partition topics, the starting bundle is determined using a
consistent hash,
so single-partition topics will spread out across different bundles as much
as possible.
For high-load single-partition topics, the current algorithm cannot solve this
problem,
and this PIP cannot solve it either.
If it is just a low-load single-partition topic, the impact on the entire bundle
is very small.
In real scenarios, however, high-load businesses share the load across
multiple partitions.

> Some brokers might have more bundles than other brokers. Do we have
> different logic for bundle balancing across brokers? or do we rely on the
> existing assign/unload/split logic to balance bundles among brokers?

In this PIP, we do not touch the mapping between bundles and brokers;
the existing algorithm works well with this PIP.
However, we will also contribute our mapping algorithm in a subsequent PIP.
For example, bundles under the same namespace can be assigned to brokers in a
round-robin manner.




Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread 丛搏
Hi, Michael:

Michael Marshall wrote on Tue, Mar 21, 2023 at 23:17:

>
> One more point. Instead of keeping track of the latest message seen by
> the application, the logic in my solution would actually just check
> the last message in the `incomingMessages` queue (as in the most
> recently added), and use that as the read position in the subscribe
> command. If we made this change, we would have to change this code [0]
> to not drop the `incomingMessages` queue.

Case 1:
How do we define the message that the application has seen?
I think it is [0]. When the `incomingMessages` queue is empty,
how do we get the correct `startPosition`?
I think we would have to lock the receive logic in [1]:
```
synchronized (this) {
    message = incomingMessages.take();
    messageProcessed(message);
}
```
Why would we need to invoke `BlockingQueue.take` inside `synchronized`?
That is bad code.

Case 2:
If we subscribe with `startMessageId`, we should also lock every enqueue
path, like [2], and
check the consumer's current state:
```
synchronized (this) {
    if (consumer.isConnected) {
        if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
            // After we have enqueued the messages on `incomingMessages` queue, we cannot touch
            // the message instance anymore, since for pooled messages, this instance was
            // possibly already been released and recycled.
            INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
            getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
            updateAutoScaleReceiverQueueHint();
        }
    }
}
```
Case 3:
When the subscribe command is sent to the broker with `startMessageId = 1`
and the messages pushed by the broker
have not yet entered the `incomingMessages` queue, the application invokes
redeliver. In that case, we cannot
filter the messages correctly, right?

These are some cases that I could quickly think of, and there must be
others that I haven't thought
of. Are you sure we can handle these problems correctly?

> The problem of "the consumer doesn't know" seems like something that
> is reasonably within the protocol's responsibilities. In this case, an
> event happens on the broker, and the broker can tell the consumer.

I don't think a simple protocol change can solve these problems.
We can't promise that every consumer will receive the broker's reset
cursor request.
While the consumer is reconnecting, the broker can't send the reset cursor request to
the client consumers, right? In this case, the consumer is still unaware, right?


[0] 
https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
[1] 
https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
[2] 
https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
>
> Thanks,
> Michael
>
> [0] 
> https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795

Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread Yunze Xu
I just missed the point that the reset cursor operations do not work
for the consumer. IIUC, the seek operation does not work either. Then
I think the option is not user-friendly, as the PIP says:

>  It needs to be enabled with a complete understanding of this configuration.

If users want, they can also record the latest position for each
consumer at the application side and filter the messages by the public
`MessageId#compareTo` API. If hiding these details in SDK still
requires users to know these details, I think it would not be better
than doing that explicitly in the application.
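
A rough sketch of what such application-side filtering could look like. To
keep it self-contained, `PositionSketch` is a hypothetical stand-in for a
message id ordered by (ledger id, entry id, batch index); a real application
would compare the ids it receives with `MessageId#compareTo` instead. The
filter class and its method names are also assumptions.

```java
/** Hypothetical stand-in for a message id: (ledger id, entry id, batch index). */
final class PositionSketch implements Comparable<PositionSketch> {
    final long ledgerId;
    final long entryId;
    final int batchIndex;

    PositionSketch(long ledgerId, long entryId, int batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.batchIndex = batchIndex;
    }

    @Override
    public int compareTo(PositionSketch other) {
        int c = Long.compare(ledgerId, other.ledgerId);
        if (c != 0) {
            return c;
        }
        c = Long.compare(entryId, other.entryId);
        if (c != 0) {
            return c;
        }
        return Integer.compare(batchIndex, other.batchIndex);
    }
}

/** Remembers the latest accepted position and rejects anything at or before it. */
final class LatestPositionFilterSketch {
    private PositionSketch latest; // null until the first message is accepted

    /** Returns true if the message is new, false if it was already seen. */
    synchronized boolean accept(PositionSketch id) {
        if (latest != null && id.compareTo(latest) <= 0) {
            return false;
        }
        latest = id;
        return true;
    }

    /** Has to be called before a deliberate rewind (seek, cursor reset), or replays are dropped. */
    synchronized void reset() {
        latest = null;
    }
}
```

The `reset()` method makes the caveat explicit: whoever rewinds the
subscription also has to clear the recorded position, whether the filter
lives in the SDK or in the application.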

Thanks,
Yunze


Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread Michael Marshall
Good questions. There were some gaps in my description above.

> I think it is the[0], when the `incomingMessages` queue is empty,
> how do we get the correct `startPosition`?

In this case, the consumer does not have the source of truth for the
readPosition. It would leave the new protocol field for `readPosition`
empty and the broker would use its source of truth for the read
position.

> why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> same logic? it's a bad code.

We don't need to synchronize this code here because the logic will
come after the consumer has been disconnected from broker a and before
it is connected to broker b.

> If we sub with `startMessageId`, we also should lock any enqueue
> logic, like [2] and check to consumer's current state

We would not need to lock here because we do not enqueue new messages
after we've been disconnected from the broker and before we've sent
CommandSubscribe.

The logic is that when a client sends CommandSubscribe, it will tell
the broker the last message id it received in the last session. The
broker will set the readPosition to that message id + 1.
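
As a rough illustration only, the broker-side rule could look like the
sketch below. `Position` and `ReadPositionResolver` are hypothetical
names made up for this example, not Pulsar classes, and "+ 1" is
simplified to the next entry in the same ledger.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for a message position; not the Pulsar API.
final class Position {
    final long ledgerId;
    final long entryId;

    Position(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    // "Message id + 1": the next entry in the same ledger (ledger rollover is ignored here).
    Position next() {
        return new Position(ledgerId, entryId + 1);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Position)) {
            return false;
        }
        Position p = (Position) o;
        return ledgerId == p.ledgerId && entryId == p.entryId;
    }

    @Override
    public int hashCode() {
        return Long.hashCode(ledgerId) * 31 + Long.hashCode(entryId);
    }
}

final class ReadPositionResolver {
    // lastReceived is empty when the consumer has no source of truth,
    // in which case the broker keeps its own read position.
    static Position resolve(Optional<Position> lastReceived, Position brokerReadPosition) {
        return lastReceived.map(Position::next).orElse(brokerReadPosition);
    }
}
```

The empty case corresponds to the consumer leaving the field unset, so
the broker falls back to its own read position.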

> We can't promise that every consumer can receive the broker reset
> cursor request.

This is a valid concern. In the normal case where we do not drop the
connection, we trivially guarantee the client receives the protocol
command before receiving the next messages because TCP connections
guarantee order. Interestingly, the current design disconnects
consumers when we reset the cursor, but if we add a protocol message
to tell consumers what is happening, we can avoid this unnecessary
disconnection.

In the case where we drop a connection, there are two options.

1. We could implement some kind of reconciliation logic using a vector clock.
2. We could say that in the network failure scenario, the consumer
should not tell the broker its readPosition because it might have
missed an event on the broker. In that case, we would accept that some
duplicates will happen and we can "let it fail". The advantage to
letting it fail is that we have simpler code.

I think 2 would be the right initial implementation, and we could
implement 1 if we find 2 is a problem.
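
To make option 2 concrete, here is a minimal sketch under my own
assumptions. `SubscribeStartPolicy` and its parameters are hypothetical
and only illustrate the decision, not any existing client code.

```java
import java.util.OptionalLong;

final class SubscribeStartPolicy {
    // Hypothetical helper: decides what a reconnecting consumer reports to the broker.
    // After a dropped connection the consumer may have missed a broker-side event
    // (for example a cursor reset), so it reports nothing and accepts duplicates.
    static OptionalLong reportedLastReceivedId(boolean connectionDropped, OptionalLong lastReceivedId) {
        return connectionDropped ? OptionalLong.empty() : lastReceivedId;
    }
}
```

With an empty result the broker would use its own read position, which
is the "let it fail" path.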

My biggest concern with the current proposal is that cursor resets
will not work correctly when this feature is used. If an administrator
resets a cursor to earliest and a client is using this setting, it
will filter out every message. That could be an expensive and
confusing mistake. I think we need to find a better solution than
simple filtering.
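
Here is a small sketch of the failure mode I have in mind. It assumes
the filter simply drops any message whose id is not greater than the
last one handed to the application. `ReceivedMessageFilter` and the
plain `long` ids are hypothetical simplifications, not the PIP's actual
implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical client-side filter: drops any message whose id is not greater
// than the last id already handed to the application.
final class ReceivedMessageFilter {
    private long lastDeliveredId = -1;

    // Returns the ids that would actually reach the application.
    List<Long> deliver(List<Long> incoming) {
        List<Long> accepted = new ArrayList<>();
        for (long id : incoming) {
            if (id > lastDeliveredId) {
                lastDeliveredId = id;
                accepted.add(id);
            }
        }
        return accepted;
    }
}
```

If the application has consumed ids 0 through 3 and the cursor is then
reset to earliest, the broker replays 0 through 3 and this filter
returns an empty list for the whole replay.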

Ultimately, I think a protocol solution will yield better results,
especially since we'll want to implement this feature in the other
client languages.

Thanks,
Michael

On Tue, Mar 21, 2023 at 9:29 PM 丛搏  wrote:
>
> Hi, Michael:
>
> On Tue, Mar 21, 2023 at 11:17 PM Michael Marshall wrote:
>
> >
> > One more point. Instead of keeping track of the latest message seen by
> > the application, the logic in my solution would actually just check
> > the last message in the `incomingMessages` queue (as in the most
> > recently added), and use that as the read position in the subscribe
> > command. If we made this change, we would have to change this code [0]
> > to not drop the `incomingMessages` queue.
>
> case 1:
> What we define the message that the application has seen?
> I think it is the[0], when the `incomingMessages` queue is empty,
> how do we get the correct `startPosition`?
> What I think we should lock the receive logic in [1]
> ```
> synchronized (this) {
> message = incomingMessages.take();
> messageProcessed(message);
> }
> ```
> why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> same logic? it's a bad code.
>
> case 2:
> If we sub with `startMessageId`, we also should lock any enqueue
> logic, like [2] and
> check to consumer's current state
> ```
> synchronized (this) {
>     if (consumer.isConnected) {
>         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
>             // After we have enqueued the messages on `incomingMessages` queue, we cannot touch
>             // the message instance anymore, since for pooled messages, this instance was
>             // possibly already been released and recycled.
>             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
>             getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
>             updateAutoScaleReceiverQueueHint();
>         }
>     }
> }
> ```
> case 3:
> when we subcommand sends to broker with `startMessageId = 1`, then the
> broker push message
> has not yet entered `incommingQueue`, the application invokes
> redeliver. in this way, we don't
> filter messages are correct, right?
>
> These are some cases that I simply thought of, and there must be
> others that I haven't thought
> of. Are you sure we can handle these problems correctly?
>
> > The problem of "the consumer doesn't know" seems like something that
> > is reason

Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread 丛搏
Hi, Yunze:

It is true that hiding these details in the SDK still requires users to have a
certain level of understanding of the configuration. However, this approach
can still be helpful for users who want to use the feature but want
to avoid dealing with the nitty-gritty details of implementation.

At least it will simplify the process of using cumulative ack with the
transaction.

Thanks,
Bo

On Wed, Mar 22, 2023 at 10:32 AM Yunze Xu wrote:
>
> I just missed the point that the reset cursor operations do not work
> for the consumer. IIUC, the seek operation does not work as well. Then
> I think the option is not user-friendly as the PIP says:
>
> >  It needs to be enabled with a complete understanding of this configuration.
>
> If users want, they can also record the latest position for each
> consumer at the application side and filter the messages by the public
> `MessageId#compareTo` API. If hiding these details in SDK still
> requires users to know these details, I think it would not be better
> than doing that explicitly in the application.
>
> Thanks,
> Yunze
>
> On Wed, Mar 22, 2023 at 10:29 AM 丛搏  wrote:
> >
> > Hi, Michael:
> >
> > On Tue, Mar 21, 2023 at 11:17 PM Michael Marshall wrote:
> >
> > >
> > > One more point. Instead of keeping track of the latest message seen by
> > > the application, the logic in my solution would actually just check
> > > the last message in the `incomingMessages` queue (as in the most
> > > recently added), and use that as the read position in the subscribe
> > > command. If we made this change, we would have to change this code [0]
> > > to not drop the `incomingMessages` queue.
> >
> > case 1:
> > What we define the message that the application has seen?
> > I think it is the[0], when the `incomingMessages` queue is empty,
> > how do we get the correct `startPosition`?
> > What I think we should lock the receive logic in [1]
> > ```
> > synchronized (this) {
> > message = incomingMessages.take();
> > messageProcessed(message);
> > }
> > ```
> > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > same logic? it's a bad code.
> >
> > case 2:
> > If we sub with `startMessageId`, we also should lock any enqueue
> > logic, like [2] and
> > check to consumer's current state
> > ```
> > synchronized (this) {
> >     if (consumer.isConnected) {
> >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> >             // After we have enqueued the messages on `incomingMessages` queue, we cannot touch
> >             // the message instance anymore, since for pooled messages, this instance was
> >             // possibly already been released and recycled.
> >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> >             getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
> >             updateAutoScaleReceiverQueueHint();
> >         }
> >     }
> > }
> > ```
> > case 3:
> > when we subcommand sends to broker with `startMessageId = 1`, then the
> > broker push message
> > has not yet entered `incommingQueue`, the application invokes
> > redeliver. in this way, we don't
> > filter messages are correct, right?
> >
> > These are some cases that I simply thought of, and there must be
> > others that I haven't thought
> > of. Are you sure we can handle these problems correctly?
> >
> > > The problem of "the consumer doesn't know" seems like something that
> > > is reasonably within the protocol's responsibilities. In this case, an
> > > event happens on the broker, and the broker can tell the consumer.
> >
> > I don't think a simple change protocol can solve these problems,
> > We can't promise that every consumer can receive the broker reset
> > cursor request.
> > When the consumer reconnects, the broker can't send the reset cursor 
> > request to
> > the client consumers, right? In this case, the consumer is still unaware, 
> > right?
> >
> >
> > [0] 
> > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L135
> > [1] 
> > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L440-L454
> > [2] 
> > https://github.com/apache/pulsar/blob/30d2469086fea989ac8baf059df8e69c66a68d89/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L875-L892
> > >
> > > Thanks,
> > > Michael
> > >
> > > [0] 
> > > https://github.com/apache/pulsar/blob/af1360fb167c1f9484fda5771df3ea9b21d1440b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L789-L795
> > >
> > > On Tue, Mar 21, 2023 at 9:46 AM Michael Marshall  
> > > wrote:
> > > >
> > > > > if we add the new field in CommandSubscribe, we should ensure
> > > > > the synchronization between consumer reconnection and user
> > > > > calling receive and redeliverUnack method. it will affect the 
> > 

Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread 丛搏
Hi, Michael
> In this case, the consumer does not have the source of truth for the
> readPosition. It would leave the new protocol field for `readPosition`
> empty and the broker would use its source of truth for the read
> position.
But suppose the application thread has already received all the
messages. We would still need a correct `startPosition`, right? With
your approach, we would treat the consumer as if it had not received
any messages.

>
> > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > same logic? it's a bad code.
>
> We don't need to synchronize this code here because the logic will
> come after the consumer has been disconnected from broker a and before
> it is connected to broker b.
If the application takes a message from the queue and the consumer then
reconnects, can the CommandSubscribe use the right start position? For
example:
1. The application receives one message with `MessageId = 1`.
2. The consumer reconnects and finds the queue empty, and the
lastDequeMessageId doesn't change.
3. The consumer sends a CommandSubscribe with MessageId.earliest, so
`MessageId = 1` will be redelivered from the broker to the client
consumer, right?

As the example shows, the application can receive `MessageId = 1`
again, right?
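
The three steps above can be sketched roughly as follows. This is only
a toy model under my own assumptions: `QueueBasedStartPosition` is a
hypothetical class, message ids are plain `long` values, and the start
position is derived only from the most recently enqueued message, as in
the proposal I am questioning.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.OptionalLong;

// Toy model of a consumer that derives its start position only from the
// receiver queue. Hypothetical; not the real ConsumerImpl.
final class QueueBasedStartPosition {
    private final Deque<Long> incomingMessages = new ArrayDeque<>();

    // Broker pushes a message into the receiver queue.
    void enqueue(long messageId) {
        incomingMessages.addLast(messageId);
    }

    // Application thread takes the oldest message from the queue.
    long receive() {
        return incomingMessages.removeFirst();
    }

    // On reconnect, use the most recently enqueued message as the start position.
    // An empty queue yields no position at all.
    OptionalLong startPositionOnReconnect() {
        Long last = incomingMessages.peekLast();
        return last == null ? OptionalLong.empty() : OptionalLong.of(last);
    }
}
```

After `enqueue(1)` and `receive()`, the queue is empty and
`startPositionOnReconnect()` is empty too, so nothing stops the broker
from sending `MessageId = 1` again.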
> We would not need to lock here because we do not enqueue new messages
> after we've been disconnected from the broker and before we've sent
> CommandSubscribe.
As the code at [0] shows, the thread has changed.
Where do we guarantee that no new messages will come in?

>
> Ultimately, I think a protocol solution will yield better results,
> especially since we'll want to implement this feature in the other
> client languages.
The problem of the resetting cursor can be optimized in the future,
but can you ensure correctness in all the cases I mentioned above?
IMO, my design is a client-only change, so the broker does not need
any changes. It's simple and easy to implement, and I can make sure
it's completely correct. In your design, I currently do not see a
closed-loop implementation that is achievable, at least in the Java
client.

Thanks,
Bo
>
> Thanks,
> Michael
>
> On Tue, Mar 21, 2023 at 9:29 PM 丛搏  wrote:
> >
> > Hi, Michael:
> >
> > On Tue, Mar 21, 2023 at 11:17 PM Michael Marshall wrote:
> >
> > >
> > > One more point. Instead of keeping track of the latest message seen by
> > > the application, the logic in my solution would actually just check
> > > the last message in the `incomingMessages` queue (as in the most
> > > recently added), and use that as the read position in the subscribe
> > > command. If we made this change, we would have to change this code [0]
> > > to not drop the `incomingMessages` queue.
> >
> > case 1:
> > What we define the message that the application has seen?
> > I think it is the[0], when the `incomingMessages` queue is empty,
> > how do we get the correct `startPosition`?
> > What I think we should lock the receive logic in [1]
> > ```
> > synchronized (this) {
> > message = incomingMessages.take();
> > messageProcessed(message);
> > }
> > ```
> > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > same logic? it's a bad code.
> >
> > case 2:
> > If we sub with `startMessageId`, we also should lock any enqueue
> > logic, like [2] and
> > check to consumer's current state
> > ```
> > synchronized (this) {
> >     if (consumer.isConnected) {
> >         if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
> >             // After we have enqueued the messages on `incomingMessages` queue, we cannot touch
> >             // the message instance anymore, since for pooled messages, this instance was
> >             // possibly already been released and recycled.
> >             INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> >             getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
> >             updateAutoScaleReceiverQueueHint();
> >         }
> >     }
> > }
> > ```
> > case 3:
> > when we subcommand sends to broker with `startMessageId = 1`, then the
> > broker push message
> > has not yet entered `incommingQueue`, the application invokes
> > redeliver. in this way, we don't
> > filter messages are correct, right?
> >
> > These are some cases that I simply thought of, and there must be
> > others that I haven't thought
> > of. Are you sure we can handle these problems correctly?
> >
> > > The problem of "the consumer doesn't know" seems like something that
> > > is reasonably within the protocol's responsibilities. In this case, an
> > > event happens on the broker, and the broker can tell the consumer.
> >
> > I don't think a simple change protocol can solve these problems,
> > We can't promise that every consumer can receive the broker reset
> > cursor request.
> > When the consumer reconnects, the broker can't send the reset cursor 
> > request to
> > the client consumers, right? In this case, the consumer is still unaware, 
> > right?
> >

Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread Michael Marshall
I am not following your objections to the protocol solution. It might
be more productive if I provided a draft PR with a sample
implementation. I'm not sure that I'll have time, but I'll try to put
something together this week.

> At least it will simplify the process of using cumulative ack with the
> transaction.

Is this the underlying motivation for the PIP?

From my perspective, the PIP is seeking to decrease duplicate messages
experienced due to disconnections from the broker.

> The problem of the resetting cursor can be optimized in the future

Why should we push off solving this problem? It seems fundamental to
this PIP and should not be ignored. At the very least, I think we need
to have an idea of what the future solution would be before we defer
its implementation.

Thanks,
Michael


On Tue, Mar 21, 2023 at 10:52 PM 丛搏  wrote:
>
> Hi, Michael
> > In this case, the consumer does not have the source of truth for the
> > readPosition. It would leave the new protocol field for `readPosition`
> > empty and the broker would use its source of truth for the read
> > position.
> application has received all the messages by application thread. we also need 
> a
> correct `startPosition`, right? but in your way, we will think about
> the consumer
> hasn't received any messages.
>
> >
> > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > same logic? it's a bad code.
> >
> > We don't need to synchronize this code here because the logic will
> > come after the consumer has been disconnected from broker a and before
> > it is connected to broker b.
> The application takes a message from the queue then reconnect,
> the SubCommond can use the right startPostion? example:
> 1. application receives one message with `MessageId = 1`
> 2. consumer reconnect discovers the queue is empty, and the
> lastDequeMessageId doesn't change.
> 3. consumer sends a subcommand with MessageId.earliest, the `MessageId = 1`
> will redeliver from broker to client consumer, right?
>
> As we can see in the example, the application also can receive
> `MessageId = 1`, right?
> > We would not need to lock here because we do not enqueue new messages
> > after we've been disconnected from the broker and before we've sent
> > CommandSubscribe.
> we can see the code [0], the thread has changed.
> Where do we guarantee that no new messages will come in?
>
> >
> > Ultimately, I think a protocol solution will yield better results,
> > especially since we'll want to implement this feature in the other
> > client languages.
> The problem of the resetting cursor can be optimized in the future,
> but can you ensure the
> correctness of all the cases I mentioned above? IMO, if we use my
> design, client change,
> we don't need the broker to make any changes. its simple and it's easy
> to implement.
> I can make sure it's completely correct, I can make sure it's
> completely correct. In your design,
> I currently do not see a closed-loop implementation that can achieve
> at least in the java client.
>
> Thanks,
> Bo
> >
> > Thanks,
> > Michael
> >
> > On Tue, Mar 21, 2023 at 9:29 PM 丛搏  wrote:
> > >
> > > Hi, Michael:
> > >
> > > On Tue, Mar 21, 2023 at 11:17 PM Michael Marshall wrote:
> > >
> > > >
> > > > One more point. Instead of keeping track of the latest message seen by
> > > > the application, the logic in my solution would actually just check
> > > > the last message in the `incomingMessages` queue (as in the most
> > > > recently added), and use that as the read position in the subscribe
> > > > command. If we made this change, we would have to change this code [0]
> > > > to not drop the `incomingMessages` queue.
> > >
> > > case 1:
> > > What we define the message that the application has seen?
> > > I think it is the[0], when the `incomingMessages` queue is empty,
> > > how do we get the correct `startPosition`?
> > > What I think we should lock the receive logic in [1]
> > > ```
> > > synchronized (this) {
> > > message = incomingMessages.take();
> > > messageProcessed(message);
> > > }
> > > ```
> > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > same logic? it's a bad code.
> > >
> > > case 2:
> > > If we sub with `startMessageId`, we also should lock any enqueue
> > > logic, like [2] and
> > > check to consumer's current state
> > > ```
> > > synchronized (this) {
> > > if (consumer.isConnected) {
> > > if (canEnqueueMessage(message) && 
> > > incomingMessages.offer(message)) {
> > > // After we have enqueued the messages on
> > > `incomingMessages` queue, we cannot touch the message
> > > // instance anymore, since for pooled messages, this
> > > instance was possibly already been released
> > > // and recycled.
> > > INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
> > > getMemoryLimitController().ifPresent(limiter ->
> > > limiter.forceReserveMemory(messageSize));
> > > updateAutoSca

Re: [DISCUSS] PIP-259: Make the config httpMaxRequestHeaderSize of the pulsar web server to configurable

2023-03-21 Thread guo jiwei
+1


Regards
Jiwei Guo (Tboy)

On Tue, Mar 21, 2023 at 4:21 PM Zike Yang  wrote:
>
> > To keep the goal clear, we can only set `httpClientRequestBufferSize`
> > internally without giving the user a way to change it.
>
> +1.
>
> Thanks,
> Zike Yang
>
> On Tue, Mar 21, 2023 at 4:08 PM Yubiao Feng
>  wrote:
> >
> > Hi Zike
> >
> > > Thanks for your explanation. So from my understanding, there are
> > > actually two different goals in this PIP:
> > > * Exposing `httpMaxRequestHeaderSize` to solve the long name issue:
> > > https://lists.apache.org/thread/q1m23ckyy10wvtzy65v8bwqwnh7r0gc8
> > > * Exposing `httpClientRequestBufferSize` to help users improve the
> > > forwarding performance.
> >
> > To keep the goal clear, we can only set `httpClientRequestBufferSize`
> > internally without giving the user a way to change it.
> >
> >
> > Thanks
> > Yubiao Feng
> >
> >
> > On Tue, Mar 21, 2023 at 11:16 AM Zike Yang  wrote:
> >
> > > Hi, Yubiao
> > >
> > > Thanks for your explanation. So from my understanding, there are
> > > actually two different goals in this PIP:
> > > * Exposing `httpMaxRequestHeaderSize` to solve the long name issue:
> > > https://lists.apache.org/thread/q1m23ckyy10wvtzy65v8bwqwnh7r0gc8
> > > * Exposing `httpClientRequestBufferSize` to help users improve the
> > > forwarding performance.
> > >
> > > I'm +1 if my understanding is correct.
> > >
> > > Thanks,
> > > Zike Yang
> > >
> > > On Mon, Mar 20, 2023 at 10:07 PM Yubiao Feng
> > >  wrote:
> > > >
> > > > Hi Zike
> > > >
> > > > > Is it worth exposing `httpClientRequestBufferSize` to the proxy user?
> > > > > Seems exposing `httpMaxRequestHeaderSize ` is enough to solve this
> > > > > issue. We can set the buffer size in the proxy code based on the value
> > > > > of `httpMaxRequestHeaderSize `. Is there any case that we need to
> > > > > expose `httpClientRequestBufferSize`?
> > > >
> > > > 1. These two configurations work on two very different components: The
> > > > config "httpMaxRequestHeaderSize" is used for the internal server of
> > > > Pulsar-Proxy, and the config "httpClientRequestBufferSize" is used for
> > > the
> > > > internal HTTP client of the Pulsar-Proxy.
> > > >
> > > > 2. Users can modify the configure `httpClientRequestBufferSize` to 
> > > > adjust
> > > > the buffer of proxy forwarding requests to improve forwarding
> > > performance.
> > > >
> > > > So it's better to have two separate configurations
> > > >
> > > > Thanks
> > > > Yubiao Feng
> > > >
> > > >
> > > >
> > > > On Mon, Mar 20, 2023 at 6:36 PM Zike Yang  wrote:
> > > >
> > > > > > private int httpClientRequestBufferSize =
> > > httpMaxRequestHeaderSize;
> > > > >
> > > > > Is it worth exposing `httpClientRequestBufferSize` to the proxy user?
> > > > > Seems exposing `httpMaxRequestHeaderSize ` is enough to solve this
> > > > > issue. We can set the buffer size in the proxy code based on the value
> > > > > of `httpMaxRequestHeaderSize `. Is there any case that we need to
> > > > > expose `httpClientRequestBufferSize`?
> > > > >
> > > > > Thanks,
> > > > > Zike Yang
> > > > >
> > > > > On Fri, Mar 17, 2023 at 7:31 PM 丛搏  wrote:
> > > > > >
> > > > > > +1
> > > > > >
> > > > > > Hi, Yubiao :
> > > > > >
> > > > > > Thanks for your explanation. That makes sense to me.
> > > > > >
> > > > > > Thanks,
> > > > > > Bo
> > > > > >
> > > > > >
> > > > > > > On Fri, Mar 17, 2023 at 4:29 PM Yubiao Feng wrote:
> > > > > > >
> > > > > > > Hi Bo
> > > > > > >
> > > > > > > > I have a question, why we need `httpClientRequestBufferSize ` in
> > > > > > > > proxy, can you explain in detail?
> > > > > > >
> > > > > > > Since The Pulsar-Proxy uses the tool `jetty-client` to forward 
> > > > > > > HTTP
> > > > > > > requests from users to The Pulsar-Broker, if the proxy receives a
> > > > > request
> > > > > > > like this:
> > > > > > >
> > > > > > > ```
> > > > > > > GET /admin/v2/public/default/tp.long..long/stats
> > > HTTP/1.1
> > > > > > > ```
> > > > > > >
> > > > > > > The internal client with forward this request like this:
> > > > > > >
> > > > > > > ```
> > > > > > > ByteBuf buf = allocate( config.httpClientRequestBufferSize )
> > > > > > > buf.write(requestLine);  // (Highlight) we will get
> > > > > > > a BufferOverflowException if the request line is too long.
> > > > > > > ```
> > > > > > >
> > > > > > > Therefore, in addition to ensuring that the proxy server can
> > > receive a
> > > > > long
> > > > > > > request line, the internal client must also process a long request
> > > > > line.
> > > > > > > And this problem can be solved by making configuration
> > > > > > > `httpClientRequestBufferSize` configurable.
> > > > > > >
> > > > > > >
> > > > > > > Thanks
> > > > > > > Yubiao Feng
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Mar 16, 2023 at 8:12 PM 丛搏  wrote:
> > > > > > >
> > > > > > > > hi yubiao :
> > > > > > > >
> > > > > > > > I have a question, why we need `httpClientRequestBufferSize ` in
> > > > > > > > proxy, can you explain in detail?
> 

Re: [VOTE] PIP-259: Make the config httpMaxRequestHeaderSize of the pulsar web server to configurable

2023-03-21 Thread PengHui Li
+1(binding)

Thanks,
Penghui

On Tue, Mar 21, 2023 at 3:21 PM Yunze Xu 
wrote:

> +1 (binding)
>
> Thanks,
> Yunze
>
> On Tue, Mar 21, 2023 at 2:59 PM 丛搏  wrote:
> >
> > +1 (binding)
> >
> > Thanks,
> > Bo
> >
> > On Mon, Mar 20, 2023 at 5:11 PM Yubiao Feng wrote:
> > >
> > > Hi Pulsar Community
> > >
> > > This thread is to start the vote for PIP 259.
> > >
> > > Discussion: https://lists.apache.org/thread/f11cld5cbc8sodhgvs5s28lw8nxsr9dc
> > > Issue: https://github.com/apache/pulsar/issues/19826
> > > Implementation: https://github.com/apache/pulsar/pull/19514
> > >
> > >
> > > Voting will stay open for at least 48h.
> > >
> > > Thanks,
> > > Yubiao Feng
>


Re: [DISCUSS] PIP-260: Client consumer filter received messages

2023-03-21 Thread Michael Marshall
Because we already send the `startMessageId`, there is a chance that
we might not need to update the protocol for CommandSubscribe at all.
In light of that, I quickly put together a PR showing how that field
might be used to inform the broker where to start the read position
for the cursor.

https://github.com/apache/pulsar/pull/19892

The PR is not complete, but it does convey the general idea. I wrote
additional details in the draft's description.

Thanks,
Michael

On Tue, Mar 21, 2023 at 11:31 PM Michael Marshall  wrote:
>
> I am not following your objections to the protocol solution. It might
> be more productive if I provided a draft PR with a sample
> implementation. I'm not sure that I'll have time, but I'll try to put
> something together this week.
>
> > At least it will simplify the process of using cumulative ack with the
> > transaction.
>
> Is this the underlying motivation for the PIP?
>
> From my perspective, the PIP is seeking to decrease duplicate messages
> experienced due to disconnections from the broker.
>
> > The problem of the resetting cursor can be optimized in the future
>
> Why should we push off solving this problem? It seems fundamental to
> this PIP and should not be ignored. At the very least, I think we need
> to have an idea of what the future solution would be before we defer
> its implementation.
>
> Thanks,
> Michael
>
>
> On Tue, Mar 21, 2023 at 10:52 PM 丛搏  wrote:
> >
> > Hi, Michael
> > > In this case, the consumer does not have the source of truth for the
> > > readPosition. It would leave the new protocol field for `readPosition`
> > > empty and the broker would use its source of truth for the read
> > > position.
> > application has received all the messages by application thread. we also 
> > need a
> > correct `startPosition`, right? but in your way, we will think about
> > the consumer
> > hasn't received any messages.
> >
> > >
> > > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > same logic? it's a bad code.
> > >
> > > We don't need to synchronize this code here because the logic will
> > > come after the consumer has been disconnected from broker a and before
> > > it is connected to broker b.
> > The application takes a message from the queue then reconnect,
> > the SubCommond can use the right startPostion? example:
> > 1. application receives one message with `MessageId = 1`
> > 2. consumer reconnect discovers the queue is empty, and the
> > lastDequeMessageId doesn't change.
> > 3. consumer sends a subcommand with MessageId.earliest, the `MessageId = 1`
> > will redeliver from broker to client consumer, right?
> >
> > As we can see in the example, the application also can receive
> > `MessageId = 1`, right?
> > > We would not need to lock here because we do not enqueue new messages
> > > after we've been disconnected from the broker and before we've sent
> > > CommandSubscribe.
> > we can see the code [0], the thread has changed.
> > Where do we guarantee that no new messages will come in?
> >
> > >
> > > Ultimately, I think a protocol solution will yield better results,
> > > especially since we'll want to implement this feature in the other
> > > client languages.
> > The problem of the resetting cursor can be optimized in the future,
> > but can you ensure the
> > correctness of all the cases I mentioned above? IMO, if we use my
> > design, client change,
> > we don't need the broker to make any changes. its simple and it's easy
> > to implement.
> > I can make sure it's completely correct, I can make sure it's
> > completely correct. In your design,
> > I currently do not see a closed-loop implementation that can achieve
> > at least in the java client.
> >
> > Thanks,
> > Bo
> > >
> > > Thanks,
> > > Michael
> > >
> > > On Tue, Mar 21, 2023 at 9:29 PM 丛搏  wrote:
> > > >
> > > > Hi, Michael:
> > > >
> > > > On Tue, Mar 21, 2023 at 11:17 PM Michael Marshall wrote:
> > > >
> > > > >
> > > > > One more point. Instead of keeping track of the latest message seen by
> > > > > the application, the logic in my solution would actually just check
> > > > > the last message in the `incomingMessages` queue (as in the most
> > > > > recently added), and use that as the read position in the subscribe
> > > > > command. If we made this change, we would have to change this code [0]
> > > > > to not drop the `incomingMessages` queue.
> > > >
> > > > case 1:
> > > > What we define the message that the application has seen?
> > > > I think it is the[0], when the `incomingMessages` queue is empty,
> > > > how do we get the correct `startPosition`?
> > > > What I think we should lock the receive logic in [1]
> > > > ```
> > > > synchronized (this) {
> > > > message = incomingMessages.take();
> > > > messageProcessed(message);
> > > > }
> > > > ```
> > > > why do we need to invoke `BlockingQueue.take` and `synchronized` in the
> > > > same logic? it's a bad code.
> > > >
> > > > case 2:
> > > > If we sub with `startMessageId`, we also should l