Here is an example that may help in understanding the properties extraction:
- Let's set `batchedFilterProperties`=`<region, version>`.
  This means only the keys named `region` and `version` will be extracted to the
  batch meta properties.
- Then we have a producer that sends the messages below in order:
  - `msg1` with properties: `<region: eu>`
  - `msg2` with properties: `<region: eu>`
  - `msg3` with properties: `<region: eu, version:1, tag:a>`
  - `msg4` with properties: `<region: eu, version:1>`
  - `msg5` with properties: `<region: us, version:1>`
  - `msg6` with properties: `<region: us, version:2>`
- The process of properties extraction will be:
  - msg1 and msg2 have the same properties: `<region: eu>`, so they will be put
    into the same batch.
  - msg3 and msg4 have the same extracted properties: `<region: eu, version:1>`.
    `tag:a` in msg3 is ignored because `batchedFilterProperties`
    doesn't contain `tag`. So msg3 and msg4 will be put into the same batch.
  - msg5 and msg6 have different properties, because the value of `version`
    is different. So msg5 and msg6 are published in different batches.
- Just to summarize, the result will be:

| | batch meta properties | single meta properties | payload | single meta properties | payload |
|--------|---------------------------|----------------------------------|---------|---------------------------|---------|
| batch1 | `<region: eu>` | `<region: eu>` | msg1 | `<region: eu>` | msg2 |
| batch2 | `<region: eu, version:1>` | `<region: eu, version:1, tag:a>` | msg3 | `<region: eu, version:1>` | msg4 |
| batch3 | `<region: us, version:1>` | `<region: us, version:1>` | msg5 | | |
| batch4 | `<region: us, version:2>` | `<region: us, version:2>` | msg6 | | |

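
The grouping in this example can be reproduced with a small standalone sketch. It is only an illustration, not the producer implementation: the class and method names are hypothetical, plain `Map`s stand in for message metadata, and batch size limits and flush timers are ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, self-contained illustration; none of these names exist in Pulsar.
public class FilterPropertyBatchingSketch {

    /** Keeps only the properties whose key is listed in filterKeys. */
    static Map<String, String> extract(Map<String, String> props, List<String> filterKeys) {
        Map<String, String> out = new TreeMap<>();
        for (String key : filterKeys) {
            if (props.containsKey(key)) {
                out.put(key, props.get(key));
            }
        }
        return out;
    }

    /**
     * Walks the messages in send order and starts a new batch whenever the
     * extracted properties differ from those of the current batch.
     */
    static List<List<String>> batch(Map<String, Map<String, String>> messages,
                                    List<String> filterKeys) {
        List<List<String>> batches = new ArrayList<>();
        Map<String, String> current = null;
        for (Map.Entry<String, Map<String, String>> e : messages.entrySet()) {
            Map<String, String> extracted = extract(e.getValue(), filterKeys);
            if (current == null || !current.equals(extracted)) {
                batches.add(new ArrayList<>());
                current = extracted;
            }
            batches.get(batches.size() - 1).add(e.getKey());
        }
        return batches;
    }

    /** The six messages from the example, in send order. */
    static Map<String, Map<String, String>> exampleMessages() {
        Map<String, Map<String, String>> m = new LinkedHashMap<>();
        m.put("msg1", Map.of("region", "eu"));
        m.put("msg2", Map.of("region", "eu"));
        m.put("msg3", Map.of("region", "eu", "version", "1", "tag", "a"));
        m.put("msg4", Map.of("region", "eu", "version", "1"));
        m.put("msg5", Map.of("region", "us", "version", "1"));
        m.put("msg6", Map.of("region", "us", "version", "2"));
        return m;
    }

    public static void main(String[] args) {
        // prints [[msg1, msg2], [msg3, msg4], [msg5], [msg6]]
        System.out.println(batch(exampleMessages(), List.of("region", "version")));
    }
}
```

Because a new batch is started as soon as the extracted properties change, messages are never reordered; the cost is that alternating property values produce many small batches.
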
Thanks,
Xiaoyu Hou
Anon Hxy <[email protected]> wrote on Tue, Jul 19, 2022 at 18:00:
> Hi Pulsar community:
>
> I have opened a PIP to discuss "Support batched message using entry filter".
>
> Proposal Link: https://github.com/apache/pulsar/issues/16680
> ---
>
> ## Motivation
>
> We already have a pluggable way to filter entries in the broker, namely PIP-105
> https://github.com/apache/pulsar/issues/12269. But it only checks
> the batch header, without digging into the individual messages'
> properties. It does provide an interface to deserialize the entire
> Entry to the heap, but this brings extra memory and CPU workload, and
> in most scenarios we only need some of the properties to do the filtering.
>
> This proposal brings a method to make PIP-105 support batched entries
> without having to deserialize the entire Entry to the heap.
>
>
> ## API Changes
>
> - Add a producer config to specify the keys whose properties will be
> added to the batched entry metadata, for example:
>
> ```
>
> org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
>
> ```
> The type of `batchedFilterProperties` is `List<String>`, and its default value
> is an empty list. An empty list means that the properties of the entry's
> metadata are empty, and the `EntryFilter` will not take effect.
>
> ## Implementation
>
> - When calling `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`,
> we extract the message properties and add them to `metadata`:
> ```
> public boolean add(MessageImpl<?> msg, SendCallback callback) {
>     if (log.isDebugEnabled()) {
>         log.debug("[{}] [{}] add message to batch, num messages in batch so far {}",
>                 topicName, producerName, numMessagesInBatch);
>     }
>
>     if (++numMessagesInBatch == 1) {
>         try {
>             // some properties are common amongst the different messages in the batch,
>             // hence we just pick it up from the first message
>             messageMetadata.setSequenceId(msg.getSequenceId());
>             List<KeyValue> filterProperties = getProperties(msg);
>             if (!filterProperties.isEmpty()) {
>                 // add message properties here
>                 messageMetadata.addAllProperties(filterProperties);
>             }
> ```
>
> - We also need to add a method `hasSameProperties`, similar to `hasSameSchema`.
> Only messages with the same properties can be added to the same batch:
>
> ```
> private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
>     return batchMessageContainer.haveEnoughSpace(msg)
>             && (!isMultiSchemaEnabled(false)
>                 || batchMessageContainer.hasSameSchema(msg))
>             // the properties check must hold on its own, not as an alternative
>             // to the schema check
>             && batchMessageContainer.hasSameProperties(msg)
>             && batchMessageContainer.hasSameTxn(msg);
> }
>
> ```
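
One possible shape for the `hasSameProperties` check is sketched below. This is an assumption about how it could work, not the proposed implementation: the class name, the signature, and the use of plain `Map`s instead of `MessageMetadata` are all hypothetical. Only keys listed in `batchedFilterProperties` are compared, so unrelated properties such as `tag` do not split a batch.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, self-contained illustration of the comparison only.
public class SamePropertiesSketch {

    /**
     * Returns true when the batch and the candidate message agree on every
     * filter key. A key that is missing on both sides counts as equal; a key
     * present on only one side counts as different.
     */
    static boolean hasSameProperties(Map<String, String> batchProps,
                                     Map<String, String> msgProps,
                                     List<String> filterKeys) {
        for (String key : filterKeys) {
            if (!Objects.equals(batchProps.get(key), msgProps.get(key))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("region", "version");
        // msg3 vs msg4: "tag" is not a filter key, so it is ignored
        System.out.println(hasSameProperties(
                Map.of("region", "eu", "version", "1", "tag", "a"),
                Map.of("region", "eu", "version", "1"), keys)); // prints true
        // msg5 vs msg6: "version" differs
        System.out.println(hasSameProperties(
                Map.of("region", "us", "version", "1"),
                Map.of("region", "us", "version", "2"), keys)); // prints false
    }
}
```
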
>
>
> ## Rejected Alternatives
>
> - Implement an `AbstractBatchMessageContainer` subclass, say
> `BatchMessagePropertiesBasedContainer`, that keeps messages with the same
> properties in a single `hashmap` entry, like
> `BatchMessageKeyBasedContainer`.
>
> Rejection reason: This would publish messages out of order.
>
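
To make the ordering concern concrete, here is a toy illustration of grouping by properties in a map and then flushing group by group. It is only a sketch under simplifying assumptions: the class and method names are hypothetical, each message carries a single `region` value, and all groups are flushed at once in insertion order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, self-contained illustration; not Pulsar code.
public class KeyBasedOrderingSketch {

    /**
     * Groups messages by region (each element is {name, region}) and returns
     * the order in which they would be published if every group were flushed
     * as one batch.
     */
    static List<String> publishOrderKeyBased(List<String[]> messages) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String[] m : messages) {
            groups.computeIfAbsent(m[1], k -> new ArrayList<>()).add(m[0]);
        }
        List<String> order = new ArrayList<>();
        for (List<String> batch : groups.values()) {
            order.addAll(batch);
        }
        return order;
    }

    public static void main(String[] args) {
        List<String[]> sent = List.of(
                new String[] {"m1", "eu"},
                new String[] {"m2", "us"},
                new String[] {"m3", "eu"});
        // m3 is published before m2 although it was sent after it
        System.out.println(publishOrderKeyBased(sent)); // prints [m1, m3, m2]
    }
}
```
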
>
>
> Thanks,
> Xiaoyu Hou
>