Here is an example that may help explain the property extraction:

- Let's set `batchedFilterProperties`=`<region, version>`. This means that only the keys named `region` and `version` will be extracted into the batch meta properties.
- Then we have a producer that sends the messages below, in order:
  - `msg1` with properties: `<region: eu>`
  - `msg2` with properties: `<region: eu>`
  - `msg3` with properties: `<region: eu, version:1, tag:a>`
  - `msg4` with properties: `<region: eu, version:1>`
  - `msg5` with properties: `<region: us, version:1>`
  - `msg6` with properties: `<region: us, version:2>`
- The property extraction will proceed as follows:
  - msg1 and msg2 have the same properties, `<region: eu>`, so they will be put into the same batch.
  - msg3 and msg4 have the same extracted properties, `<region: eu, version:1>`. `tag:a` in msg3 is ignored because `batchedFilterProperties` doesn't contain `tag`. So msg3 and msg4 will be put into the same batch.
  - msg5 and msg6 have different properties, because the value of `version` is different. So msg5 and msg6 are published in different batches.
- Just to summarize, the result will be:

| | batch meta properties | single meta properties | payload | single meta properties | payload |
|--------|---------------------------|----------------------------------|---------|---------------------------|---------|
| batch1 | `<region: eu>` | `<region: eu>` | msg1 | `<region: eu>` | msg2 |
| batch2 | `<region: eu, version:1>` | `<region: eu, version:1, tag:a>` | msg3 | `<region: eu, version:1>` | msg4 |
| batch3 | `<region: us, version:1>` | `<region: us, version:1>` | msg5 | | |
| batch4 | `<region: us, version:2>` | `<region: us, version:2>` | msg6 | | |

Thanks,
Xiaoyu Hou

On Tue, Jul 19, 2022 at 18:00, Anon Hxy <anonhx...@gmail.com> wrote:

> Hi Pulsar community:
>
> I opened a PIP to discuss "Support batched message using entry filter"
>
> Proposal Link: https://github.com/apache/pulsar/issues/16680
> ---
>
> ## Motivation
>
> We already have a pluggable way to filter entries in the broker, aka PIP-105
> https://github.com/apache/pulsar/issues/12269. But this way only checks
> the batch header, without digging into the individual messages'
> properties.
> Of course it provides an interface to deserialize the entire
> Entry to the heap, but this adds memory and CPU load. And
> in most scenarios we only need some of the properties to do the filtering.
>
> This proposal introduces a way to make PIP-105 support batched entries
> without having to deserialize the entire Entry to the heap.
>
>
> ## API Changes
>
> - Add a producer config to specify the keys of the properties that will be
> added to the batched entry metadata, for example:
>
> ```
> org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
> ```
> The type of `batchedFilterProperties` is `List<String>`, and its default value
> is an empty list. An empty list means that the properties of the entry's
> metadata are empty, and the `EntryFilter` will not take effect.
>
> ## Implementation
>
> - When calling `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`,
> we extract the message properties and add them to `metadata`:
> ```
> public boolean add(MessageImpl<?> msg, SendCallback callback) {
>
>     if (log.isDebugEnabled()) {
>         log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
>                 numMessagesInBatch);
>     }
>
>     if (++numMessagesInBatch == 1) {
>         try {
>             // some properties are common amongst the different messages in the batch, hence we just pick it up from
>             // the first message
>             messageMetadata.setSequenceId(msg.getSequenceId());
>             List<KeyValue> filterProperties = getProperties(msg);
>             if (!filterProperties.isEmpty()) {
>                 messageMetadata.addAllProperties(filterProperties); // and message properties here
>             }
> ```
>
> - Also we need to add a method `hasSameProperties` like `hasSameSchema`.
> Messages with the same properties can be added to the same batch:
>
> ```
> private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
>     return batchMessageContainer.haveEnoughSpace(msg)
>             && (!isMultiSchemaEnabled(false)
>                 || batchMessageContainer.hasSameSchema(msg))
>             // the properties must match regardless of the schema check
>             && batchMessageContainer.hasSameProperties(msg)
>             && batchMessageContainer.hasSameTxn(msg);
> }
> ```
>
>
> ## Rejected Alternatives
>
> - Implement an `AbstractBatchMessageContainer`, say
> `BatchMessagePropertiesBasedContainer`, that keeps messages with the same
> properties in a single `hashmap` entry, like
> `BatchMessageKeyBasedContainer`.
>
> Rejection reason: this would publish messages out of order.
>
>
>
> Thanks,
> Xiaoyu Hou
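
To make the extraction rule from the `region`/`version` example at the top of this mail concrete, here is a minimal, self-contained Java sketch. It is not Pulsar code: the class `BatchedFilterPropertiesSketch` and its helpers `extract` and `split` are hypothetical names, messages are modeled as plain maps, and only the "same properties" check is simulated (the space, schema and transaction checks are left out).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration only; none of these names exist in the Pulsar client.
public class BatchedFilterPropertiesSketch {

    /** One simulated batch: the extracted meta properties plus the names of its messages. */
    static final class Batch {
        final Map<String, String> metaProperties;
        final List<String> messages = new ArrayList<>();

        Batch(Map<String, String> metaProperties) {
            this.metaProperties = metaProperties;
        }
    }

    /** Keeps only the properties whose key is listed in filterKeys (sorted by key for stable output). */
    static Map<String, String> extract(Map<String, String> properties, Set<String> filterKeys) {
        Map<String, String> extracted = new TreeMap<>();
        for (Map.Entry<String, String> e : properties.entrySet()) {
            if (filterKeys.contains(e.getKey())) {
                extracted.put(e.getKey(), e.getValue());
            }
        }
        return extracted;
    }

    /**
     * Walks the messages in send order and starts a new batch whenever the
     * extracted properties differ from those of the current batch.
     */
    static List<Batch> split(List<String> names, List<Map<String, String>> properties,
                             Set<String> filterKeys) {
        List<Batch> batches = new ArrayList<>();
        Batch current = null;
        for (int i = 0; i < names.size(); i++) {
            Map<String, String> extracted = extract(properties.get(i), filterKeys);
            if (current == null || !current.metaProperties.equals(extracted)) {
                current = new Batch(extracted);
                batches.add(current);
            }
            current.messages.add(names.get(i));
        }
        return batches;
    }

    public static void main(String[] args) {
        Set<String> filterKeys = Set.of("region", "version");
        List<String> names = List.of("msg1", "msg2", "msg3", "msg4", "msg5", "msg6");
        List<Map<String, String>> properties = List.of(
                Map.of("region", "eu"),
                Map.of("region", "eu"),
                Map.of("region", "eu", "version", "1", "tag", "a"),
                Map.of("region", "eu", "version", "1"),
                Map.of("region", "us", "version", "1"),
                Map.of("region", "us", "version", "2"));

        for (Batch b : split(names, properties, filterKeys)) {
            System.out.println(b.metaProperties + " -> " + b.messages);
        }
    }
}
```

Running `main` yields four batches, matching the summary table: msg1 and msg2 together, msg3 and msg4 together (with `tag` dropped from the batch meta properties), then msg5 and msg6 each in a batch of their own. Because a new batch is started as soon as the extracted properties change, the send order is preserved, which is the property the rejected hashmap-based container would lose.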