Hi Pulsar community:

I have opened a PIP to discuss "Support batched message using entry filter".

Proposal Link: https://github.com/apache/pulsar/issues/16680
---

## Motivation

We already have a pluggable way to filter entries in the broker, namely PIP-105
(https://github.com/apache/pulsar/issues/12269). However, it only inspects
the batch header and does not look at the properties of the individual
messages inside a batch. It does provide an interface to deserialize the
entire entry onto the heap, but this adds memory and CPU overhead, and in
most scenarios only a few properties are needed to filter.

This proposal makes PIP-105 entry filters work with batched entries without
having to deserialize the entire entry onto the heap.


## API Changes

- Add a producer config that specifies the property keys whose message
properties will be added to the batched entry metadata, for example:

```
org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
```
The `batchedFilterProperties` type is `List<String>`, and its default value
is an empty list. With an empty list, no properties are added to the entry
metadata, so the `EntryFilter` will not take effect.
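To illustrate why properties stored in the entry metadata are enough, here is a minimal, self-contained Java model of a broker-side check that decides on a whole batched entry by reading only those properties, never the payload. It does not use Pulsar's actual `EntryFilter` interface; the class name, the `FilterResult` values, the `region` property, and the choice to accept entries that carry no filter property (one possible reading of "will not take effect") are all assumptions for illustration.

```java
import java.util.Map;

/**
 * Hypothetical model of a broker-side filter that decides on a batched entry
 * using only the properties copied into its metadata. Not Pulsar's EntryFilter API.
 */
class MetadataPropertyFilter {

    // Illustrative decision values; assumed, not taken from the proposal.
    enum FilterResult { ACCEPT, REJECT }

    private final String key;
    private final String expectedValue;

    MetadataPropertyFilter(String key, String expectedValue) {
        this.key = key;
        this.expectedValue = expectedValue;
    }

    /** Decides from the entry metadata properties alone; the payload is never parsed. */
    FilterResult filter(Map<String, String> entryMetadataProperties) {
        String actual = entryMetadataProperties.get(key);
        if (actual == null) {
            // No filter property in the metadata (e.g. batchedFilterProperties was empty):
            // this sketch assumes the filter simply lets the entry through.
            return FilterResult.ACCEPT;
        }
        return expectedValue.equals(actual) ? FilterResult.ACCEPT : FilterResult.REJECT;
    }

    public static void main(String[] args) {
        MetadataPropertyFilter f = new MetadataPropertyFilter("region", "us");
        System.out.println(f.filter(Map.of("region", "us")));
        System.out.println(f.filter(Map.of("region", "eu")));
        System.out.println(f.filter(Map.of()));
    }
}
```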

## Implementation

- When `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add` is
called, we extract the configured message properties and add them to
`messageMetadata`:
```
    public boolean add(MessageImpl<?> msg, SendCallback callback) {

        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
                    numMessagesInBatch);
        }

        if (++numMessagesInBatch == 1) {
            try {
                // some properties are common amongst the different messages in the batch, hence we just pick it up from
                // the first message
                messageMetadata.setSequenceId(msg.getSequenceId());
                // new: copy the configured filter properties of the first message into the batch metadata
                List<KeyValue> filterProperties = getProperties(msg);
                if (!filterProperties.isEmpty()) {
                    messageMetadata.addAllProperties(filterProperties);
                }
                // ... rest of the existing first-message initialization
            } catch (Throwable e) {
                // ... existing error handling
            }
        }
        // ... rest of the existing method
    }
```
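The `getProperties(msg)` helper is not spelled out above. A minimal sketch of what it could do, assuming it keeps only the keys listed in `batchedFilterProperties` (in config order) and skips keys the message does not carry: here `KeyValue` is a stand-in record for Pulsar's generated `KeyValue` class, and the message is modelled as a plain map of its properties, so all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of getProperties(msg). KeyValue is a stand-in for Pulsar's
 * generated KeyValue class; a message is modelled as a map of its properties.
 */
class FilterPropertyExtractor {

    record KeyValue(String key, String value) { }

    private final List<String> batchedFilterProperties;

    FilterPropertyExtractor(List<String> batchedFilterProperties) {
        this.batchedFilterProperties = List.copyOf(batchedFilterProperties);
    }

    /** Returns only the configured keys that the message actually carries, in config order. */
    List<KeyValue> getProperties(Map<String, String> messageProperties) {
        List<KeyValue> result = new ArrayList<>();
        for (String key : batchedFilterProperties) {
            String value = messageProperties.get(key);
            if (value != null) {
                result.add(new KeyValue(key, value));
            }
        }
        return result; // empty when the configured list is empty
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("region", "us", "traceId", "abc123");
        System.out.println(new FilterPropertyExtractor(List.of("region")).getProperties(props));
        System.out.println(new FilterPropertyExtractor(List.of()).getProperties(props));
    }
}
```

Only the selected keys end up in the batch metadata, so unrelated properties such as a trace id do not inflate it.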

- We also need to add a method `hasSameProperties`, analogous to
`hasSameSchema`, so that only messages with the same filter properties are
added to the same batch:

```
    private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
        return batchMessageContainer.haveEnoughSpace(msg)
                && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg))
                && batchMessageContainer.hasSameProperties(msg)
                && batchMessageContainer.hasSameTxn(msg);
    }
```
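A minimal sketch of how `hasSameProperties` could behave, assuming it mirrors `hasSameSchema` by returning `true` for an empty batch and otherwise comparing only the configured filter keys against the batch's first message. The class name, the map-based message model and the `add`/`clear` methods are hypothetical simplifications of the real container.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical model of hasSameProperties: a message may join the current batch
 * only if its filter-property values match those of the batch's first message.
 */
class PropertiesBatchSketch {

    private final List<String> filterKeys;
    private Map<String, String> firstMessageProperties; // null while the batch is empty

    PropertiesBatchSketch(List<String> filterKeys) {
        this.filterKeys = List.copyOf(filterKeys);
    }

    boolean hasSameProperties(Map<String, String> msgProperties) {
        if (firstMessageProperties == null) {
            return true; // an empty batch accepts any message, like hasSameSchema
        }
        for (String key : filterKeys) {
            // Only configured keys matter; other properties may differ freely.
            if (!Objects.equals(firstMessageProperties.get(key), msgProperties.get(key))) {
                return false;
            }
        }
        return true;
    }

    void add(Map<String, String> msgProperties) {
        if (firstMessageProperties == null) {
            firstMessageProperties = Map.copyOf(msgProperties); // batch metadata comes from the first message
        }
    }

    void clear() {
        firstMessageProperties = null;
    }

    public static void main(String[] args) {
        PropertiesBatchSketch batch = new PropertiesBatchSketch(List.of("region"));
        batch.add(Map.of("region", "us", "traceId", "1"));
        System.out.println(batch.hasSameProperties(Map.of("region", "us", "traceId", "2"))); // true
        System.out.println(batch.hasSameProperties(Map.of("region", "eu")));                // false
    }
}
```

When the check fails, the producer flushes the current batch and starts a new one, which is what keeps every batch's metadata accurate for all of its messages.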


## Rejected Alternatives

- Implement an `AbstractBatchMessageContainer` subclass, say
`BatchMessagePropertiesBasedContainer`, that keeps messages with the same
properties in a single `HashMap` entry, like
`BatchMessageKeyBasedContainer`.

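Rejection reason: this would publish messages out of order, because messages
with different properties are accumulated in separate batches, so a later
message can be sent before an earlier one that landed in a different batch.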
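A small simulation makes the ordering problem concrete. This sketch assumes a simplified properties-keyed container that groups messages into per-value buckets (a `LinkedHashMap` here) and sends each bucket as one batch on flush; the class, the `Msg` record and the `region` property are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical simulation of the rejected properties-keyed container:
 * messages are bucketed by property value and each bucket is sent as one batch.
 */
class KeyedBatchReorderDemo {

    record Msg(String id, String region) { }

    /** Groups messages by property value and returns the order in which they would be sent. */
    static List<String> flushOrder(List<Msg> produced) {
        Map<String, List<Msg>> buckets = new LinkedHashMap<>();
        for (Msg m : produced) {
            buckets.computeIfAbsent(m.region(), k -> new ArrayList<>()).add(m);
        }
        List<String> sent = new ArrayList<>();
        for (List<Msg> batch : buckets.values()) { // one batch per distinct property value
            for (Msg m : batch) {
                sent.add(m.id());
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        List<Msg> produced = List.of(new Msg("m1", "us"), new Msg("m2", "eu"), new Msg("m3", "us"));
        System.out.println(flushOrder(produced)); // [m1, m3, m2]
    }
}
```

Although `m2` was produced before `m3`, it is sent after it, whereas the proposed approach simply closes the current batch when the properties change and so keeps the produce order.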



Thanks,
Xiaoyu Hou
