[ANNOUNCE] New Committer: Marvin Cai

2021-12-13 Thread linlin
The Apache Pulsar Project Management Committee (PMC) has invited Marvin Cai
https://github.com/MarvinCai to become a committer and we are pleased to
announce that he has accepted.

Marvin joined the community more than a year ago and has been active in the
Pulsar community for more than six months.

Welcome and Congratulations, Marvin!

Please join us in congratulating and welcoming Marvin onboard!

Best Regards,
Lin Lin on behalf of the Pulsar PMC


[VOTE] Apache Pulsar 2.8.2 candidate 1

2021-12-13 Thread linlin
This is the first release candidate for Apache Pulsar, version 2.8.2.

It fixes the following issues:
https://github.com/apache/pulsar/issues?q=label%3Acherry-picked%2Fbranch-2.8+is%3Aclosed+label%3Arelease%2F2.8.2

*** Please download, test and vote on this release. This vote will stay open
for at least 72 hours ***

Note that we are voting upon the source (tag); binaries are provided for
convenience.

Source and binary files:
https://dist.apache.org/repos/dist/dev/pulsar/pulsar-2.8.2-candidate-1/

SHA-512 checksums:

f51e93d5caa7ea4ec2616e096ca75dd71bccb475632ee5ff35d713b8f5112689d17315a1cd9350dd8f8f0bdc2e059be5fb179b2b8b3b39aae77e466103294683  apache-pulsar-2.8.2-bin.tar.gz

8540641e76fb541f9dbfaff263946ed19a585266e5de011e78188d78ec4e1c828e8893eb2e783a1ebad866f5513efffd93396b7abd77c347f34ab689badf4fad  apache-pulsar-2.8.2-src.tar.gz
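
To check a downloaded artifact against these checksums, one option is a
small standalone Java sketch (assumes Java 11+ for `Path.of`; the
platform's `shasum -a 512` tool works just as well):

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.MessageDigest;

public class VerifySha512 {
    public static void main(String[] args) throws Exception {
        // args[0]: downloaded artifact, args[1]: expected hex digest from this email
        byte[] data = Files.readAllBytes(Path.of(args[0]));
        byte[] digest = MessageDigest.getInstance("SHA-512").digest(data);
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) hex.append(String.format("%02x", b));
        System.out.println(hex.toString().equalsIgnoreCase(args[1])
                ? "checksum OK" : "checksum MISMATCH");
    }
}
```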


Maven staging repo:
https://repository.apache.org/content/repositories/orgapachepulsar-1108/

The tag to be voted upon:
v2.8.2-candidate-1
https://github.com/apache/pulsar/releases/tag/v2.8.2-candidate-1

Pulsar's KEYS file containing PGP keys we use to sign the release:
https://dist.apache.org/repos/dist/dev/pulsar/KEYS

Please download the source package, and follow the README to build
and run the Pulsar standalone service.

Lin Lin


[VOTE] Apache Pulsar 2.8.2 candidate 2

2021-12-20 Thread linlin
This is the second release candidate for Apache Pulsar, version 2.8.2.

It fixes the following issues:
https://github.com/apache/pulsar/issues?q=label%3Acherry-picked%2Fbranch-2.8+label%3Arelease%2F2.8.2+is%3Aclosed

*** Please download, test and vote on this release. This vote will stay open
for at least 72 hours ***

Note that we are voting upon the source (tag); binaries are provided for
convenience.

Source and binary files:
https://dist.apache.org/repos/dist/dev/pulsar/pulsar-2.8.2-candidate-2/

SHA-512 checksums:

59aa0a14188a766ce802ba30cbaa2584a1904d26d8f41f164d3f97ea1970aa1391e11755d8077c96aeb136d2b9d73bf8b768978de7fa7f71d69cb57e2c1fce8c  apache-pulsar-2.8.2-bin.tar.gz

82a1423fda4004297ca2867279077ed261c7149be96deca2c127ba5b91af08fec80dc4a8b15ee2ba8704e209efa577a0c7b4cfb78341d3a43a38bf476c894c5c  apache-pulsar-2.8.2-src.tar.gz

Maven staging repo:
https://repository.apache.org/content/repositories/orgapachepulsar-1129/

The tag to be voted upon:
v2.8.2-candidate-2 (4b9cadcd57e41bc8eb95cc9b9917f938365b1cca)
https://github.com/apache/pulsar/releases/tag/v2.8.2-candidate-2

Pulsar's KEYS file containing PGP keys we use to sign the release:
https://dist.apache.org/repos/dist/dev/pulsar/KEYS

Release notes draft:
https://github.com/apache/pulsar/pull/13400

Please download the source package, and follow the README to build
and run the Pulsar standalone service.


[ANNOUNCE] Apache Pulsar 2.8.2 released

2022-01-05 Thread linlin
The Apache Pulsar team is proud to announce Apache Pulsar version 2.8.2.

Pulsar is a highly scalable, low latency messaging platform running on
commodity hardware. It provides simple pub-sub semantics over topics,
guaranteed at-least-once delivery of messages, automatic cursor management
for subscribers, and cross-datacenter replication.

For Pulsar release details and downloads, visit:

https://pulsar.apache.org/download

Release Notes are at:
http://pulsar.apache.org/release-notes

We would like to thank the contributors that made the release possible.

Regards,

The Pulsar Team


[ANNOUNCE] New Committer: Aloys Zhang

2022-02-09 Thread linlin
The Apache Pulsar Project Management Committee (PMC) has invited Aloys Zhang
(https://github.com/aloyszhang) to become a committer and we are pleased to
announce that he has accepted.

Aloys Zhang joined the Pulsar community in June 2020 and has contributed
many commits to the community, including PIP-70, which brings lightweight
broker entry metadata to Pulsar, as well as some other pull requests:
https://github.com/apache/pulsar/pulls?q=is%3Apr+assignee%3Aaloyszhang+.

Welcome and Congratulations, Aloys Zhang! Please enjoy the journey as a
committer :)

Please join us in congratulating and welcoming Aloys Zhang onboard!

Best Regards,
Lin Lin on behalf of the Pulsar PMC


[PIP-78] Split the individual acknowledgments into multiple entries

2021-01-17 Thread linlin
Hi, community:
Recently we encountered some problems when using individual
acknowledgments, such as: when the number of acknowledgments is large,
entry writing fails; a large amount of cache causes OOM; etc.
So I drafted a PIP at
https://docs.google.com/document/d/1uQtyb8t6X04v2vrSrdGWLFkuCkBcGYZbqK8XsVJ4qkU/edit?usp=sharing
and any feedback is welcome.


Re: [PIP-78] Split the individual acknowledgments into multiple entries

2021-01-20 Thread linlin
Thank you very much, I will add these points to the document.

1. The persistence of acks is already asynchronous; not every ack is
persisted immediately, and the lost part will be re-consumed.
When the entry is written successfully but the marker fails to be written,
we consider the whole write a failure and continue to look forward until we
find a usable marker.
If the data found is from before the upgrade, we restore it according to
the existing old logic, since entries of the new type carry a mark.

2. We only copy the entry referenced in the marker; other entries will not
be copied. Copying them would be useless anyway.


On 2021/01/18 06:16:03, Sijie Guo wrote:
> Hi Lin,
>
> Thank you and Penghui for drafting this! We have seen a lot of pain points
> of `managedLedgerMaxUnackedRangesToPersist` when enabling delayed messages.
> Glad that you and Penghui are spending time on resolving this!
>
> Overall the proposal looks good. But I have a couple of questions about
> the proposal.
>
> 1. What happens if the broker fails to write the entry marker? For
> example, at t0, the broker flushes dirty pages and successfully writes an
> entry marker. At t1, the broker tries to flush dirty pages but fails to
> write the new entry marker. How can you recover the entry marker?
>
> 2. When a broker crashes and recovers the managed ledger, the cursor
> ledger is not writable anymore. Are you going to create a new cursor
> ledger and copy all the entries from the old cursor ledger to the new one?
>
> It would be good if you can clarify these two questions.
>
> - Sijie
>
> On Sun, Jan 17, 2021 at 9:48 PM linlin wrote:
>
> > Hi, community:
> > Recently we encountered some problems when using individual
> > acknowledgments, such as: when the number of acknowledgments is large,
> > entry writing fails; a large amount of cache causes OOM; etc.
> > So I drafted a PIP at
> > https://docs.google.com/document/d/1uQtyb8t6X04v2vrSrdGWLFkuCkBcGYZbqK8XsVJ4qkU/edit?usp=sharing
> > and any feedback is welcome.


Re: [PIP-78] Split the individual acknowledgments into multiple entries

2021-01-20 Thread linlin
We can look at ManagedCursorImpl.buildIndividualDeletedMessageRanges.

What is saved in the entry is not a BitSet but a sequence of MessageRanges,
each containing information such as the ledgerId and entryId. The BitSet
exists only in memory and is used to quickly determine whether a position
has already been acknowledged.
In addition, the position of each ack is stored in the
individualDeletedMessages queue. When persisting to the entry, the queue is
traversed, and the position information of each ack generates a
MessageRange.
A MessageRange contains a lowerEndpoint (ledgerId + entryId) and an
upperEndpoint (ledgerId + entryId): 4 longs, about 256 bits.

Assume a fairly extreme scenario: 300K messages where every other message
is unacknowledged, i.e. 150K positions stored in individualDeletedMessages.
150K * 256 / 8 / 1024 / 1024 ≈ 4.6MB.
Of course, there are also scenarios where a customer's acks span several
ledgers.
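
A quick standalone sanity check of that arithmetic (a sketch, not broker
code; 4 longs per MessageRange is the approximation used above):

```java
public class AckRangeMemoryEstimate {
    public static void main(String[] args) {
        long ranges = 150_000;      // 300K messages, every other one unacked
        long bitsPerRange = 4 * 64; // lower/upper endpoint, ledgerId+entryId each
        double megabytes = ranges * bitsPerRange / 8.0 / 1024 / 1024;
        System.out.printf("~%.1f MB of range data%n", megabytes); // ~4.6 MB
    }
}
```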


On 2021/01/20 00:38:47, Joe F wrote:
> I have a simpler question. Just storing the message-ids raw will fit ~300K
> entries in one ledger entry. With the bitmap changes, we can store a
> couple of million within one 5MB ledger entry. So can you tell us what
> numbers of unacked messages are creating a problem? What exactly are the
> issues you face, and at what numbers of unacked messages/memory use etc.?
>
> I have my own concerns about this proposal, but I would like to
> understand the problem first.
>
> Joe
>
> On Sun, Jan 17, 2021 at 10:16 PM Sijie Guo wrote:
>
> > Hi Lin,
> >
> > Thank you and Penghui for drafting this! We have seen a lot of pain
> > points of `managedLedgerMaxUnackedRangesToPersist` when enabling
> > delayed messages. Glad that you and Penghui are spending time on
> > resolving this!
> >
> > Overall the proposal looks good. But I have a couple of questions
> > about the proposal.
> >
> > 1. What happens if the broker fails to write the entry marker? For
> > example, at t0, the broker flushes dirty pages and successfully writes
> > an entry marker. At t1, the broker tries to flush dirty pages but
> > fails to write the new entry marker. How can you recover the entry
> > marker?
> >
> > 2. When a broker crashes and recovers the managed ledger, the cursor
> > ledger is not writable anymore. Are you going to create a new cursor
> > ledger and copy all the entries from the old cursor ledger to the new
> > one?
> >
> > It would be good if you can clarify these two questions.
> >
> > - Sijie
> >
> > On Sun, Jan 17, 2021 at 9:48 PM linlin wrote:
> >
> > > Hi, community:
> > > Recently we encountered some problems when using individual
> > > acknowledgments, such as: when the number of acknowledgments is
> > > large, entry writing fails; a large amount of cache causes OOM; etc.
> > > So I drafted a PIP at
> > > https://docs.google.com/document/d/1uQtyb8t6X04v2vrSrdGWLFkuCkBcGYZbqK8XsVJ4qkU/edit?usp=sharing
> > > and any feedback is welcome.


[PIP] PIP for the Topic policy across multiple clusters

2021-07-27 Thread linlin
Hi, All

I have prepared a PIP for the Topic policy across multiple clusters.
Please take a look and let me know what you think.

I would really appreciate it.

Best Regards,
Lin Lin

===
Topic policy across multiple clusters


- Status: Proposal
- Authors: Penghui Li, Chen Hang, Lin Lin
- Pull Request:
- Mailing List discussion:
- Release:

Motivation

When setting a topic policy in a geo-replicated cluster, some policies
should affect the whole geo-replicated cluster while others should only
affect the local cluster. So the proposal is to support both a global topic
policy and a local topic policy.

Approach

Currently, we use the TopicPolicies structure to store the topic policy for
a topic. An easy way to achieve different topic policies for multiple
clusters is to add a global flag to TopicPolicies. The Replicator will
replicate policies with the global flag to other clusters:

```
public class TopicPolicies {
    boolean isGlobal = false;
}
```

We previously cached only one local TopicPolicies object in memory. Now
there will be two: one local and one global.

After adding the global topic policy, the applied priority for a topic is:

1. Local cluster topic policy
2. Global topic policy
3. Namespace policy
4. Broker default configuration


When setting a global topic policy, we can use the `--global` option; it
should be:

```

bin/pulsar-admin topics set-retention -s 1G -t 1d --global my-topic

```

If the `--global` option is not added, the behavior is consistent with
before, and only the local policies are updated.

Topic policies are stored in the System Topic, so we can directly use the
Replicator to replicate data to other clusters. We need to add a new API to
the Replicator interface that sets a filter Function, and then add a
Function that filters out data with isGlobal = false (see the sketch below).
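
To make the filtering concrete, here is a minimal self-contained sketch of
the idea; `TopicPoliciesEvent` and `PolicyReplicator` are illustrative
stand-ins, not actual Pulsar classes:

```java
import java.util.function.Function;

public class GlobalPolicyFilterSketch {

    // Stand-in for the TopicPolicies event carried on the system topic.
    static class TopicPoliciesEvent {
        final boolean isGlobal;
        TopicPoliciesEvent(boolean isGlobal) { this.isGlobal = isGlobal; }
    }

    // Stand-in for the proposed Replicator hook from the Changes section.
    interface PolicyReplicator {
        void setFilterFunction(Function<TopicPoliciesEvent, Boolean> filter);
    }

    public static void main(String[] args) {
        PolicyReplicator replicator = filter -> {
            // A real broker would consult the filter per replicated message;
            // here we only demonstrate its behavior.
            System.out.println("global kept: " + filter.apply(new TopicPoliciesEvent(true)));
            System.out.println("local kept:  " + filter.apply(new TopicPoliciesEvent(false)));
        };
        // Replicate only policies flagged as global; local policies stay local.
        replicator.setFilterFunction(event -> event.isGlobal);
    }
}
```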

Delete a cluster?

Deleting a cluster will now delete only the local topic policies and will
not affect other clusters, because only global policies are replicated to
other clusters.
Changes

- Every topic policy API adds the `--global` option, including the broker
  REST API, Admin SDK, and CMD.
- Add API `public void setFilterFunction(Function)`. If the Function
  returns false, the input message is filtered out.


Compatibility

The solution does not introduce any compatibility issues; `isGlobal` in
TopicPolicies is false by default in existing policies.

Test Plan

1. Existing TopicPolicies will not be affected.
2. Only global topic policies are replicated; the FilterFunction runs as
   expected.
3. The priority of topic policies matches our settings.




[Discuss] Optimize the performance of creating Topic

2021-07-29 Thread linlin
Creating a topic first checks whether the topic already exists.
The verification reads all topics under the namespace and then traverses
them to see whether the topic already exists.
When there are a large number of topics under the namespace (about 300,000
topics), fewer than 10 topics can be created per second.

Without a distributed lock, this check is unreliable and costly.
I tried to delete this check and write to ZooKeeper directly; if the znode
already exists, the topic already exists.

Then I found this scenario in the unit tests:
the user already has a non-partitioned topic like `topic-name-partition-123`
and then wants to create a partitioned topic like `topic-name`.
It cannot be created successfully now, because when traversing all topics,
prefix matching is also performed.

In order to solve this problem, I want to add a reserved-word check to the
topic creation interface, so that a topic name is not allowed to contain
`-partition-`, but this may cause some compatibility problems.
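
For illustration, the check I have in mind would look roughly like this (a
sketch, not the actual patch):

```java
public class TopicNameCheck {

    // Reject names containing the reserved word "-partition-".
    static void validate(String topic) {
        if (topic.contains("-partition-")) {
            throw new IllegalArgumentException(
                    "topic name must not contain the reserved word '-partition-': " + topic);
        }
    }

    public static void main(String[] args) {
        validate("topic-name"); // passes
        try {
            validate("topic-name-partition-123");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```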

I want to hear your opinions. Is there a better way?


[Proposal] Make Dispatcher pluggable

2021-09-07 Thread linlin
# Motivation

There are many users who need tag messages. The implementation of this part
has also been discussed before:

https://lists.apache.org/list.html?*@pulsar.apache.org:lte=2y:Proposal%20for%20Consumer%20Filtering%20in%20Pulsar%20brokers

I suggest making the Dispatcher pluggable, so that users can not only
implement tag messages but also customize their own delayed messages and
other features.

A pluggable Dispatcher has no compatibility impact on existing brokers.

# Modifications

1. Add a configuration item `dispatcherProviderClassName`; the creation of
   the existing Dispatcher will be moved to DefaultDispatcherProvider, which
   serves as the default class.

2. Add a new interface, DispatcherProvider:


```
public interface DispatcherProvider {

    // Use `DispatcherProvider` to create a `Dispatcher`
    Dispatcher createDispatcher(Consumer consumer, Subscription subscription);

    // According to `dispatcherProviderClassName`, create the provider
    // through reflection
    static DispatcherProvider createDispatcherProvider(ServiceConfiguration serviceConfiguration) {
        ...
    }
}
```

We can get the BrokerService from the Subscription, and the PulsarService
from the BrokerService, so the parameters of this interface should be
sufficient.

DispatcherProvider will be created when the subscription is initialized:

```

DispatcherProvider.createDispatcherProvider(config);

```

When a Consumer subscribes, we will create a Dispatcher:

```

dispatcher = dispatcherProvider.createDispatcher(consumer, this);

```
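
For reference, the reflection-based loading could look roughly like this (a
self-contained sketch with stand-in types, not the broker's actual classes):

```java
import java.lang.reflect.Constructor;

public class ProviderLoaderSketch {
    interface Dispatcher {}
    interface DispatcherProvider {
        Dispatcher createDispatcher(Object consumer, Object subscription);
    }

    // Stand-in for the proposed default provider.
    public static class DefaultDispatcherProvider implements DispatcherProvider {
        public Dispatcher createDispatcher(Object consumer, Object subscription) {
            return new Dispatcher() {};
        }
    }

    // Instantiate the configured provider class via its no-arg constructor.
    static DispatcherProvider load(String dispatcherProviderClassName) throws Exception {
        Class<?> clazz = Class.forName(dispatcherProviderClassName);
        Constructor<?> ctor = clazz.getDeclaredConstructor();
        return (DispatcherProvider) ctor.newInstance();
    }

    public static void main(String[] args) throws Exception {
        DispatcherProvider provider =
                load(ProviderLoaderSketch.class.getName() + "$DefaultDispatcherProvider");
        System.out.println(provider.createDispatcher(null, null) != null);
    }
}
```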

I opened a new PR:

https://github.com/apache/pulsar/pull/11948


[VOTE] PIP - Support pluggable entry filter in Dispatcher

2021-10-12 Thread linlin
Hi Pulsar Community,

I would like to start a VOTE for PIP - Support pluggable entry filter in
Dispatcher.

The issue for this PIP is here:
https://github.com/apache/pulsar/issues/12269

Please VOTE within 72 hours.

Thanks,
Lin Lin


[DISCUSS] Apache Pulsar 2.8.2 Release

2021-10-27 Thread linlin
Hi all,

I hope you've all been doing well. It has been more than one month since we
released Apache Pulsar 2.8.1, and we have a lot of fixes already merged, so
I would like to start preparing our next patch release in the next few
days.

You can find the whole change list of the 2.8.2 release from:
https://github.com/apache/pulsar/pulls?q=is%3Aopen+is%3Apr+label%3Arelease%2F2.8.2

If I missed some features, please let me know. And if some PRs can't be
completed in a few days, the owner can help check if we should include them
in 2.8.2 or push them to the next release version.

Thanks,
Lin Lin


[Discuss] Let the client perceive the rate limit

2021-11-01 Thread linlin
Hi all:
Currently, the broker limits the rate of producing messages by setting
`channel.setAutoRead(false)`. But no exception is returned to the client
and no log is printed, which makes it very difficult to troubleshoot the
problem.

The client times out when sending messages and the connection is closed
(the client's heartbeat is not processed by the broker). This has brought a
lot of obstacles to our troubleshooting: we don't know whether there is a
problem with the broker or the rate is limited.

I suggest that before setting autoRead=false, at least one exception should
be returned to the client so that the user can tell that it has been
throttled, or at least a log should be printed on the broker side. Note
that returning an exception would change the existing behavior.
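
For reference, the mechanism in question is just Netty's autoRead flag;
here is a runnable sketch of the "log before pausing" idea, using Netty's
EmbeddedChannel as a stand-in for the real connection:

```java
import io.netty.channel.embedded.EmbeddedChannel;

public class ThrottleVisibilitySketch {
    public static void main(String[] args) {
        EmbeddedChannel channel = new EmbeddedChannel();
        // Suggestion: surface the event first, e.g. log it (or send the
        // client an error), so operators can tell throttling from a fault.
        System.out.println("producer throttled, pausing reads on " + channel.id());
        // Today the broker just silently stops reading:
        channel.config().setAutoRead(false);
        System.out.println("autoRead=" + channel.config().isAutoRead());
    }
}
```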

I look forward to your better solutions.


Re: [VOTE] PIP 106: Negative acknowledgment backoff

2021-11-01 Thread linlin
+1

Enrico Olivelli wrote on Mon, Nov 1, 2021 at 5:47 PM:

> +1 (binding)
>
> Thanks
> Enrico
>
> On Mon, 1 Nov 2021 at 09:50, PengHui Li wrote:
>
> > I would like to start a VOTE for PIP 106: Negative acknowledgment
> > backoff
> >
> > The issue for this PIP is here
> >
> > https://github.com/apache/pulsar/issues/12379
> >
> > Please VOTE within 72 hours.
> >
> > - Penghui
> >
> > ## Motivation
> >
> > Apache Pulsar supports the at-least-once message delivery semantic,
> > which can tolerate consumer failures such as: the consumer writes data
> > to a database but the database goes offline for a while; the consumer
> > calls an external HTTP server but the HTTP server is unavailable; or
> > part of the consumer is at fault (it can't connect to the database or
> > the HTTP server).
> >
> > In general, when the consumer is not able to process a message
> > successfully, what we can do in the above cases is redeliver the
> > message after the processing failure so that the message can be
> > redelivered to other consumers (for the Shared subscription). This is
> > a frequently used example:
> >
> > ```java
> > Message msg = consumer.receive();
> >
> > try {
> >     process(msg);
> >     consumer.acknowledge(msg);
> > } catch (Exception e) {
> >     consumer.negativeAcknowledge(msg);
> > }
> > ```
> >
> > But you might not want to redeliver the message immediately; giving
> > the non-working services (HTTP server, database) some time to recover
> > avoids the extra overhead caused by too-frequent retries. Currently,
> > we can specify a delay for message redelivery via the negative
> > acknowledgment:
> >
> > ```java
> > client.newConsumer()
> >     ...
> >     .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
> >     .subscribe();
> > ```
> >
> > But this is not flexible enough, so the proposal is to introduce a
> > redelivery backoff mechanism with which we can achieve redelivery with
> > different delays according to the number of times the message has been
> > retried, such as 1s, 2s, 4s, 8s, 16s for the next 5 message
> > redeliveries.
> >
> > -> reconsumeLater
> >
> > ## Approach
> >
> > The approach is to introduce a `NegativeAckRedeliveryBackoff` on the
> > client side; users can specify a `NegativeAckRedeliveryBackoff` for a
> > consumer, and the client will provide an implementation,
> > `NegativeAckRedeliveryExponentialBackoff`.
> >
> > The NegativeAckBackoff cannot be used together with the redelivery
> > delay, and the default redelivery delay will not change.
> >
> > Users are also able to implement a specific
> > `NegativeAckRedeliveryBackoff`. For some frequently used backoff
> > implementations, we should also support them in the Pulsar clients to
> > provide users with an out-of-the-box experience.
> >
> > Notice: a consumer crash triggers redelivery of the unacked messages;
> > this case will not respect the `NegativeAckRedeliveryBackoff`, which
> > means the messages might get redelivered earlier than the delay time
> > from the backoff.
> >
> > ## API changes
> >
> > The new `NegativeAckBackoff` interface:
> >
> > ```java
> > interface NegativeAckBackoff {
> >
> >     long next(int redeliveryCount);
> >
> > }
> > ```
> >
> > A new method for building the consumer:
> >
> > ```java
> > client.newConsumer()
> >     ...
> >     .negativeAckRedeliveryBackoff(...)
> >     .subscribe();
> > ```
> >
> > Notice: the `NegativeAckRedeliveryBackoff` will not work with
> > `consumer.negativeAcknowledge(MessageId messageId)` because we are not
> > able to get the redelivery count from the message ID.
> >
> > The consumer configuration can also be loaded from a configuration
> > file, so we should also support specifying the
> > `NegativeAckRedeliveryBackoff` when loading the consumer configuration
> > from a config file. A new method will be added to the
> > `ConsumerBuilder`:
> >
> > ```java
> > ConsumerBuilder negativeAckRedeliveryBackoff(String className, String
> > params);
> > ```
> >
> > ## Compatibility
> >
> > The proposal will not introduce any compatibility issues.
> >
> > ## Tests Plan
> >
> > Unit tests & integration tests
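
For reference, an exponential backoff against the quoted
`next(int redeliveryCount)` interface might look like this (an illustrative
sketch with assumed 1s initial / 16s cap values, not the actual client
implementation):

```java
public class ExponentialBackoffSketch {
    static final long FIRST_DELAY_MS = 1_000;
    static final long MAX_DELAY_MS = 16_000;

    // 1s, 2s, 4s, 8s, 16s, then capped at 16s.
    static long next(int redeliveryCount) {
        // Cap the shift to avoid long overflow for large redelivery counts.
        long delay = FIRST_DELAY_MS << Math.min(redeliveryCount, 10);
        return Math.min(delay, MAX_DELAY_MS);
    }

    public static void main(String[] args) {
        for (int i = 0; i < 6; i++) {
            System.out.println("redelivery " + i + " -> " + next(i) + " ms");
        }
    }
}
```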


[DISCUSS] Always set a broker side timestamp for message and deprecate some API

2024-01-06 Thread linlin
Now, if a message's metadata does not have a broker-side timestamp set, the
ledger expiration check is based on the client's publish time.

When the client machine's clock is incorrect (e.g. set to one year later),
the ledger can never be cleaned up. Issue:
https://github.com/apache/pulsar/issues/21347

`AppendBrokerTimestampMetadataInterceptor` can set the timestamp for
messages on the broker side, but we cannot ensure that the
`AppendBrokerTimestampMetadataInterceptor` is always enabled.

Therefore, I opened this PR (https://github.com/apache/pulsar/pull/21835) to
always set the broker timestamp for messages on the broker side.

With this change, firstly, we should deprecate
AppendBrokerTimestampMetadataInterceptor; it no longer needs to exist.

Secondly, we should deprecate `hasBrokerPublishTime` in the Message
interface, since it would always return true.
That API was created in PR https://github.com/apache/pulsar/pull/11553
so that the client can obtain the broker publish time, so the
`hasBrokerPublishTime` API is no longer necessary.
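
A minimal sketch of the expiry decision this implies (illustrative names;
the real check lives in the broker's ledger-trimming code):

```java
public class ExpiryCheckSketch {
    static boolean isExpired(long brokerTimestampMs, long clientPublishTimeMs,
                             long ttlMs, long nowMs) {
        // Prefer the broker-side timestamp; fall back to the client publish
        // time only for messages written before the change.
        long ts = brokerTimestampMs > 0 ? brokerTimestampMs : clientPublishTimeMs;
        return nowMs - ts > ttlMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long skewedClientTime = now + 365L * 24 * 3600 * 1000; // clock a year ahead
        // With only the skewed client time, nothing ever expires:
        System.out.println(isExpired(0, skewedClientTime, 3_600_000, now));             // false
        // With a broker-side timestamp, expiry works regardless of client clocks:
        System.out.println(isExpired(now - 7_200_000, skewedClientTime, 3_600_000, now)); // true
    }
}
```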


[DISCUSS] PIP-219: Support full scan and trim ledger

2022-10-26 Thread linlin
Hi all,

I have drafted PIP-219: Support full scan and trim ledger

PIP link:
https://github.com/apache/pulsar/issues/18128

Here's a copy of the contents of the GH issue for your references:

### Motivation

The broker uses the `TrimLedgers` thread to clean up outdated ledgers.
During cleaning, each broker traverses the topic metadata in memory to find
the ledgers that have reached the retention or TTL threshold.
However, there is a problem with this approach: when a topic has no
producer and no consumer, the broker deletes the topic's metadata from
memory. As a result, the ledgers of these topics can never be deleted.
Therefore, we need a way to scan and clean up all outdated ledgers.

### Goal

A full scan will cause a large number of requests to ZooKeeper.
Therefore, the existing cleanup mode will be retained and a full scan mode
will be added.


### API Changes

1. Add a new scheduling thread pool.

2. Add the following configuration items:

// Full scan interval. This parameter is enabled only when the value > 0.
fullScanTrimLedgerInterval=0
// Maximum number of metadata requests per second during scanning
fullScanMaximumMetadataConcurrencyPerSecond=200

### Implementation

1. Only the leader broker performs the full scan.

2. The leader broker traverses the `managedLedger`s in each namespace from
the meta store. The ledger metadata contains the creation time; if the
ledger's age (now minus the creation time) exceeds the retention time plus
the TTL, the ledger should be deleted (see the sketch after this list).
Only the ledger metadata is parsed, instead of loading all topics into
memory. The metadata request frequency is limited using a semaphore.

3. When a topic meets the conditions, the leader broker loads the topic and
invokes its `TrimLedger` method. After cleaning is done, the leader closes
the topic to release memory.
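
A minimal sketch of the age check from step 2 (illustrative; the real
implementation reads the creation time from the ledger metadata):

```java
public class TrimCheckSketch {
    static boolean shouldTrim(long ledgerCreationTimeMs, long retentionMs, long ttlMs) {
        long ageMs = System.currentTimeMillis() - ledgerCreationTimeMs;
        return ageMs > retentionMs + ttlMs;
    }

    public static void main(String[] args) {
        long twoDaysAgo = System.currentTimeMillis() - 2L * 24 * 3600 * 1000;
        // 1-day retention + 1-hour TTL: a two-day-old ledger should be trimmed.
        System.out.println(shouldTrim(twoDaysAgo, 24L * 3600 * 1000, 3600 * 1000)); // true
    }
}
```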


[VOTE] PIP-219: Support full scan and trim ledger

2022-11-01 Thread linlin
Hi folks,

I'd like to start a vote for PIP-219: Support full scan and trim ledger
(https://github.com/apache/pulsar/issues/18128).

The discussion can be found here:
https://lists.apache.org/thread/wy8sqs2fdmw3kcdfos7t1ztpccfdmv72

Best regards.

linlin


[DISCUSS] PIP-255: Assign topic partitions to bundle by round robin

2023-03-14 Thread linlin
Hi all,
I created a proposal to assign topic partitions to bundles by round robin:
https://github.com/apache/pulsar/issues/19806

It is already running in our production environment and performs well.

Thanks!


[VOTE] PIP-255: Make the partition assignment strategy pluggable

2023-04-20 Thread linlin
Hi Pulsar Community,

This thread is to start the vote for PIP 255.
After discussion, we believe that making the partition assignment strategy
pluggable, and letting customers complete the specific implementation, is
the better way.

So the title has changed from "Assign topic partitions to bundle by round
robin" to "Make the partition assignment strategy pluggable".

Discussion: https://lists.apache.org/thread/pxx715jrqjc219pymskz8xcz57jdmcfy
Issue: https://github.com/apache/pulsar/issues/19806

Thanks,
Lin Lin