Here is an example that may help in understanding the properties extraction:

- Let's set `batchedFilterProperties`=`<region, version>`.
This means only the keys named `region` and `version` will be extracted into the
batch meta properties.

- Then we have a producer that sends the messages below, in order:
    - `msg1` with properties: `<region: eu>`
    - `msg2` with properties: `<region: eu>`
    - `msg3` with properties: `<region: eu, version:1, tag:a>`
    - `msg4` with properties: `<region: eu, version:1>`
    - `msg5` with properties: `<region: us, version:1>`
    - `msg6` with properties: `<region: us, version:2>`

- The properties extraction then proceeds as follows:
    - msg1 and msg2 have the same properties, <region: eu>, so they are put
into the same batch.
    - msg3 and msg4 have the same extracted properties, <region: eu, version:1>;
tag:a in msg3 is ignored because `batchedFilterProperties`
doesn't contain `tag`. So msg3 and msg4 are put into the same batch.
    - msg5 and msg6 have different properties, because their `version` values
differ. So msg5 and msg6 are published in different batches.

- Just to summarize, the result will be:

| batch  | batch meta properties   | single meta properties         | payload | single meta properties  | payload |
|--------|-------------------------|--------------------------------|---------|-------------------------|---------|
| batch1 | <region: eu>            | <region: eu>                   | msg1    | <region: eu>            | msg2    |
| batch2 | <region: eu, version:1> | <region: eu, version:1, tag:a> | msg3    | <region: eu, version:1> | msg4    |
| batch3 | <region: us, version:1> | <region: us, version:1>        | msg5    |                         |         |
| batch4 | <region: us, version:2> | <region: us, version:2>        | msg6    |                         |         |
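
The grouping in the example can also be reproduced with a small standalone sketch. It is not the Pulsar implementation: the class `BatchGroupingSketch` and its methods `extract` and `group` are hypothetical names used only for illustration. It also assumes that a batch is closed as soon as a message with different extracted properties arrives, which keeps the messages in send order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Pulsar.
class BatchGroupingSketch {

    /** Keeps only the properties whose key is listed in filterKeys. */
    static Map<String, String> extract(Map<String, String> props, List<String> filterKeys) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String key : filterKeys) {
            if (props.containsKey(key)) {
                out.put(key, props.get(key));
            }
        }
        return out;
    }

    /**
     * Splits messages (given in send order) into batches. A new batch starts
     * whenever the extracted properties differ from those of the current batch
     * (assumed behavior). Returns the 1-based message numbers of each batch.
     */
    static List<List<Integer>> group(List<Map<String, String>> messages, List<String> filterKeys) {
        List<List<Integer>> batches = new ArrayList<>();
        Map<String, String> current = null;
        for (int i = 0; i < messages.size(); i++) {
            Map<String, String> extracted = extract(messages.get(i), filterKeys);
            if (current == null || !current.equals(extracted)) {
                batches.add(new ArrayList<>());
                current = extracted;
            }
            batches.get(batches.size() - 1).add(i + 1);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> filterKeys = List.of("region", "version");
        List<Map<String, String>> messages = List.of(
                Map.of("region", "eu"),                                // msg1
                Map.of("region", "eu"),                                // msg2
                Map.of("region", "eu", "version", "1", "tag", "a"),    // msg3
                Map.of("region", "eu", "version", "1"),                // msg4
                Map.of("region", "us", "version", "1"),                // msg5
                Map.of("region", "us", "version", "2"));               // msg6
        // prints [[1, 2], [3, 4], [5], [6]]
        System.out.println(group(messages, filterKeys));
    }
}
```

The four inner lists correspond to batch1 through batch4 in the table above.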

Thanks,
Xiaoyu Hou

Anon Hxy <anonhx...@gmail.com> wrote on Tue, Jul 19, 2022 at 18:00:

> Hi Pulsar community:
>
> I opened a PIP to discuss "Support batched message using entry filter"
>
> Proposal Link: https://github.com/apache/pulsar/issues/16680
> ---
>
> ## Motivation
>
> We already have a pluggable way to filter entries in the broker, namely PIP-105
> https://github.com/apache/pulsar/issues/12269. But it only checks the
> batch header, without looking into the properties of the individual
> messages. It does provide an interface to deserialize the entire
> Entry onto the heap, but that adds memory and CPU overhead, and
> in most scenarios only a few properties are needed for filtering.
>
> This proposal lets PIP-105 support batched entries
> without having to deserialize the entire Entry onto the heap.
>
>
> ## API Changes
>
> - Add a producer config that specifies the keys of the properties to be
> added to the batched entry metadata, for example:
>
> ```
>
> org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
>
> ```
> The type of `batchedFilterProperties` is `List<String>`, and its default value
> is an empty list. An empty list means that the properties of the entry's
> metadata are empty, and the `EntryFilter` will not take effect.
>
> ## Implementation
>
> - When `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add` is called,
>  we extract the message properties and add them to `metadata`:
> ```
>  public boolean add(MessageImpl<?> msg, SendCallback callback) {
>
>         if (log.isDebugEnabled()) {
>             log.debug("[{}] [{}] add message to batch, num messages in batch so far {}",
>                     topicName, producerName, numMessagesInBatch);
>         }
>
>         if (++numMessagesInBatch == 1) {
>             try {
>                 // some properties are common amongst the different messages in the batch,
>                 // hence we just pick them up from the first message
>                 messageMetadata.setSequenceId(msg.getSequenceId());
>                 List<KeyValue> filterProperties = getProperties(msg);
>                 if (!filterProperties.isEmpty()) {
>                     messageMetadata.addAllProperties(filterProperties);
>                     // add the extracted message properties here
>                 }
> ```
>
> - We also need to add a method `hasSameProperties`, similar to `hasSameSchema`.
> Messages with the same properties can be added to the same batch:
>
> ```
>  private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
>         return batchMessageContainer.haveEnoughSpace(msg)
>                && (!isMultiSchemaEnabled(false)
>                     || batchMessageContainer.hasSameSchema(msg)
>                     || batchMessageContainer.hasSameProperties(msg))
>                 && batchMessageContainer.hasSameTxn(msg);
>     }
>
> ```
>
>
> ## Rejected Alternatives
>
> - Implement an `AbstractBatchMessageContainer`, say
> `BatchMessagePropertiesBasedContainer`, that keeps messages with the same
> properties in a single `hashmap` entry, like
> `BatchMessageKeyBasedContainer`.
>
> Rejection reason: This would publish messages out of order.
>
>
>
> Thanks,
> Xiaoyu Hou
>
