Good. +1

Anon Hxy <anonhx...@gmail.com> wrote on Tue, Jul 19, 2022 at 18:00:
> Hi Pulsar community:
>
> I opened a PIP to discuss "Support batched message using entry filter".
>
> Proposal Link: https://github.com/apache/pulsar/issues/16680
> ---
>
> ## Motivation
>
> We already have a pluggable way to filter entries in the broker, namely PIP-105
> https://github.com/apache/pulsar/issues/12269. But it only checks the batch
> header, without digging into the properties of the individual messages.
> It does provide an interface to deserialize the entire Entry onto the
> heap, but this adds memory and CPU overhead, and in most scenarios we only
> need a few of the properties to do the filtering.
>
> This proposal introduces a way for PIP-105 to support batched entries without
> having to deserialize the entire Entry onto the heap.
>
> ## API Changes
>
> - Add a producer config to specify the keys whose properties will be
> added to the batched entry metadata, for example:
>
> ```
> org.apache.pulsar.client.impl.conf.ProducerConfigurationData#batchedFilterProperties
> ```
>
> The `batchedFilterProperties` type is `List<String>`, and the default value is
> an empty list. An empty list means that the properties in the entry's
> metadata are empty, and the `EntryFilter` will not take effect.
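To make the key selection concrete, here is a minimal sketch of how a list like `batchedFilterProperties` could pick out the properties to copy into the batch metadata. It uses plain Java maps instead of Pulsar's `KeyValue` type, and the class name `FilterPropertiesSketch` and method `selectFilterProperties` are hypothetical, not part of the proposal or of the Pulsar client.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FilterPropertiesSketch {

    /**
     * Hypothetical stand-in for the selection step: returns only the message
     * properties whose keys are listed in batchedFilterProperties, in the
     * order the keys were configured. Keys missing from the message are skipped.
     */
    static Map<String, String> selectFilterProperties(List<String> batchedFilterProperties,
                                                      Map<String, String> messageProperties) {
        Map<String, String> selected = new LinkedHashMap<>();
        for (String key : batchedFilterProperties) {
            String value = messageProperties.get(key);
            if (value != null) {
                selected.put(key, value);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("region", "eu", "tenant", "a", "traceId", "123");

        // Only the configured keys end up in the batch metadata.
        System.out.println(selectFilterProperties(List.of("region", "tenant"), props));

        // Default (empty list): nothing is copied, so a filter sees no properties.
        System.out.println(selectFilterProperties(List.of(), props));
    }
}
```

With an empty key list the result is an empty map, which matches the default behaviour described above: the batch metadata carries no properties for an `EntryFilter` to inspect.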
>
> ## Implementation
>
> - When calling `org.apache.pulsar.client.impl.BatchMessageContainerImpl#add`,
> we extract the message properties and add them to `metadata`:
>
> ```
> public boolean add(MessageImpl<?> msg, SendCallback callback) {
>
>     if (log.isDebugEnabled()) {
>         log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
>                 numMessagesInBatch);
>     }
>
>     if (++numMessagesInBatch == 1) {
>         try {
>             // some properties are common amongst the different messages in the batch, hence we just pick it up from
>             // the first message
>             messageMetadata.setSequenceId(msg.getSequenceId());
>             List<KeyValue> filterProperties = getProperties(msg);
>             if (!filterProperties.isEmpty()) {
>                 messageMetadata.addAllProperties(filterProperties); // and message properties here
>             }
> ```
>
> - We also need to add a method `hasSameProperties`, like `hasSameSchema`.
> Messages with the same properties can be added to the same batch:
>
> ```
> private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
>     return batchMessageContainer.haveEnoughSpace(msg)
>             && (!isMultiSchemaEnabled(false)
>                 || batchMessageContainer.hasSameSchema(msg)
>                 || batchMessageContainer.hasSameProperties(msg))
>             && batchMessageContainer.hasSameTxn(msg);
> }
> ```
>
> ## Rejected Alternatives
>
> - Implement an `AbstractBatchMessageContainer`, say
> `BatchMessagePropertiesBasedContainer`, that keeps messages with the same
> properties in a single `hashmap` entry, like
> `BatchMessageKeyBasedContainer`.
>
> Rejection reason: This would publish messages out of order.
>
> Thanks,
> Xiaoyu Hou

--
BR,
Qiang Huang
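
A note on the quoted `canAddToCurrentBatch`: as written, `hasSameProperties` is OR-ed with the schema checks, so it is only evaluated when multi-schema is enabled and the schema differs. If the intent is that every message in a batch shares the same filter properties, the check would presumably need to be AND-ed instead. The sketch below shows that reading with plain Java maps. The class `BatchPropertiesSketch`, its fields, and the boolean-parameter form of `canAddToCurrentBatch` are illustrative assumptions, not Pulsar code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchPropertiesSketch {

    private final List<String> filterKeys;        // stands in for batchedFilterProperties
    private Map<String, String> batchProperties;  // null while the batch is empty
    private int numMessagesInBatch;

    BatchPropertiesSketch(List<String> filterKeys) {
        this.filterKeys = filterKeys;
    }

    /** Keeps only the properties whose keys are configured for filtering. */
    private Map<String, String> select(Map<String, String> messageProperties) {
        Map<String, String> selected = new LinkedHashMap<>();
        for (String key : filterKeys) {
            String value = messageProperties.get(key);
            if (value != null) {
                selected.put(key, value);
            }
        }
        return selected;
    }

    /** An empty batch accepts anything; otherwise the selected properties must match. */
    boolean hasSameProperties(Map<String, String> messageProperties) {
        return batchProperties == null || batchProperties.equals(select(messageProperties));
    }

    /** Assumed AND-ed form of the predicate: the properties check always applies. */
    static boolean canAddToCurrentBatch(boolean haveEnoughSpace, boolean multiSchemaEnabled,
                                        boolean sameSchema, boolean sameProperties,
                                        boolean sameTxn) {
        return haveEnoughSpace
                && (!multiSchemaEnabled || sameSchema)
                && sameProperties
                && sameTxn;
    }

    /** Returns false when the caller would have to flush and start a new batch. */
    boolean add(Map<String, String> messageProperties) {
        if (!hasSameProperties(messageProperties)) {
            return false;
        }
        if (numMessagesInBatch++ == 0) {
            // The first message fixes the properties recorded for the whole batch.
            batchProperties = select(messageProperties);
        }
        return true;
    }

    public static void main(String[] args) {
        BatchPropertiesSketch batch = new BatchPropertiesSketch(List.of("region"));
        System.out.println(batch.add(Map.of("region", "eu", "traceId", "1"))); // true
        System.out.println(batch.add(Map.of("region", "eu", "traceId", "2"))); // true
        System.out.println(batch.add(Map.of("region", "us")));                 // false
    }
}
```

Because a mismatching message closes the current batch rather than being routed to a separate per-property container, messages still leave the producer in send order, which is the concern raised against the rejected alternative.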