Thank you for this proposal!

I understand the problem and have already thought about it, since
I am the author of some filters (in particular the JMS Selectors Filter),
but a few points in this PIP need to be clarified.

1) It is not clear to me whether you want to push the metadata of all
the messages into the main header of the entry, only the metadata of
the first entry, or only the KEY.
2) We have to specify the new message format of the entry and update
the protocol documents.
3) Existing EntryFilters won't be able to work well with the new
format; we must find a way to make them fail rather than process garbage.
4) The same problem applies to Interceptors and Protocol Handlers
(like KOP); this PIP must make the upgrade path clear and give some
suggestions to the developers of such components.
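
To make point 1 concrete, here is a minimal sketch in plain Java (no
Pulsar classes) of the "only the configured keys" reading. The class
BatchFilterSketch and its two helper methods are hypothetical names of
mine; batchedFilterProperties is the key list from the proposal. It only
illustrates the semantics that the PIP should spell out and is not the
proposed implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: none of these names exist in Pulsar.
class BatchFilterSketch {

    // Keep only the configured keys from one message's properties.
    // Keys that the message does not carry are simply skipped.
    static Map<String, String> extractFilterProperties(
            Map<String, String> messageProperties,
            List<String> batchedFilterProperties) {
        Map<String, String> selected = new TreeMap<>();
        for (String key : batchedFilterProperties) {
            String value = messageProperties.get(key);
            if (value != null) {
                selected.put(key, value);
            }
        }
        return selected;
    }

    // A message may join a batch only if its selected properties equal
    // the ones already recorded for that batch.
    static boolean hasSameProperties(
            Map<String, String> batchProperties,
            Map<String, String> messageProperties,
            List<String> batchedFilterProperties) {
        return batchProperties.equals(
                extractFilterProperties(messageProperties, batchedFilterProperties));
    }
}
```

Under this reading, with batchedFilterProperties = ["region"], a message
with properties {region=eu, id=1} contributes only {region=eu} to the
entry metadata, and a later message with region=us would have to go
into a different batch.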

I support this initiative, as long as we clarify these points.

The Pulsar ecosystem is becoming more and more mature, and the user
base is growing quickly. We can't break things without a clear path
for the users and for the developers of extensions.

Enrico


On Thu, 21 Jul 2022 at 12:45, Qiang Huang
<qiang.huang1...@gmail.com> wrote:
>
> Good. +1
>
> On Tue, 19 Jul 2022 at 18:00, Anon Hxy <anonhx...@gmail.com> wrote:
>
> > Hi Pulsar community:
> >
> > I have opened a PIP to discuss "Support batched message using entry filter"
> >
> > Proposal Link: https://github.com/apache/pulsar/issues/16680
> > ---
> >
> > ## Motivation
> >
> > We already have a plug-in way to filter entries in the broker, namely PIP-105
> > https://github.com/apache/pulsar/issues/12269. But it only checks the
> > batch header, without digging into the individual messages' properties.
> > Of course it provides an interface to deserialize the entire Entry to the
> > heap, but this adds memory and CPU overhead. And in most scenarios we
> > only need a subset of the properties to do the filtering.
> >
> > This proposal brings a method to make PIP-105 support batched entry without
> > having to deserialize the entire Entry to the heap
> >
> >
> > ## API Changes
> >
> > - Add a producer config to specify the keys whose properties will be
> > added to the batched entry metadata, for example:
> >
> > ```
> >
> > org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
> >
> > ```
> > The `batchedFilterProperties` type is `List<String>`, and its default value
> > is an empty list. An empty list means that the properties of the entry's
> > metadata are empty, and the `EntryFilter` will not take effect.
> >
> > ## Implementation
> >
> > - When calling `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`,
> >  we extract the message properties and add them to `metadata`:
> > ```
> >  public boolean add(MessageImpl<?> msg, SendCallback callback) {
> >
> >         if (log.isDebugEnabled()) {
> >             log.debug("[{}] [{}] add message to batch, num messages in batch so far {}",
> >                     topicName, producerName, numMessagesInBatch);
> >         }
> >
> >         if (++numMessagesInBatch == 1) {
> >             try {
> >                 // some properties are common amongst the different messages in the batch,
> >                 // hence we just pick it up from the first message
> >                 messageMetadata.setSequenceId(msg.getSequenceId());
> >                 List<KeyValue> filterProperties = getProperties(msg);
> >                 if (!filterProperties.isEmpty()) {
> >                     // add message properties here
> >                     messageMetadata.addAllProperties(filterProperties);
> >                 }
> > ```
> >
> > - We also need to add a method `hasSameProperties`, like `hasSameSchema`.
> > Only messages with the same properties can be added to the same batch:
> >
> > ```
> >  private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
> >         // the properties check must hold on its own; OR-ing it with the
> >         // schema check would let messages with different properties share a batch
> >         return batchMessageContainer.haveEnoughSpace(msg)
> >                && (!isMultiSchemaEnabled(false)
> >                     || batchMessageContainer.hasSameSchema(msg))
> >                && batchMessageContainer.hasSameProperties(msg)
> >                && batchMessageContainer.hasSameTxn(msg);
> >     }
> >
> > ```
> >
> >
> > ## Rejected Alternatives
> >
> > - Implement an `AbstractBatchMessageContainer`, say
> > `BatchMessagePropertiesBasedContainer`, keeping messages with the same
> > properties in a single `hashmap` entry, like
> > `BatchMessageKeyBasedContainer`.
> >
> > Rejection reason: this would publish messages out of order.
> >
> >
> >
> > Thanks,
> > Xiaoyu Hou
> >
>
>
> --
> BR,
> Qiang Huang
