Hi Pulsar community,

I have opened a PIP to discuss "Support batched message using entry filter".

Proposal Link: https://github.com/apache/pulsar/issues/16680

---

## Motivation

We already have a pluggable way to filter entries in the broker, namely PIP-105 (https://github.com/apache/pulsar/issues/12269). However, that mechanism only inspects the batch header and does not look at the properties of the individual messages. It does provide an interface to deserialize the entire Entry onto the heap, but doing so adds memory and CPU overhead, and in most scenarios only a few properties are needed for filtering.

This proposal introduces a way for PIP-105 to support batched entries without deserializing the entire Entry onto the heap.

## API Changes

- Add a producer config that specifies the keys of the properties to be added to the batched entry metadata, for example:

```
org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
```

The type of `batchedFilterProperties` is `List<String>`, and its default value is an empty list. With an empty list, the entry metadata carries no properties, and the `EntryFilter` will not take effect.

## Implementation

- When `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add` is called, we extract the message properties and add them to `metadata`:

```
public boolean add(MessageImpl<?> msg, SendCallback callback) {

    if (log.isDebugEnabled()) {
        log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
                numMessagesInBatch);
    }

    if (++numMessagesInBatch == 1) {
        try {
            // some properties are common amongst the different messages in the batch, hence we just pick it up from
            // the first message
            messageMetadata.setSequenceId(msg.getSequenceId());
            List<KeyValue> filterProperties = getProperties(msg);
            if (!filterProperties.isEmpty()) {
                messageMetadata.addAllProperties(filterProperties);  // add the message properties here
            }
            // ... (rest of the method unchanged)
```

- Also, we need to add a method `hasSameProperties`, similar to `hasSameSchema`.
Messages with the same properties can be added to the same batch:

```
private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
    return batchMessageContainer.haveEnoughSpace(msg)
            && (!isMultiSchemaEnabled(false) || batchMessageContainer.hasSameSchema(msg))
            // the batch metadata carries a single set of properties, so every message must match it
            && batchMessageContainer.hasSameProperties(msg)
            && batchMessageContainer.hasSameTxn(msg);
}
```

## Rejected Alternatives

- Implement an `AbstractBatchMessageContainer` subclass, say `BatchMessagePropertiesBasedContainer`, that keeps messages with the same properties in a single hash map entry, like `BatchMessageKeyBasedContainer`.

  Rejection reason: this would publish messages out of order.

Thanks,
Xiaoyu Hou
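
For reference, here is a minimal sketch of how the property extraction and the `hasSameProperties` check described under Implementation could behave. It uses plain `Map`s instead of `MessageImpl` and `MessageMetadata` so that it runs standalone. The class name, the method `extractFilterProperties` (a stand-in for the `getProperties(msg)` helper above), and the sample keys are assumptions made for illustration, not existing Pulsar APIs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: models message properties as plain maps.
final class BatchFilterPropertiesSketch {

    // Hypothetical stand-in for getProperties(msg): keep only the properties
    // whose keys are listed in batchedFilterProperties.
    static Map<String, String> extractFilterProperties(Map<String, String> messageProperties,
                                                       List<String> batchedFilterProperties) {
        Map<String, String> selected = new LinkedHashMap<>();
        for (String key : batchedFilterProperties) {
            String value = messageProperties.get(key);
            if (value != null) {
                selected.put(key, value);
            }
        }
        return selected;
    }

    // Hypothetical stand-in for hasSameProperties(msg): a message fits the
    // current batch only if its filter properties equal the ones already
    // recorded for the batch.
    static boolean hasSameProperties(Map<String, String> batchProperties,
                                     Map<String, String> messageProperties,
                                     List<String> batchedFilterProperties) {
        return batchProperties.equals(
                extractFilterProperties(messageProperties, batchedFilterProperties));
    }

    public static void main(String[] args) {
        List<String> filterKeys = List.of("region");
        Map<String, String> first = Map.of("region", "eu", "traceId", "a1");
        Map<String, String> second = Map.of("region", "eu", "traceId", "b2");
        Map<String, String> third = Map.of("region", "us", "traceId", "c3");

        // The first message of a batch determines the batch-level properties.
        Map<String, String> batchProperties = extractFilterProperties(first, filterKeys);
        System.out.println(batchProperties);                                        // {region=eu}
        System.out.println(hasSameProperties(batchProperties, second, filterKeys)); // true
        System.out.println(hasSameProperties(batchProperties, third, filterKeys));  // false
    }
}
```

In this sketch only the configured keys take part in the comparison, so messages that differ in other properties (`traceId` here) can still share a batch. With an empty key list the extracted map is always empty, which matches the default behavior described under API Changes.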