Re: [DISCUSS] Alternatives to changing public protocol

2022-07-24 Thread Haiting Jiang
Hi Asaf,

> (the field is not generic,
> it's specifically named shadow_message_id).

As Penghui suggested, the field name has been changed to `message_id` for
potential generic usage. :)


> The second problem is clients: Every such field will eventually trickle
> down to the clients, which will need to ignore that field. In my opinion,
> it makes it harder for the client's maintainers. Especially when the
> community goal is to expand and have many languages clients maintained by
> the community

Our current client implementation is already quite complex. Compared with
that, ignoring a few fields does not seem to be a significant burden,
as long as we document it well, right?


> I believe someone who tries to reason about Pulsar, and its architecture,
> by looking at its public API should not have any fields which will never be
> relevant to the reader.  It makes it hard to reason and understand the
> public API.
> 

This design principle of keeping the public API clean is clear and easy to
understand, and I totally support it. But in the case of PIP-180 or
geo-replication, the replicator can be considered a special producer
client: it simply inherits the basic semantics of a normal producer and
extends its abilities to support some special internal usage.

Of course, in theory we could use a different protocol and a different port
for strictly inter-broker communications. But the side effects would be
more code, more machine resource usage, harder maintenance, and a longer
time to stabilize the feature, compared with simply extending the abilities
of the producer client.

If a case arises where inter-broker communication is needed and it does not
fit the producer or consumer model, I think we should definitely consider
introducing a dedicated port and protocol.


Thanks,
Haiting

On 2022/07/20 15:47:16 Asaf Mesika wrote:
> Hi,
> 
> We started discussing in PIP-180, which Penghui recommended I move to a
> dedicated thread.
> 
> Pulsar has a public API in its binary protocol, which the clients use to
> communicate with it. Nonetheless, it is its public API to the server.
> 
> I believe the public API should not be changed for internal communication
> purposes. PIP-180 gives a really good example: We would like to introduce a
> new feature called Shadow Topic and would like to replicate messages from
> the source topic to the Shadow topic. It just so happens to be that the
> replication mechanism uses the Broker public API to send messages to a
> broker. The design would like to expand on that by adding a field to this
> public API, to serve that specific feature needs (the field is not generic,
> it's specifically named shadow_message_id).
> 
> I believe someone who tries to reason about Pulsar, and its architecture,
> by looking at its public API should not have any fields which will never be
> relevant to the reader.  It makes it hard to reason and understand the
> public API.
> 
> The second problem is clients: Every such field will eventually trickle
> down to the clients, which will need to ignore that field. In my opinion,
> it makes it harder for the client's maintainers. Especially when the
> community goal is to expand and have many languages clients maintained by
> the community
> 
> The public API today already contains many fields which are only for
> internal use. Here are a few that I found (please correct me if I'm wrong
> here):
> 
> // Property set on replicated message,
> // includes the source cluster name
> optional string replicated_from = 5;
> 
> // Override namespace's replication
> repeated string replicate_to = 7;
> 
> // Identify whether a message is a "marker" message used for
> // internal metadata instead of application published data.
> // Markers will generally not be propagated back to clients
> optional int32 marker_type = 20;
> 
> 
> I would like to discuss that with you, get your feedback and whether you
> think it's correct to accept a decision to avoid changing the public API.
> 
> One alternative I was thinking about (I'm still fairly new, so I don't have
> all the experience and context here) is creating an internal non-public
> API, which will be used for internal communication: different proto,
> different port.
> 
> Thanks for your time,
> 
> Asaf
> 


Re: [DISCUSS] PIP-180: Shadow Topic, an alternative way to support readonly topic ownership.

2022-07-24 Thread Haiting Jiang
Hi Penghui,

> One question about the schema.
> How can the consumer get the schema from the shadow topic during
> consumption?
> We should add this part in the proposal.
> 

Thanks for the reminder.

From what I see, the schema is part of a topic's metadata. So a shadow topic
won't have its own schema; it shares the schema info of the source topic.

For consumers, we need to support the `GetSchema` command for shadow topics,
and there are two interfaces for this.

1. Binary protocol, handled for `CommandGetSchema` in
   `ServerCnx#handleGetSchema`. We only need to replace the requested shadow
   topic's `schemaName` with the `schemaName` of the source topic; the
   underlying read operation is supported by
   `SchemaRegistry#getSchema(String, SchemaVersion)`.

2. HTTP protocol, handled in `SchemasResource#getSchema(...)`. Similar to
   the binary protocol approach, replace the `schemaId` with that of the
   source topic in `SchemasResourceBase#getSchemaId`.

For admins, we can support the other "read" ops besides `getSchema`,
including `getAllSchemas` and `getVersionBySchema`, all of which can be
supported in the same way as `getSchema`.
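
To make this concrete, here is a minimal sketch of the redirection idea.
The `shadowSourceTopic` lookup is hypothetical and the names don't match
Pulsar's internal classes exactly:

```java
// Minimal sketch of the schema redirection described above.
// shadowSourceTopic() is a hypothetical metadata lookup, not an existing API.
class ShadowSchemaResolver {

    // Maps a requested topic to the topic whose schema should be served.
    String resolveSchemaTopic(String requestedTopic) {
        String source = shadowSourceTopic(requestedTopic);
        // A shadow topic has no schema of its own: serve the source topic's schema.
        return source != null ? source : requestedTopic;
    }

    // Returns the source topic if `topic` is a shadow topic, otherwise null.
    String shadowSourceTopic(String topic) {
        return null; // placeholder for the metadata-store lookup
    }
}
```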


Thanks,
Haiting


On 2022/07/21 02:13:08 PengHui Li wrote:
> Hi Haiting,
> 
> One question about the schema.
> How can the consumer get the schema from the shadow topic during
> consumption?
> We should add this part in the proposal.
> 
> Thanks,
> Penghui
> 
> On Mon, Jul 11, 2022 at 9:09 PM Asaf Mesika  wrote:
> 
> > On Thu, Jun 23, 2022 at 6:26 AM Haiting Jiang 
> > wrote:
> >
> > > Hi Asaf,
> > >
> > > > I did a quick reading and I couldn't understand the gist of this
> > change:
> > > > The shadow topic doesn't really have it's own messages, or it's own
> > > ledgers
> > > > right? When it reads messages, it reads from the original topic
> > ledgers.
> > > So
> > > > the only thing you need to do is sync the "metadata" - ledgers list?
> > >
> > > Yes, mostly ledger id list and LAC of the last ledger.
> >
> >
> > > > One question comes to mind here: Why not simply read the ledger
> > > information
> > > > from original topic, without copy?
> > >
> > > Yes, old ledger information will be read from metadata store when
> > > ShadowManagedLedger initializes. The replicator is only for new messages,
> > > to
> > > reduce the consume latency of subscription in shadow topic. And the
> > reason
> > > we also replicates message data is to populates the entry cache when
> > shadow
> > > topic have many active subscriptions.
> > >
> > > One optimization we can do is that, there would be not much help for
> > shadow
> > > replicator to replicate message in backlog. We can come up with some
> > > policy to
> > > reset shadow replicator cursor in future PR.
> > >
> >
> > I'm not sure I'm following you.
> > What do you mean by old ledger information and new ledger information?
> >
> > What I'm trying to understand is: why do you need to copy the source topic
> > metadata: Ledgers ID list and LAC of the last ledger? Why can't you just
> > use the original topic metadata?
> >
> >
> >
> > >
> > > > Another question - I couldn't understand why you need to change the
> > > > protocol to introduce shadow message id. Can you please explain that to
> > > me?
> > > > Is CommandSend used only internally between Pulsar Clusters or used by
> > a
> > > > Pulsar Client?
> > >
> > > CommandSend is designed for pulsar producer client first, and
> > > geo-replication
> > > reuse producer client to replicate messages between pulsar clusters.
> > >
> > > The shadow message id contains the ledger id and entry id of this
> > message.
> > > When shadow topic receive the message id, it is able to update
> > > `lastConfirmedEntry` directly, so that subscription can consume this this
> > > new
> > > message.
> > > Also shadow topic can tell if the message is from shadow replicator and
> > > reject
> > > otherwise.
> > >
> > >
> > I think the flow of information is the part I don't understand.
> >
> > In the PIP you write "The message sync procedure of shadow topic is
> > supported by shadow replication, which is very like geo-replication, with
> > these differences:"
> > What I don't understand is that you write that this is a read-only topic,
> > so why replicate/sync messages?
> >
> > I managed to understand that you want to populate the BK entry cache of the
> > topic ledgers in the shadow topic broker. Instead of reading from BK and
> > storing it in the cache, you favor copying from the source topic broker
> > cache memory to the shadow topic broker cache. Is this to save the
> > bandwidth of BK? I presume the most recent messages of BK would be in
> > memory anyway, no?
> >
> >
> >
> >
> > > Thanks,
> > > Haiting
> > >
> > > On 2022/06/22 15:57:11 Asaf Mesika wrote:
> > > > Hi,
> > > >
> > > > I did a quick reading and I couldn't understand the gist of this
> > change:
> > > > The shadow topic doesn't really have it's own messages, or it's own
> > > ledgers
> > > > right? When it reads messages, it reads from

Re: PIP-187 Add API to analyse a subscription backlog and provide a accurate value

2022-07-24 Thread Asaf Mesika
On Thu, Jul 21, 2022 at 10:36 AM Enrico Olivelli 
wrote:

> On Thu, 21 Jul 2022 at 06:25, PengHui Li
>  wrote:
> >
> > > What if the topic owner creates an internal subscription, consumes the
> > messages, and updates a count per filter.
> >
> > I agree with this approach. If we need to scan all the backlogs to
> > calculate the
> > accurate backlogs for each operation, it's so expensive and difficult to
> > apply to
> > the production environment. With the counter for each
> filter(subscription)
> > and only
> > re-scan the data after the filter changes will reduce a lot of overhead.
>
> This approach is very expensive when you don't need this feature.
> Because in order to have this value you have to read everything (once
> per subscription),
> especially when you have a subscription without consumers and the
> topic is being written.
>
> As you can't know when you will need this value, you are going to
> waste resources in order to calculate
> something that you won't use.
>
>
> This API to analyze a subscription is not meant to be used often, but
> only to debug (manually) problems.
>

I would emphasize that last sentence in the PIP, and especially in the
documentation, to clarify that this operation is very expensive
(perhaps in capital letters) and should be used mostly for manual debugging,
not for actual monitoring.
One option to make it hard to miss: "analyzeBacklog" -->
"fullScanBacklogAnalysis" or "AnalyzeBacklogUsingFullScan"


>
> Also, the filter may be dependent on some other environment variables,
> like the wall clock time,
> if you have a filter that depends on time and you pre-calculate the
> backlog your counter won't be correct.
>
I wasn't aware of that - I re-read PIP-105 and then realized the filter has
complete freedom. The filter I had in mind was more constrained in nature
(like a WHERE clause in SQL).


>
> >
> > If we want to expose the accurate backlogs in the Prometheus endpoint,
> > it's almost impossible.
>
> I don't think this is actually possible if you want to take into
> consideration the filters.
> We are in the case of general purpose filtering (actually we allow
> Java code to be plugged into the browser),
> so pre-calculating the counters won't work well.
>
>
> Enrico
>
> >
> > Thanks,
> > Penghui
> >
> > On Wed, Jul 20, 2022 at 11:23 PM Asaf Mesika 
> wrote:
> >
> > > On Wed, Jul 20, 2022 at 5:46 PM Enrico Olivelli 
> > > wrote:
> > >
> > > > Asaf,
> > > >
> > > > On Wed, 20 Jul 2022 at 15:40, Asaf Mesika
> > > >  wrote:
> > > > >
> > > > > I'm not sure I understand the context exactly:
> > > > >
> > > > > You say today we can only know the number of entries, hence we'll
> have
> > > a
> > > > > wrong number of backlog for subscription since:
> > > > > 1. One entry contains multiple messages (batch message)
> > > > > 2. Subscription may contain a filter, which requires you to read
> the
> > > > entire
> > > > > backlog to know it
> > > >
> > > > correct
> > > >
> > > > >
> > > > > There are two things I don't understand:
> > > > >
> > > > > 1. We're adding an observability API, which you need to pay all the
> > > read
> > > > > cost just to know the count. I presume people would want to run
> this
> > > more
> > > > > than once. So they will read same data multiple times - why would a
> > > user
> > > > be
> > > > > willing to pay such a hefty price?
> > > >
> > > > sometimes it is the case, because processing a message may have a
> high
> > > > cost.
> > > > So having 10 entries of 100 messages is not correctly representing
> the
> > > > amount of work that must be done by the consumers
> > > > and so the user may wish to have an exact count.
> > > >
> > > > Having the filters adds more complexity because you cannot predict
> how
> > > > many entries will be filtered out
> > > >
> > > >
> > > > So it's mainly serving that specific use case of reading the entire
> > > messages over and over (every interval) is an order of magnitude less
> > > expensive than the processing it self.
> > >
> > >
> > > > > 2. If the user needs to know an accurate backlog, can't they use
> the
> > > > > ability to create a very large number of topics, thus they will
> know an
> > > > > accurate backlog without the huge cost?
> > > >
> > > > I can't understand why creating many topics will help.
> > > > instead with filters it is very likely that you have only fewer
> topics
> > > > with many subscriptions with different filters
> > > >
> > > > as you don't know the filters while writing you cannot route the
> > > > messages to some topic
> > > > also you would need to write the message to potentially multiple
> > > > topics, and that would be a huge write amplification
> > > > (think about a topic with 100 subscriptions)
> > > >
> > > > Yes, I haven't thought about that.
> > > What I was thinking is that those filters are mutually exclusive
> therefor
> > > topics, but in your case, if you have 100 different filters, and they
> > > overlap, yes it would be way more exp

Re: Pulsar Flaky test report 2022-07-19 for PR builds in CI

2022-07-24 Thread Asaf Mesika
Fantastic initiative.


On Thu, Jul 21, 2022 at 10:59 PM Nicolò Boschi  wrote:

> Dear community,
>
> Here's a report of the flaky tests in Pulsar CI during the observation
> period of 2022-07-12 - 2022-07-19
>
>
> https://docs.google.com/spreadsheets/d/165FHpHjs5fHccSsmQM4beeg6brn-zfUjcrXf6xAu4yQ/edit?usp=sharing
>
> The report contains a subset of the test failures.
> The flaky tests are observed from builds of merged PRs.
> The GitHub Actions logs will be checked for builds where the SHA of the
> head of the PR matches the SHA which got merged.
> This ensures that all found exceptions are real flakes, since no changes
> were made to the PR to make the tests pass later
> so that the PR was merged successfully.
>
>
>
> Top 5 flaky issues to fix:
>
> org.apache.pulsar.tests.integration.functions.java.PulsarFunctionsJavaThreadTest.testJavaLoggingFunction
> 92
> org.apache.pulsar.functions.worker.PulsarFunctionTlsTest.tearDown 36
>
> org.apache.pulsar.broker.service.persistent.PersistentSubscriptionMessageDispatchStreamingDispatcherThrottlingTest.testDispatchRateCompatibility2
> 27
>
> org.apache.pulsar.broker.service.persistent.PersistentDispatcherFailoverConsumerStreamingDispatcherTest.testAddRemoveConsumer
> 23
>
> org.apache.pulsar.broker.service.PersistentDispatcherFailoverConsumerTest.testAddRemoveConsumer
> 22
>
>
> Markdown formatted summary reports for each test class can be accessed at
>
> https://github.com/nicoloboschi/pulsar-flakes/tree/master/2022-07-12-to-2022-07-19
>
>
> More flaky test issues:
>
> https://github.com/apache/pulsar/issues?q=flaky+sort%3Aupdated-desc+is%3Aopen+is:issue
>
> We need more help in addressing the flaky tests. Please join the efforts
> so that we can get CI to a more stable state.
>
> To coordinate the work,
> 1) please search for an existing issue or search for all flaky issues with
> "flaky" or the test class name (without package) in the search:
> https://github.com/apache/pulsar/issues?q=is%3Aopen+flaky
> +sort%3Aupdated-desc
> 2) If there isn't an issue for a particular flaky test failure that you'd
> like to fix, please create an issue using the "Flaky test" template at
> https://github.com/apache/pulsar/issues/new/choose
> 3) Please comment on the issue that you are working on.
>
> We have a few active contributors working on the flaky tests, thanks for
> the contributions.
>
> I'm looking forward to more contributors joining the efforts. Please join
> the #testing channel on Slack if you'd like to ask questions and tips about
> reproducing flaky tests locally and how to fix them.
> Sharing stories about fixing flaky tests is also helpful for sharing the
> knowledge about how flaky tests can be fixed. That's also a valuable way to
> contribute.
> Some flaky tests might be actual real production code bugs. Fixing
> the flaky test might result in fixing a real production code bug.
>
> More contributions are welcome! Please keep up the good work!
>
> Thanks,
> Nicolò Boschi
>


Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-24 Thread Asaf Mesika
Hi,

A few questions:


>- preprocess-function: the preprocess function applied before the
>Sink. Starts by builtin:// for built-in functions, function:// for
>package function, http:// or file://

1. Why is this function applied only before a Sink? I thought it replaces
the identity function, so why can't a source have a function that reads
from the source (say S3), runs the function and only then writes to a
Pulsar topic?
2. Can you clarify more about built-in functions and package functions?
Is this existing functionality?
3. Regarding http - are you loading a class through that URL? Aren't we
exposed to the same problem the Log4Shell security issue had? If so, what
measures are you taking to protect against it?

> The field extraFunctionPackageLocation to the protobuf structure
> FunctionMetaData will be added. This field will be filled with the
> location of the extra function to apply when registering a sink and used in
> the Runtime to load the function code.

Can you please expand on that? Do you mean the JAR location, in which you
will search for the class name and function specified in the 3 fields
you've added to the config?

> The parameters extraFunctionFile and originalExtraFunctionFileName will be
> added to RuntimeFactory::createContainer

1. A file and file name containing what? How do these relate to
extraFunctionPackageLocation?

Here you use the terminology "Extra Function", while in the config and
admin fields you used the term Pre-Process Function. I would pick
Pre-Process Function and use it consistently throughout.


> The following parameters will be added to JavaInstanceStarter:
>
>- --extra_function_jar: the path to the extra function jar
>
>
>- --extra_function_id: the extra function UUID cache key
>
> These parameters are then used by the ThreadRuntime to load the function
> from the FunctionCacheManager or create it there if needed.


Can you elaborate on that? Is JavaInstanceStarter used to start a single
Function? Is it used from the command line?

In general, you will essentially have two class loaders - one for the sink
and one for the pre-process function?





On Fri, Jul 22, 2022 at 12:48 PM Christophe Bornet 
wrote:

> Dear Pulsar dev community,
>
> I would like to open a discussion here about PIP 193 : Sink preprocessing
> Function .
>
> Best regards
>
> Christophe
>
> ## Motivation
>
> Pulsar IO connectors make it possible to connect Pulsar to an external
> system:
> * A Source reads continuously from an external system and writes to a
> Pulsar topic
> * A Sink reads continuously from a Pulsar topic and writes to an external
> system.
> Sources and Sinks are written in Java.
>
> Pulsar also has a lightweight computing system named Pulsar Functions. A
> Pulsar Function reads from one or more topics, applies user logic written
> in Java, Python or Go and writes to an output topic.
>
> When using Pulsar IO connectors, the format of what is read/written from/to
> the source/sink is defined by the connector code. But there are a lot of
> situations where a user wants to transform this data before using it.
> Currently the solution is to either :
> * write a custom connector that transforms the data the way we want but
> that means writing a lot of code without reuse, packaging and managing
> custom connectors and so on..
> * write a Function to transform the data after it was written to a topic by
> a Source or before it is read from a topic by a Sink. This is not very
> efficient as we have to use an intermediate topic, which means additional
> storage, IO, and latency.
>
> Considering all this, it would be handy to be able to apply a Function
> on-the-fly to a connector without going through an intermediary topic.
>
> ## Goal
>
> This PIP defines the changes needed to be able to apply a preprocessing
> Function on-the-fly to a Sink.
> The preprocessing function can be a built-in function, a package function,
> or loaded through an http URL or a file path.
> Sources, Sinks and Functions are based on the same runtime process that:
> * reads from a Source. For Sinks and Functions this Source is a
> PulsarSource consuming from a Pulsar topic
> * applies a Function. For Sources and Sinks, this Function is
> IdentityFunction which returns the data it gets without modification.
> * writes to a Sink. For Sources and Functions, this Sink is a PulsarSink
> writing to a Pulsar topic.
>
> This PIP reuses this and allows configuring a Function different from
> IdentityFunction to Sinks.
> Only Functions returning a Record will be authorized to ensure that the
> Function sets the Schema explicitly.
>
> Out of the scope of this PIP, for future work:
> * Applying a post-processing Function to a Source
> * Loading the Function jar through the Sink CLI
>
> ## API Changes
>
> ### Admin CLI
>
> The following options will be added to the `pulsar-admin sinks` CLI
>  `create`, `update` and `localrun`:
> * `preprocess-function`: the preprocess function applied 

Re: [Discuss] PIP-190: Simplify Pulsar documentation release and maintenance strategy

2022-07-24 Thread Asaf Mesika
If I understand correctly, using tags, you can automatically create the
docs for the next minor version?
And for older minor versions, where will those files live? Will they be
taken from git tags?

On Sat, Jul 23, 2022 at 7:04 AM Ma Jun  wrote:

> Hi, Pulsar community,
>
> Happy weekend!
>
> I'd like to open a discussion about PIP-190: Simplify Pulsar documentation
> release and maintenance strategy.
>
> Proposal link: https://github.com/apache/pulsar/issues/16637.
>
>
> -
>
> ## Motivation
>
> This proposal is focused on improving and simplifying the release and
> maintenance strategy of the existing Apache Pulsar documentation.
>
> In general, this proposal provides equal values and a better user
> experience without sacrificing unpredictable efforts in the release and
> maintenance process. The benefits include:
> * Improve user experience when they look for the documentation of a
> specific release.
> * Optimize the doc development and release process for bug-fix releases:
>* Turn doc development for bug-fix releases from post-release into
> just-in-time.
>* Save Release Manager’s effort in generating doc files for bug-fix
> releases.
> * Save a great amount of the community’s effort in syncing doc
> improvements and fixes to historical doc versions that are in the
> maintenance cycle.
>
>
> Since there are quite a few illustrations and links in this proposal, I
> didn't copy and paste the complete content into this email thread. You can
> access the GitHub link to review the proposal with a better view.
>
>
> Cheers!
>
> momo-jun
>
>


Re: [DISCUSS] PIP-191: Support batched message using entry filter

2022-07-24 Thread Anon Hxy
There is an example that may be helpful for understanding the properties
extraction:

- Let's set `batchedFilterProperties`=``.
This means only keys named `region` and `version` will be extracted to the
batch meta properties.

- Then we have a producer that sends the messages below in order:
- `msg1` with properties: ``
- `msg2` with properties: ``
- `msg3` with properties: ``
- `msg4` with properties: ``
- `msg5` with properties: ``
- `msg6` with properties: ``

- The process of properties extraction will be:
   - msg1 and msg2 have the same extracted properties, so they will be put
into the same batch.
   - msg3 and msg4 have the same extracted properties.
 `tag:a` in msg3 will be ignored because `batchedFilterProperties`
doesn't contain `tag`. So msg3 and msg4 will be put into the same batch.
- msg5 and msg6 have different properties, because the value of `version`
is different. So msg5 and msg6 are published in different batches.

- Just to summarize, the result will be:

| | batch meta properties | single meta properties | payload | single meta properties | payload |
|---|---|---|---|---|---|
| batch1 | | | msg1 | | msg2 |
| batch2 | | | msg3 | | msg4 |
| batch3 | | | msg5 | | |
| batch4 | | | msg6 | | |
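
For illustration only, here is a minimal sketch of the extraction and the
`hasSameProperties` check, assuming message properties are plain string
maps (class and method names here are illustrative, not the actual PR code):

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch of the grouping rule above: two messages may share a batch only if
// the keys listed in batchedFilterProperties map to equal values.
class BatchPropertyMatcher {
    private final List<String> batchedFilterProperties;

    BatchPropertyMatcher(List<String> batchedFilterProperties) {
        this.batchedFilterProperties = batchedFilterProperties;
    }

    // Keep only the configured keys (e.g. region, version); other keys
    // such as "tag" are ignored for batching purposes.
    Map<String, String> extract(Map<String, String> msgProperties) {
        Map<String, String> extracted = new TreeMap<>();
        for (String key : batchedFilterProperties) {
            String value = msgProperties.get(key);
            if (value != null) {
                extracted.put(key, value);
            }
        }
        return extracted;
    }

    // hasSameProperties: a message can join the current batch only if its
    // extracted properties equal those already stored in the batch metadata.
    boolean hasSameProperties(Map<String, String> batchProperties,
                              Map<String, String> msgProperties) {
        return extract(batchProperties).equals(extract(msgProperties));
    }
}
```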

Thanks,
Xiaoyu Hou

Anon Hxy  wrote on Tue, 19 Jul 2022 at 18:00:

> Hi Pulsar community:
>
> I open a pip to discuss "Support batched message using entry filter"
>
> Proposal Link: https://github.com/apache/pulsar/issues/16680
> ---
>
> ## Motivation
>
> We already have  a plug-in way to filter entries in broker, aka PIP-105
> https://github.com/apache/pulsar/issues/12269.  But this way only checks
> at the batch header,  without digging into the individual messages
> properties. Of course it provides an interface to deserialize the entire
> Entry to the heap,  But this will bring some memory and cpu workload. And
> in most scenarios we only need partial properties to do some filter.
>
> This proposal brings a method to make PIP-105 support batched entry
> without having to deserialize the entire Entry to the heap
>
>
> ## API Changes
>
> - Add a producer config to specialize the key, of which properties will be
> added to the batched entry metadata, for example:
>
> ```
>
> org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
>
> ```
> The `batchedFilterProperties` type is `List<String>` with default value
> is empty list.  For an empty list, it means that the properties of entry's
> metadata are empty, and the `EntryFilter` will not take effect.
>
> ## Implementation
>
> - When call `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`,
>  we extract the message properties and add it to `metadata`:
> ```
>  public boolean add(MessageImpl<?> msg, SendCallback callback) {
>
> if (log.isDebugEnabled()) {
> log.debug("[{}] [{}] add message to batch, num messages in
> batch so far {}", topicName, producerName,
> numMessagesInBatch);
> }
>
> if (++numMessagesInBatch == 1) {
> try {
> // some properties are common amongst the different
> messages in the batch, hence we just pick it up from
> // the first message
> messageMetadata.setSequenceId(msg.getSequenceId());
> List filterProperties = getProperties(msg);
> if (!filterProperties.isEmpty()) {
> messageMetadata.addAllProperties(filterProperties);
>  // and message properties here
> }
> ```
>
> -  Also we need to add a method `hasSameProperties` like `hasSameSchema`.
> Messages with same properties can be added to the same batch:
>
> ```
>  private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
> return batchMessageContainer.haveEnoughSpace(msg)
>&& (!isMultiSchemaEnabled(false)
> || batchMessageContainer.hasSameSchema(msg)
> || batchMessageContainer.hasSameProperties(msg))
> && batchMessageContainer.hasSameTxn(msg);
> }
>
> ```
>
>
> ## Reject Alternatives
>
> - Implement a `AbstractBatchMessageContainer` ,  saying
> `BatchMessagePropertiesBasedContainer`, keeping messages with same
> properties in a single `hashmap` entry,  like
> `BatchMessageKeyBasedContainer`.
>
> Rejection reason:  This will publish messages out of order
>
>
>
> Thanks,
> Xiaoyu Hou
>


[DISCUSS] PIP 194 : Pulsar client: seek command add epoch

2022-07-24 Thread Qiang Huang
Hi Pulsar community:
I open a pip to discuss "Pulsar client: seek command add epoch"
Proposal Link:

   - issue link: https://github.com/apache/pulsar/issues/16757

--
## Motivation
`Reader` belongs to the exclusive subscription type and uses a `nonDurable`
cursor. After receiving messages, `Reader` acks cumulatively immediately.
`flowPermits` is triggered in multiple scenarios on the client side and is
isolated from the `Consumer`'s `seek`. Therefore, it is possible that
`flowPermits` will execute after `seek` on the client side, as in the
following flow chart.

[image: image.png]

When `handleSeek` processing is delayed on the server side, the `MarkDelete
position` is modified in the wrong way.
The expected result is that `Reader` can re-consume messages from `mark
delete:(1,1)` after `seek`, but it doesn't work.

Reading messages and seeking to a position are not synchronized operations
in Pulsar; the seek request can't prevent an in-progress entry read
operation. The client side can therefore still receive messages located
after the seek position.

To synchronize the read and seek operations, the Pulsar client adds an
epoch to both the server-side and client-side consumer. After the client
reader invokes `seek`, the epoch increases by 1 and the `seek` command
carries the epoch; the server-side consumer then updates its epoch. When
messages are dispatched to the client, they carry the epoch the cursor read
at that time. The client consumer filters out message commands whose epoch
is smaller than the current epoch.
In this way, after the client consumer sends the `seek` command
successfully, epoch filtering ensures the consumer will not receive a
message with a message ID greater than the position the user sought to.


### Current implementation details
#### CommandSeek Protocol
```proto
// Reset an existing consumer to a particular message id
message CommandSeek {
required uint64 consumer_id = 1;
required uint64 request_id  = 2;

optional MessageIdData message_id = 3;
optional uint64 message_publish_time = 4;
}
```
### CommandMessage
```proto
message CommandMessage {
required uint64 consumer_id   = 1;
required MessageIdData message_id = 2;
optional uint32 redelivery_count  = 3 [default = 0];
repeated int64 ack_set = 4;
optional uint64 epoch = 5 [default = 0];
}
```
`CommandMessage` already added an epoch in [PIP-84](
https://github.com/apache/pulsar/wiki/PIP-84-:-Pulsar-client:-Redeliver-command-add-epoch).
When the client receives a `CommandMessage`, it compares the command epoch
with the local epoch to decide how to handle the command.

## Goal
Add epoch into seek command.

## API Changes
### Protocol change: CommandSeek
```proto
// Reset an existing consumer to a particular message id
message CommandSeek {
required uint64 consumer_id = 1;
required uint64 request_id  = 2;

optional MessageIdData message_id = 3;
optional uint64 message_publish_time = 4;
optional uint64 consumer_epoch = 5;
}
```
The `CommandSeek` command adds an epoch field. When the client successfully
sends a seek command to the server, the server updates the server-side
consumer epoch to the command epoch. The epoch can only grow larger than
the old epoch on the server. The client can then filter out messages that
carry a smaller consumer epoch.
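
As a minimal sketch of the client-side behavior (the names are illustrative,
not the actual consumer implementation):

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the epoch filtering described above. Names are illustrative;
// the real change would live in the Pulsar client consumer/reader.
class EpochFilteringConsumer {
    private final AtomicLong consumerEpoch = new AtomicLong(0);

    // On seek: bump the local epoch and send it in the CommandSeek request.
    long nextSeekEpoch() {
        return consumerEpoch.incrementAndGet();
    }

    // On each incoming CommandMessage: drop messages carrying an older epoch,
    // i.e. entries that were already being read before the seek took effect.
    boolean accept(long messageEpoch) {
        return messageEpoch >= consumerEpoch.get();
    }
}
```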

## Implementation
- Stage 1: Check the current cursor status when handling `flowPermits` on
the server side.
- Stage 2: Add the epoch to the seek command, and have the server update
the consumer epoch. This prevents an in-progress entry read operation from
taking effect after the seek request.

## Reject Alternatives
None yet.

## Note
1. A consumer reconnect needs to reset the epoch.

-- 
BR,
Qiang Huang


Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-24 Thread Christophe Bornet
Thanks for your comments Baodi


> This proposal looks good to me.
>
> > Only Functions returning a Record will be authorized to ensure that the
> Function sets the Schema explicitly.
>
> Does this mean that the function return type is fixed Record?


Yes


> Can the interface declaration of the function be displayed in the API
> changes?
>
>
Can you explain more about what you mean?
This PIP doesn't change the API of a Function, and it's already possible to
write a Function that returns a Record.
And when declaring a Sink with a Function, we'll check that this is the case.



>
> Thanks,
> Baodi Shi
>
> > On Jul 22, 2022, at 17:48, Christophe Bornet 
> wrote:
> >
> > Dear Pulsar dev community,
> >
> > I would like to open a discussion here about PIP 193 : Sink preprocessing
> > Function .
> >
> > Best regards
> >
> > Christophe
> >
> > ## Motivation
> >
> > Pulsar IO connectors make it possible to connect Pulsar to an external
> > system:
> > * A Source reads continuously from an external system and writes to a
> > Pulsar topic
> > * A Sink reads continuously from a Pulsar topic and writes to an external
> > system.
> > Sources and Sinks are written in Java.
> >
> > Pulsar also has a lightweight computing system named Pulsar Functions. A
> > Pulsar Function reads from one or more topics, applies user logic written
> > in Java, Python or Go and writes to an output topic.
> >
> > When using Pulsar IO connectors, the format of what is read/written
> from/to
> > the source/sink is defined by the connector code. But there are a lot of
> > situations where a user wants to transform this data before using it.
> > Currently the solution is to either :
> > * write a custom connector that transforms the data the way we want but
> > that means writing a lot of code without reuse, packaging and managing
> > custom connectors and so on..
> > * write a Function to transform the data after it was written to a topic
> by
> > a Source or before it is read from a topic by a Sink. This is not very
> > efficient as we have to use an intermediate topic, which means additional
> > storage, IO, and latency.
> >
> > Considering all this, it would be handy to be able to apply a Function
> > on-the-fly to a connector without going through an intermediary topic.
> >
> > ## Goal
> >
> > This PIP defines the changes needed to be able to apply a preprocessing
> > Function on-the-fly to a Sink.
> > The preprocessing function can be a built-in function, a package
> function,
> > or loaded through an http URL or a file path.
> > Sources, Sinks and Functions are based on the same runtime process that:
> > * reads from a Source. For Sinks and Functions this Source is a
> > PulsarSource consuming from a Pulsar topic
> > * applies a Function. For Sources and Sinks, this Function is
> > IdentityFunction which returns the data it gets without modification.
> > * writes to a Sink. For Sources and Functions, this Sink is a PulsarSink
> > writing to a Pulsar topic.
> >
> > This PIP reuses this and allows configuring a Function different from
> > IdentityFunction to Sinks.
> > Only Functions returning a Record will be authorized to ensure that the
> > Function sets the Schema explicitly.
> >
> > Out of the scope of this PIP, for future work:
> > * Applying a post-processing Function to a Source
> > * Loading the Function jar through the Sink CLI
> >
> > ## API Changes
> >
> > ### Admin CLI
> >
> > The following options will be added to the `pulsar-admin sinks` CLI
> > `create`, `update` and `localrun`:
> > * `preprocess-function`: the preprocess function applied before the Sink.
> > Starts by `builtin://` for built-in functions, `function://` for package
> > function, `http://` or `file://`
> > * `preprocess-function-classname`: the preprocess function class name
> > (optional if the function is a NAR)
> > * `preprocess-function-config`: the configuration of the preprocess
> > function in the same format as the `user-config` parameter of the
> > `functions create` CLI command.
> >
> > The corresponding fields will be added to `SinkConfig`:
> >
> > ```java
> >private String preprocessFunction;
> >private String preprocessFunctionClassName;
> >private String preprocessFunctionConfig;
> > ```
> >
> > ### Function definition
> >
> > The field `extraFunctionPackageLocation` to the protobuf structure
> > `FunctionMetaData` will be added. This field will be filled with the
> > location of the extra function to apply when registering a sink and used
> in
> > the Runtime to load the function code.
> >
> > ```protobuf
> > message FunctionMetaData {
> >...
> >PackageLocationMetaData extraFunctionPackageLocation = 7;
> > }
> > ```
> >
> > ### Runtime
> >
> > The parameters `extraFunctionFile` and `originalExtraFunctionFileName`
> will
> > be added to `RuntimeFactory::createContainer`
> >
> >
> > ```java
> >   Runtime createContainer(
> >InstanceConfig instanceConfig, String codeFile, String
> > originalCodeFil

Re: [DISCUSS] PIP 193 : Sink preprocessing Function

2022-07-24 Thread Christophe Bornet
Thanks for the feedback Asaf


> >- preprocess-function: the preprocess function applied before the
> >Sink. Starts by builtin:// for built-in functions, function:// for
> >package function, http:// or file://
> >
> > 1. While this function is applied only before sink? I thought it replaces
> the identity function, so why a source can't have a function that reads
> from the source (say S3), runs the function and only then writes to a
> pulsar topic?
>

Yes, that's totally possible to implement and will be done in future work,
as written in the PIP.


>  2. Can you clarify more about built in and function for package function?
> Is this an existing functionality ?
>
Yes those are existing functionalities.
Built-in functions are not documented (and we should do something about
that).
Package management of functions is described in
https://pulsar.apache.org/docs/functions-deploy#use-package-management-service


> 3. Regarding http - Are you loading a class through that URL? Aren't we
> exposed to same problem Log4Shell security issue had? If so, what measures
> are you taking to protect ?
>
Yes we are loading code via URL. This feature already exists for
Sources/Sinks/Functions.
I guess you need to place a lot of trust in the source you download from.
This PIP has the same security level as what already exists for this
functionality.


>
> The field extraFunctionPackageLocation to the protobuf structure
> > FunctionMetaData will be added. This field will be filled with the
> > location of the extra function to apply when registering a sink and used
> in
> > the Runtime to load the function code.
>
> Can you please expand on that? You mean the JAR location, which you will
> search that class name and function specified in the 3 fields you've added
> to the config?
>
Not exactly. It's the location where the JAR is stored. It can be
BookKeeper, package management, a built-in NAR, etc.
In KubernetesRuntime, there are cases where the builtin or package function
you provide in the preprocess-function param could be copied to BK.
That's the same as for a regular Sink/Source and if we need to copy to BK,
we append `__sink-function` to the storage path to prevent conflict with
the sink code.
The class name is indeed looked up in this JAR.


> The parameters extraFunctionFile and originalExtraFunctionFileName will be
> > added to RuntimeFactory::createContainer
>
> 1. File and fileName containing what? How does this related to
> extraFunctionPackageLocation?
>
That part mimics what is already done for the main code of the Source/Sink
(with codeFile, originalCodeFileName and packageLocation respectively).
Before starting the ThreadRuntime, we download the JAR from the
extraFunctionPackageLocation locally into the extraFunctionFile so we can
load the code from it.


>
> In here you use the terminology Extra Function" and in fields of config and
> admin you used the term Pre-Process Function. I would stick to Pro-Process
> Function and stick with it all over.
>
This terminology needs to be applicable to a Function that would be applied
after a Source, so we can't use "preprocess" here.
I haven't found anything better than "extra function". Don't hesitate to
propose something!


>
>
> > The following parameters will be added to JavaInstanceStarter:
> >
> >- --extra_function_jar: the path to the extra function jar
> >
> >
> >- --extra_function_id: the extra function UUID cache key
> >
> > These parameters are then used by the ThreadRuntime to load the function
> > from the FunctionCacheManager or create it there if needed.
>
>
> Can you elaborate on that? JavaInstanceStarter is used to start a single
> Function? It's used from command line?

The JavaInstanceStarter is indeed a CLI to start a JavaInstance.
The JavaInstance is the process that executes the code to read from a
Source, execute a Function and write to a Sink.
Generally, Pulsar users don't use the JavaInstanceStarter directly; the
command line is forged by the ProcessRuntime and KubernetesRuntime.

>
>
> In general, you will essentially have two class loaders - one for the sink
> and one for the pre-process function?
>
Yes, exactly.
Three, to be more accurate, since there's also the instance class loader.
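
For illustration, here is a rough sketch of that isolation with placeholder
names (the actual runtime goes through the FunctionCacheManager):

```java
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

// Rough sketch of the class-loader isolation described above: the extra
// (pre-process) function is loaded in its own class loader, separate from
// the sink's. The jar path and class name are placeholders.
class ExtraFunctionLoader {
    static Object loadExtraFunction(String extraFunctionJarPath, String className)
            throws Exception {
        URL jarUrl = new File(extraFunctionJarPath).toURI().toURL();
        ClassLoader parent = Thread.currentThread().getContextClassLoader();
        URLClassLoader extraFunctionClassLoader =
                new URLClassLoader(new URL[]{jarUrl}, parent);
        Class<?> functionClass = extraFunctionClassLoader.loadClass(className);
        return functionClass.getDeclaredConstructor().newInstance();
    }
}
```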


>
>
>
>
>
> On Fri, Jul 22, 2022 at 12:48 PM Christophe Bornet
> wrote:
>
> > Dear Pulsar dev community,
> >
> > I would like to open a discussion here about PIP 193 : Sink preprocessing
> > Function .
> >
> > Best regards
> >
> > Christophe
> >
> > ## Motivation
> >
> > Pulsar IO connectors make it possible to connect Pulsar to an external
> > system:
> > * A Source reads continuously from an external system and writes to a
> > Pulsar topic
> > * A Sink reads continuously from a Pulsar topic and writes to an external
> > system.
> > Sources and Sinks are written in Java.
> >
> > Pulsar also has a lightweight computing system named Pulsar Functions. A
> > Pulsar Function reads from one or mo

Re: [DISCUSS] Apache Pulsar 2.8.4 release

2022-07-24 Thread mattison chao
Thanks for your work!

Best,
Mattison

On Fri, 22 Jul 2022 at 11:34, Yunze Xu  wrote:
>
> Sure, I will take it carefully for those PRs not cherry-picked to branch-2.8 
> but labeled
> as `release/2.8.4`.
>
> Thanks,
> Yunze
>
>
>
>
> > On 22 Jul 2022, at 00:09, Dave Fisher  wrote:
> >
> > Thank you for volunteering!
> >
> >> On Jul 21, 2022, at 12:57 AM, Yunze Xu  
> >> wrote:
> >>
> >> Hello Pulsar Community,
> >>
> >> It has been several months since the 2.8.3 release and there are many
> >> important fixes after that. For example, there is a performance
> >> regression for message listener that was introduced from 2.8.3 and
> >> fixed in https://github.com/apache/pulsar/pull/15162.
> >>
> >> I volunteer to be the release manager for 2.8.4.
> >>
> >> Here [0] you can find the list of 149 commits to branch-2.8 since the
> >> 2.8.3 release. There are 54 closed PRs targeting 2.8.4 that have not
> >> yet been cherry-picked [1] and I will cherry-pick them and solve the
> >> conflicts if necessary. There are 18 open PRs labeled with
> >> `release/2.8.4` [2]. I will check the status of them and see if they
> >> can be pushed to 2.8.4.
> >
> > Please use discretion about selecting bug fixes only and avoid adding new 
> > features or unexpected configuration changes. Thanks!
> >
> > Ask for help with the cherry picks since being in a Release Manager is a 
> > big job!
> >
> > Regards,
> > Dave
> >
> >
> >>
> >> Thanks,
> >> Yunze
> >>
> >> [0] - https://github.com/apache/pulsar/compare/v2.8.3...branch-2.8
> >> [1] - 
> >> https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aclosed
> >> [2] - 
> >> https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aopen
> >>
> >
>


[DISCUSS] PIP-195: New bucket based delayed message tracker

2022-07-24 Thread Cong Zhao
Hello Pulsar Community,


I would like to open a discussion here about PIP-195: New bucket based
delayed message tracker. I look forward to your feedback.


PIP: https://github.com/apache/pulsar/issues/16763


Thanks,

Cong Zhao


[VOTE]PIP-189: No batching if only one message in batch.

2022-07-24 Thread Anon Hxy
Dear Community,

I would like to start a VOTE on "PIP-189: No batching if only one message
in batch."

The proposal can be read at [0] and the discussion thread is available at
[1] and the PR link is available at [2]

Voting will stay open for at least 48h.

[0] https://github.com/apache/pulsar/issues/16619
[1] https://lists.apache.org/thread/dbq1lrv03bhtk0lr5nwm5txo9ndjplv0
[2] https://github.com/apache/pulsar/pull/16605

Thanks,
Xiaoyu


Re: [DISCUSS] Apache Pulsar 2.8.4 release

2022-07-24 Thread Haiting Jiang
Hi Enrico,

> We should also one last release out of 2.7 branch.
> I hope that someone will volunteer

I volunteer to be the release manager for 2.7.5.
I will start the discuss mail shortly.

Thanks,
Haiting

On 2022/07/21 16:56:50 Enrico Olivelli wrote:
> Thank you Yunze
> 
> I agree that it is time to cut a release of 2.8.
> 
> We should also one last release out of 2.7 branch.
> I hope that someone will volunteer
> 
> Enrico
> 
> On Thu, 21 Jul 2022 at 18:10, Dave Fisher  wrote:
> 
> > Thank you for volunteering!
> >
> > > On Jul 21, 2022, at 12:57 AM, Yunze Xu 
> > wrote:
> > >
> > > Hello Pulsar Community,
> > >
> > > It has been several months since the 2.8.3 release and there are many
> > > important fixes after that. For example, there is a performance
> > > regression for message listener that was introduced from 2.8.3 and
> > > fixed in https://github.com/apache/pulsar/pull/15162.
> > >
> > > I volunteer to be the release manager for 2.8.4.
> > >
> > > Here [0] you can find the list of 149 commits to branch-2.8 since the
> > > 2.8.3 release. There are 54 closed PRs targeting 2.8.4 that have not
> > > yet been cherry-picked [1] and I will cherry-pick them and solve the
> > > conflicts if necessary. There are 18 open PRs labeled with
> > > `release/2.8.4` [2]. I will check the status of them and see if they
> > > can be pushed to 2.8.4.
> >
> > Please use discretion about selecting bug fixes only and avoid adding new
> > features or unexpected configuration changes. Thanks!
> >
> > Ask for help with the cherry picks since being in a Release Manager is a
> > big job!
> >
> > Regards,
> > Dave
> >
> >
> > >
> > > Thanks,
> > > Yunze
> > >
> > > [0] - https://github.com/apache/pulsar/compare/v2.8.3...branch-2.8
> > > [1] -
> > https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aclosed
> > > [2] -
> > https://github.com/apache/pulsar/pulls?q=is%3Apr+label%3Arelease%2F2.8.4+-label%3Acherry-picked%2Fbranch-2.8+is%3Aopen
> > >
> >
> >
> 


Re: [VOTE] PIP-174: Provide new implementation for broker dispatch cache

2022-07-24 Thread Matteo Merli
Closing this vote with 3 +1s and no -1s:

+1s:
 * Matteo
 * PengHui
 * Dave

Thanks,
Matteo

--
Matteo Merli


On Thu, Jul 21, 2022 at 7:58 PM Dave Fisher  wrote:
>
> Sorry I’m late to this discussion.
>
> I think that the motivation is correct. There is really quite a bit of 
> activity around this issue. Let’s take extra efforts to engage extra time 
> with commits to confirm performance improvements.
>
> Let’s particularly pay attention to threading.
>
> +1
>
> Regards,
> Dave
>
> Sent from my iPhone
>
> > On Jul 21, 2022, at 11:37 AM, Matteo Merli  wrote:
> >
> > ## Motivation
> >
> > The current implementation of the read cache in the Pulsar broker has 
> > largely
> > remained unchanged for a long time, except for a few minor tweaks.
> >
> > While the implementation is stable and reasonably efficient for
> > typical workloads,
> > the overhead required for managing the cache evictions in a broker
> > that is running
> > many topics can be pretty high in terms of extra CPU utilization and on the 
> > JVM
> > garbage collection to track an increased number of medium-lived objects.
> >
> > The goal is to provide an alternative implementation that can adapt better 
> > to
> > a wider variety of operating conditions.
> >
> > ### Current implementation details
> >
> > The broker cache is implemented as part of the `ManagedLedger` component,
> > which sits in the Pulsar broker and provides a higher level of
> > abstraction of top
> > of BookKeeper.
> >
> > Each topic (and managed-ledger) has its own private cache space. This
> > cache is implemented
> > as a `ConcurrentSkipList` sorted map that maps `(ledgerId, entryId) ->
> > payload`. The payload
> > is a `ByteBuf` reference that can either be a slice of a `ByteBuf` that we 
> > got
> > when reading from a socket, or it can be a copied buffer.
> >
> > Each topic cache is allowed to use the full broker max cache size before an
> > eviction is triggered. The total cache size is effectively a resource
> > shared across all
> > the topics, where a topic can use a more prominent portion of it if it
> > "asks for more".
> >
> > When the eviction happens, we need to do an expensive ranking of all
> > the caches in the broker
> > and do an eviction in a proportional way to the currently used space
> > for each of them.
> >
> > The bigger problem is represented by the `ConcurrentSkipList` and the
> > `ByteBuf` objects
> > that need to be tracked. The skip list is essentially like a "tree"
> > structure and needs to
> > maintain Java objects for each entry in the cache. We also need to
> > potentially have
> > a huge number of ByteBuf objects.
> >
> > A cache workload is typically the worst-case scenario for each garbage
> > collector implementation because it involves creating objects, storing
> > them for some amount of
> > time and then throwing them away. During that time, the GC would have
> > already tenured these
> > objects and copy them into an "old generation" space, and sometime
> > later, a costly compaction
> > of that memory would have to be performed.
> >
> > To mitigate the effect of the cache workload on the GC, we're being
> > very aggressive in
> > purging the cache by triggering time-based eviction. By putting a max
> > TTL on the elements in
> > the cache, we can avoid keeping the objects around for too long to be
> > a problem for the GC.
> >
> > The reverse side of this is that we're artificially reducing the cache
> > capacity to a very
> > short time frame, reducing the cache usefulness.
> >
> > The other problem is the CPU cost involved in doing these frequent
> > evictions, which can
> > be very high when there are 10s of thousands of topics in a broker.
> >
> >
> > ## Proposed changes
> >
> > Instead of dealing with individual caches for each topic, let's adopt
> > a model where
> > there is a single cache space for the broker.
> >
> > This cache is broken into N segments which act as a circular buffer.
> > Whenever a segment
> > is full, we start writing into the next one, and when we reach the
> > last one, we will
> > restart recycling the first segment.
> >
> > This model has been working very well for the BookKeeper `ReadCache`:
> > https://github.com/apache/bookkeeper/blob/master/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/ReadCache.java
> >
> > The eviction becomes a completely trivial operation, buffers are just
> > rotated and
> > overwritten. We don't need to do any per-topic task or keep track of
> > utilization.
> >
> > Today, there are 2 ways of configuring the cache, one that "copies"
> > data into the cache
> > and another that will just use reference-counting on the original
> > buffers to avoid
> > payload copies.
> >
> > ### Memory copies into the cache
> >
> > Each segment is composed of a buffer, an offset, and a hashmap which maps
> > `(ledgerId, entryId) -> offset`.
> >
> >
> > The advantage of this approach is that entries are copied into the cache 
> > buffer
> > (in direct memory), a

Re: [VOTE]PIP-189: No batching if only one message in batch.

2022-07-24 Thread Haiting Jiang
+1
Thanks,
Haiting

On 2022/07/25 02:23:20 Anon Hxy wrote:
> Dear Community,
> 
> I would like to start a VOTE on "PIP-189: No batching if only one message
> in batch."
> 
> The proposal can be read at [0] and the discussion thread is available at
> [1] and the PR link is available at [2]
> 
> Voting will stay open for at least 48h.
> 
> [0] https://github.com/apache/pulsar/issues/16619
> [1] https://lists.apache.org/thread/dbq1lrv03bhtk0lr5nwm5txo9ndjplv0
> [2] https://github.com/apache/pulsar/pull/16605
> 
> Thanks,
> Xiaoyu
> 


Re: [VOTE] Pulsar Release 2.7.5 Candidate 1

2022-07-24 Thread Haiting Jiang
Hi Lari,

> The "CI - Integration - Cli / cli" build job passes cleanly for 2.7.4 . This 
> means that it's very likely that there's a regression in 2.7.5 . I'll 
> investigate the issue.

Any progress on this?

Thanks,
Haiting

On 2022/02/16 10:54:14 Lari Hotari wrote:
> I'll vote again with -1.
> 
> The "CI - Integration - Cli / cli" build job passes cleanly for 2.7.4 . This 
> means that it's very likely that there's a regression in 2.7.5 . I'll 
> investigate the issue.
> 
> -Lari
> 
> On 2022/02/16 10:21:23 Lari Hotari wrote:
> > The branch-2.7 build didn't complete cleanly for the release commit 
> > https://github.com/apache/pulsar/commit/ed40840a65c413202c0aab6b9b1a29ed370f6fb7
> >  .
> > (click on the red cross icon before the text "Release 2.7.5" to see the 
> > build results popup)
> > 
> > There are 3 failing build jobs:
> > - 2 for "CI - Build - Multiple - OS / build"
> > - "CI - Integration - Cli / cli" 
> > 
> > It's safe to ignore the failures for "CI - Build - Multiple - OS / build" 
> > since that doesn't run tests and the build was also failing in the 2.7.4 
> > release commit:
> > https://github.com/apache/pulsar/commit/ab451b855d873a9bad2005f939a23118a583baa9
> > (click on the red cross icon to see build results)
> > 
> > I am running the failing "CI - Integration - Cli / cli" build job locally 
> > with the command "mvn -f tests/pom.xml test 
> > -DintegrationTestSuiteFile=pulsar-cli.xml -DintegrationTests 
> > -DredirectTestOutputToFile=false" (after building the 2.7.5 docker images 
> > and docker test image locally). 
> > 
> > The test doesn't complete cleanly. This is the error message in integration 
> > tests:
> > [integration] [INFO] [stdout] 
> > org.apache.bookkeeper.mledger.ManagedLedgerException: Not enough non-faulty 
> > bookies available
> > [integration] [INFO] [stdout] 08:20:16.613 
> > [bookkeeper-ml-workers-OrderedExecutor-3-0] WARN  
> > org.apache.pulsar.broker.service.ServerCnx - 
> > [/172.19.0.8:54068][persistent://compaction-test-rest-wmxw/ns1/topic1][sub1]
> >  Failed to create consumer: consumerId=0, 
> > org.apache.bookkeeper.mledger.ManagedLedgerException: Not enough non-faulty 
> > bookies available
> > 
> > I can see this error message in the bookies (docker exec -it 
> > CLITest-gmwag-pulsar-bookie-1 cat /var/log/pulsar/bookie.log)
> > 08:10:20.616 [bookie-io-1-9] ERROR org.apache.bookkeeper.proto.BookieServer 
> > - Unable to allocate memory, exiting bookie
> > io.netty.util.internal.OutOfDirectMemoryError: failed to allocate 16777216 
> > byte(s) of direct memory (used: 536870912, max: 536870912)
> > at 
> > io.netty.util.internal.PlatformDependent.incrementMemoryCounter(PlatformDependent.java:802)
> >  ~[io.netty-netty-common-4.1.68.Final.jar:4.1.68.Final]
> > 
> > This is the flaky test issue https://github.com/apache/pulsar/issues/9622 
> > fixed by PR https://github.com/apache/pulsar/pull/9623 which was missing 
> > from branch-2.7 . 
> > After cherry-picking PR 9623, I can run CLI integration tests locally. 
> > There were 3 test failures: 
> > - 
> > org.apache.pulsar.tests.integration.cli.ClusterMetadataTearDownTest#testDeleteCluster
> > java.lang.AssertionError: expected [0] but found [18]
> > at 
> > org.apache.pulsar.tests.integration.cli.ClusterMetadataTearDownTest.testDeleteCluster(ClusterMetadataTearDownTest.java:122)
> > - 
> > org.apache.pulsar.tests.integration.cli.CLITest#testCreateSubscriptionCommand
> > org.apache.pulsar.tests.integration.docker.ContainerExecException: 
> > /pulsar/bin/pulsar-admin topics create-subscription 
> > persistent://public/default/testCreateSubscriptionCommmand --subscription 
> > subscription-0 failed on 
> > e03e5466c18da298a834ddbeeac9e6c80e583c27b28ecbac0bd472f148696a1b with error 
> > code 1
> > - 
> > org.apache.pulsar.tests.integration.compaction.TestCompaction#testPublishCompactAndConsumeCLI
> > org.testng.internal.thread.ThreadTimeoutException: Method 
> > org.apache.pulsar.tests.integration.compaction.TestCompaction.testPublishCompactAndConsumeCLI()
> >  didn't finish within the time-out 30
> > 
> > I'll run the integration tests locally for 2.7.4 release to see if there's 
> > a regression or whether it's just the tests that are flaky.
> > 
> > 
> > -Lari
> > 
> > 
> > 
> > 
> > 
> > On 2022/02/15 13:54:18 PengHui Li wrote:
> > > Hi Lari
> > > 
> > > The PR you shared https://github.com/apache/pulsar/pull/14240 does not run
> > > any tests.
> > > All the tests were completed in 1 min, please check.
> > > 
> > > Thanks,
> > > Penghui
> > > 
> > > 
> > > 
> > > On Tue, Feb 15, 2022 at 7:23 PM Lari Hotari  wrote:
> > > 
> > > > The tests passed for a PR based on revision a27e0853bda in 
> > > > branch-2.7
> > > > in this PR that I made to run the tests:
> > > > https://github.com/apache/pulsar/pull/14240 . rev a27e0853bda is the 
> > > > last
> > > > commit before the 2.7.5 release commit in branch-2.7 .
> > > >
> > > > I'll count your vote as +1 since all tests have passed.
> > > >

Re: [VOTE] PIP-189: No batching if only one message in batch.

2022-07-24 Thread mattison chao
+1(non-binding)

Best,
Mattison

On Mon, 25 Jul 2022 at 10:35, Haiting Jiang  wrote:
>
> +1
> Thanks,
> Haiting
>
> On 2022/07/25 02:23:20 Anon Hxy wrote:
> > Dear Community,
> >
> > I would like to start a VOTE on "PIP-189: No batching if only one message
> > in batch."
> >
> > The proposal can be read at [0] and the discussion thread is available at
> > [1] and the PR link is available at [2]
> >
> > Voting will stay open for at least 48h.
> >
> > [0] https://github.com/apache/pulsar/issues/16619
> > [1] https://lists.apache.org/thread/dbq1lrv03bhtk0lr5nwm5txo9ndjplv0
> > [2] https://github.com/apache/pulsar/pull/16605
> >
> > Thanks,
> > Xiaoyu
> >


Re: [VOTE] PIP-184: Topic specific consumer priorityLevel

2022-07-24 Thread guo jiwei
+1

Regards
Jiwei Guo (Tboy)


On Mon, Jul 18, 2022 at 10:54 AM PengHui Li  wrote:

> +1
>
> Penghui
>
> On Mon, Jul 18, 2022 at 10:15 AM Zike Yang  wrote:
>
> > +1
> >
> > Thanks
> >
> >
> > On Fri, Jul 15, 2022 at 3:58 PM Dave Maughan
> >  wrote:
> > >
> > > Hi Pulsar Community
> > >
> > > I would like to start a VOTE on "PIP-184: Topic specific consumer
> > > priorityLevel".
> > >
> > > The proposal can be read at
> > https://github.com/apache/pulsar/issues/16481
> > >
> > > and the discussion thread is available at
> > > https://lists.apache.org/thread/5zs4gd3r0rtzz16nv62o8ntygg01qjhq
> > >
> > > Voting will stay open for at least 48h.
> > >
> > > Thanks,
> > > Dave
> >
>


Re: [VOTE] PIP-187 Add API to analyze a subscription backlog and provide an accurate value

2022-07-24 Thread mattison chao
+1

Best,
Mattison

On Sun, 24 Jul 2022 at 14:51, Haiting Jiang  wrote:
>
> +1
>
> Thanks,
> Haiting
>
> On 2022/07/23 02:00:32 PengHui Li wrote:
> > +1
> >
> > Penghui
> >
> > On Wed, Jul 20, 2022 at 9:41 PM Asaf Mesika  wrote:
> >
> > > Sorry to barge in the vote - I forgot to send my reply on the discussion 2
> > > days ago :)
> > >
> > >
> > > On Tue, Jul 19, 2022 at 11:22 PM Nicolò Boschi 
> > > wrote:
> > >
> > > > +1, thanks
> > > >
> > > > Nicolò Boschi
> > > >
> > > > On Tue 19 Jul 2022 at 22:16, Christophe Bornet  wrote:
> > > >
> > > > > +1
> > > > >
> > > > > On Tue, Jul 19, 2022 at 20:01, Andrey Yegorov <andrey.yego...@datastax.com> wrote:
> > > > >
> > > > > > +1
> > > > > >
> > > > > > > On Tue, Jul 19, 2022 at 10:51 AM Dave Fisher  wrote:
> > > > > >
> > > > > > > +1 (binding)
> > > > > > >
> > > > > > > I support this enhancement for when a user occasionally requires
> > > > > > > accurate backlog stats. Once we bring this into service we can
> > > > > > > see if further guardrails are required.
> > > > > > >
> > > > > > > Regards,
> > > > > > > Dave
> > > > > > >
> > > > > > > > On Jul 19, 2022, at 10:02 AM, Enrico Olivelli <eolive...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > This is the VOTE thread for PIP-187
> > > > > > > >
> > > > > > > > This is the GH issue: https://github.com/apache/pulsar/issues/16597
> > > > > > > > This is the PR: https://github.com/apache/pulsar/pull/16545
> > > > > > > >
> > > > > > > > The vote is open for at least 48 hours
> > > > > > > >
> > > > > > > > Below you can find a copy of the text of the PIP
> > > > > > > >
> > > > > > > > Best regards
> > > > > > > > Enrico
> > > > > > > >
> > > > > > > >
> > > > > > > > Motivation
> > > > > > > >
> > > > > > > > Currently there is no way to have an accurate backlog for a
> > > > > > > > subscription:
> > > > > > > >
> > > > > > > > - you have only the number of "entries", not messages
> > > > > > > > - server side filters (PIP-105) may filter out some messages
> > > > > > > >
> > > > > > > > Having the number of entries is sometimes not enough because
> > > > > > > > with batch messages the amount of work on the Consumers is
> > > > > > > > proportional to the number of messages, which may vary from
> > > > > > > > entry to entry.
> > > > > > > >
> > > > > > > > Goal
> > > > > > > >
> > > > > > > > The idea of this patch is to provide a dedicated API (REST,
> > > > > > > > pulsar-admin, and Java PulsarAdmin) to "analyze" a subscription
> > > > > > > > and provide detailed information about what is expected to be
> > > > > > > > delivered to Consumers.
> > > > > > > >
> > > > > > > > The operation will be quite expensive because we have to load
> > > > > > > > the messages from storage and pass them to the filters, but due
> > > > > > > > to the dynamic nature of Pulsar subscriptions there is no other
> > > > > > > > way to obtain this value.
> > > > > > > >
> > > > > > > > One good strategy for monitoring/alerting is to set up alerts
> > > > > > > > on the usual "stats" and use this new API to inspect the
> > > > > > > > subscription more deeply, typically by issuing a manual command.
> > > > > > > >
> > > > > > > > API Changes
> > > > > > > >
> > > > > > > > internal ManagedCursor API:
> > > > > > > >
> > > > > > > > CompletableFuture<ScanOutcome> scan(Predicate<Entry> condition,
> > > > > > > > long maxEntries, long timeOutMs);
> > > > > > > >
> > > > > > > > This method scans the Cursor from the lastMarkDelete position
> > > > > > > > to the tail. There is a time limit and a maxEntries limit;
> > > > > > > > these are needed in order to prevent huge (and useless) scans.
> > > > > > > > The Predicate can stop the scan if it doesn't want to continue
> > > > > > > > the processing for some reason.
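
A minimal sketch of how a broker-side caller might drive this proposed
method (ManagedCursor and Entry are the existing org.apache.bookkeeper.mledger
types; ScanOutcome is the result type implied by the PIP text, and
passesFilters() is a purely hypothetical stand-in for server-side filtering):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Predicate;
    import org.apache.bookkeeper.mledger.Entry;
    import org.apache.bookkeeper.mledger.ManagedCursor;

    final class BacklogScanSketch {
        // Hypothetical stand-in for server-side filtering (PIP-105)
        static boolean passesFilters(Entry entry) {
            return true;
        }

        static CompletableFuture<Void> countDeliverable(ManagedCursor cursor) {
            AtomicLong matching = new AtomicLong();
            Predicate<Entry> condition = entry -> {
                if (passesFilters(entry)) {
                    matching.incrementAndGet();
                }
                return true;   // returning false would abort the scan early
            };
            // Bounded scan, per the maxEntries/timeOutMs limits described above
            return cursor.scan(condition, 10_000, 30_000)
                    .thenAccept(outcome -> System.out.println(
                            "outcome=" + outcome + ", deliverable=" + matching.get()));
        }
    }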
> > > > > > > >
> > > > > > > > New REST API:
> > > > > > > >
> > > > > > > >    @GET
> > > > > > > >    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/analyzeBacklog")
> > > > > > > >    @ApiOperation(value = "Analyze a subscription, by scanning all the unprocessed messages")
> > > > > > > >
> > > > > > > >    public void analyzeSubscriptionBacklog(
> > > > > > > >        @Suspended final AsyncResponse asyncResponse,
> > > > > > > >        @ApiParam(value = "Specify the tenant", required = true)
> > > > > > > >        @PathParam("tenant") String tenant,
> > > > > > > >        @ApiParam(value = "Specify the namespace", required = true)
> > > > > > > >        @PathParam("namespace") String namespace,
> > > > > > > >        @ApiParam(value = "Specify topic name", required = true)
> > > > > > > >        @PathParam("to

Re: [VOTE] PIP-184: Topic specific consumer priorityLevel

2022-07-24 Thread mattison chao
+1(non-binding)

Best,
Mattison

On Mon, 25 Jul 2022 at 13:19, guo jiwei  wrote:
>
> +1
>
> Regards
> Jiwei Guo (Tboy)
>
>
> On Mon, Jul 18, 2022 at 10:54 AM PengHui Li  wrote:
>
> > +1
> >
> > Penghui
> >
> > On Mon, Jul 18, 2022 at 10:15 AM Zike Yang  wrote:
> >
> > > +1
> > >
> > > Thanks
> > >
> > >
> > > On Fri, Jul 15, 2022 at 3:58 PM Dave Maughan
> > >  wrote:
> > > >
> > > > Hi Pulsar Community
> > > >
> > > > I would like to start a VOTE on "PIP-184: Topic specific consumer
> > > > priorityLevel".
> > > >
> > > > The proposal can be read at
> > > https://github.com/apache/pulsar/issues/16481
> > > >
> > > > and the discussion thread is available at
> > > > https://lists.apache.org/thread/5zs4gd3r0rtzz16nv62o8ntygg01qjhq
> > > >
> > > > Voting will stay open for at least 48h.
> > > >
> > > > Thanks,
> > > > Dave
> > >
> >


Disable reconsumeLater method for non-shared subscriptions

2022-07-24 Thread Gavin gao
As the official doc says, "Currently, retry letter topic is enabled in
Shared subscription types". But in practice reconsumeLater can also be
called on a non-shared subscription. We should disable this method there
to preserve message ordering.

Pr: https://github.com/apache/pulsar/pull/16745
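
For context, a minimal sketch of the API in question (standard Java client;
topic and subscription names are placeholders):

    import java.util.concurrent.TimeUnit;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.SubscriptionType;

    public class RetryExample {
        public static void main(String[] args) throws Exception {
            try (PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build()) {
                Consumer<byte[]> consumer = client.newConsumer()
                        .topic("persistent://public/default/my-topic")
                        .subscriptionName("my-sub")
                        // retry letter topic is documented for Shared
                        // subscriptions; the PR above proposes rejecting
                        // reconsumeLater on other subscription types
                        .subscriptionType(SubscriptionType.Shared)
                        .enableRetry(true)
                        .subscribe();
                Message<byte[]> msg = consumer.receive();
                try {
                    // ... process the message ...
                    consumer.acknowledge(msg);
                } catch (Exception e) {
                    // re-deliver via the retry letter topic after a delay
                    consumer.reconsumeLater(msg, 1, TimeUnit.MINUTES);
                }
            }
        }
    }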