Hi Xiaoguang,

Thanks for drafting this proposal.
There was a related discussion in the community earlier; you can find it here:
https://lists.apache.org/list.html?*@pulsar.apache.org:lte=1y:Proposal%20for%20Consumer%20Filtering%20in%20Pulsar%20brokers

Based on that discussion, if this PIP wants to implement server-side filtering, we should first make the filter pluggable on the broker side. If the filter is implemented on the client side, I think users can already filter out messages directly (by acknowledging the filtered-out messages).

Thanks,
Penghui

孙晓光-社区业务平台 <sunxiaogu...@zhihu.com> wrote on Mon, Feb 22, 2021 at 1:41 PM:

> Hi guys, I have made some changes to the client-side API. In this email
> you will find the revised version of the Java client API.
>
> Motivation
> Support filtering messages with flexible expressions.
> Both consumers and readers can reset their position to a specific message
> ID or publish/event time, and can therefore be used for time-travel message
> processing. However, there are scenarios where more sophisticated
> conditions are useful, for example seeking to a particular message whose
> property is set to a specific value. MySQL binlog data and TiDB CDC events
> are two examples of these use cases. Applications can embed a GTID or TSO
> into a message property and use it to quickly seek to the most appropriate
> message when creating a streaming table from a binlog.
> Compared to the existing ways of seeking by position and timestamp,
> expression-based seeking is more flexible and easier to extend in the
> future to fulfill more sophisticated application scenarios. Beyond seeking,
> the same expressions could also power broker-side or client-side message
> filtering, relieving end users from writing their own business logic to
> process only the messages they are interested in from a topic.
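Penghui's point about client-side filtering, and the GTID/TSO use case in the motivation, can be sketched without any broker changes. The following is a minimal plain-Java illustration under stated assumptions, not the Pulsar client API: `SimpleMessage` and `ClientSideSeek` are hypothetical names, a `List` stands in for the topic backlog, and the `skipped` list stands in for acknowledging messages. With a real Pulsar `Consumer`, each skipped message would come from `receive()` and be passed to `acknowledge(msg)`.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical stand-in for a received message: only the fields used here.
record SimpleMessage(long sequenceId, Map<String, String> properties) {
    String property(String key) {
        return properties.get(key); // null when the property is absent
    }
}

final class ClientSideSeek {
    /**
     * Emulates "seek to the first message whose properties match" on the client:
     * every message before the match is skipped (a real consumer would
     * acknowledge it), and the first matching message is returned.
     */
    static Optional<SimpleMessage> seekFirst(List<SimpleMessage> backlog,
                                             Predicate<SimpleMessage> condition,
                                             List<SimpleMessage> skipped) {
        for (SimpleMessage msg : backlog) {
            if (condition.test(msg)) {
                return Optional.of(msg);
            }
            skipped.add(msg); // placeholder for consumer.acknowledge(msg)
        }
        return Optional.empty();
    }
}
```

Every message before the match still travels from the broker to the client, which is the cost a broker-side seek or a pluggable broker-side filter would avoid.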
>
> Public Interfaces
> The following changes will be introduced to the public interface:
> Client API
> public interface MessageFilterExpr extends Cloneable {
>     interface RefBuilder {
>         OperatorBuilder property(String name);
>         OperatorBuilder publishTime();
>         OperatorBuilder eventTime();
>     }
>
>     interface OperatorBuilder {
>         ExprBuilder equals(String rhs);
>         ExprBuilder notEquals(String rhs);
>         ExprBuilder in(String arg1, String arg2, String... others);
>         ExprBuilder notIn(String arg1, String arg2, String... others);
>         ExprBuilder greaterThan(String rhs);
>         ExprBuilder greaterThanEquals(String rhs);
>         ExprBuilder lessThan(String rhs);
>         ExprBuilder lessThanEquals(String rhs);
>         ExprBuilder like(String rhs);
>         ExprBuilder notLike(String rhs);
>         ExprBuilder containsAll(String rhs);
>         ExprBuilder containsAny(String arg1, String arg2, String... others);
>         ExprBuilder containsNone(String arg1, String arg2, String... others);
>         ExprBuilder between(String arg1, String arg2);
>
>         ExprBuilder equals(Long rhs);
>         ExprBuilder notEquals(Long rhs);
>         ExprBuilder in(Long arg1, Long arg2, Long... others);
>         ExprBuilder notIn(Long arg1, Long arg2, Long... others);
>         ExprBuilder greaterThan(Long rhs);
>         ExprBuilder greaterThanEquals(Long rhs);
>         ExprBuilder lessThan(Long rhs);
>         ExprBuilder lessThanEquals(Long rhs);
>         ExprBuilder between(Long arg1, Long arg2);
>
>         ExprBuilder equals(Double rhs);
>         ExprBuilder notEquals(Double rhs);
>         ExprBuilder in(Double arg1, Double arg2, Double... others);
>         ExprBuilder notIn(Double arg1, Double arg2, Double... others);
>         ExprBuilder greaterThan(Double rhs);
>         ExprBuilder greaterThanEquals(Double rhs);
>         ExprBuilder lessThan(Double rhs);
>         ExprBuilder lessThanEquals(Double rhs);
>         ExprBuilder between(Double arg1, Double arg2);
>     }
>
>     interface CombinatorBuilder {
>         RefBuilder and();
>         RefBuilder or();
>         ExprBuilder not();
>     }
>
>     interface ExprBuilder {
>         MessageFilterExpr build();
>     }
>
>     static RefBuilder builder() {
>         return DefaultImplementation.newMessageFilterExprRefBuilder();
>     }
> }
>
> The wire protocol
> New message types in protobuf
> message Predicate {
>     enum MessageRef {
>         PROPERTY = 0;
>         EVENT_TIME = 1;
>         PUBLISH_TIME = 2;
>     }
>
>     enum Operator {
>         EQUALS = 0;
>         NOT_EQUALS = 1;
>         IN = 2;
>         NOT_IN = 3;
>         GREATER_THAN = 4;
>         GREATER_THAN_EQUALS = 5;
>         LESS_THAN = 6;
>         LESS_THAN_EQUALS = 7;
>         LIKE = 8;
>         NOT_LIKE = 9;
>         CONTAINS_ALL = 10;
>         CONTAINS_ANY = 11;
>         CONTAINS_NONE = 12;
>         BETWEEN = 13;
>     }
>
>     enum ValueType {
>         STRING = 0;
>         LONG = 1;
>         DOUBLE = 2;
>     }
>
>     required MessageRef ref = 1;
>     optional string name = 2;
>     required Operator op = 3;
>     repeated string values = 4;
>     optional ValueType ty = 5;
> }
>
> message Expr {
>     enum ExprType {
>         PREDICATE = 0;
>         AND = 1;
>         OR = 2;
>         NOT = 3;
>     }
>
>     required ExprType ty = 1;
>     optional Predicate predicate = 2;
>     optional Expr lhs = 3;
>     optional Expr rhs = 4;
> }
>
> Changes to an existing message type
> // Reset an existing consumer to a particular message id
> message CommandSeek {
>     required uint64 consumer_id = 1;
>     required uint64 request_id = 2;
>
>     optional MessageIdData message_id = 3;
>     optional uint64 message_publish_time = 4;
>     optional Expr expression = 5;
> }
>
> Proposed Changes
> Add a builder (MessageFilterExpr.builder()) so users can build flexible
> expressions with a simple API. Users can specify and combine multiple
> predicates on message properties or other metadata (currently event time
> and publish time).
> With a MessageFilterExpr built per the user's request, users can specify
> where to start consuming messages through an overloaded seek method that
> takes a MessageFilterExpr as its argument.
>
> Compatibility, Deprecation, and Migration Plan
> This feature is only accessible through a new set of APIs, so it is
> compatible with existing code. The protobuf wire protocol changes are
> backward compatible with existing clients as well.
>
> Test Plan
> The expression logic is self-contained and has few external dependencies,
> so it is easy to cover with comprehensive unit tests. The actual consumer
> and reader seek logic, however, requires E2E tests to validate.
>
> Best regards,
> Xiaoguang Sun
>
> > On Feb 21, 2021, at 2:37 PM, Jia Zhai <zhai...@apache.org> wrote:
> >
> > +1. Thanks a lot for this proposal, this feature is very helpful.
> >
> > On Sun, Feb 21, 2021 at 2:32 PM 孙晓光-社区业务平台 <sunxiaogu...@zhihu.com> wrote:
> >
> >> Hi guys,
> >>
> >> We have application scenarios that require a more flexible seek
> >> capability, so I wrote a PIP draft for it. I look forward to hearing
> >> comments, questions, and suggestions from the community. Thank you.
> >>
> >> You can find the proposal in this email or the original Markdown here:
> >> https://github.com/sunxiaoguang/junkyard/blob/master/Message-filtering-expression.md
> >>
> >> Motivation
> >> Support filtering messages with flexible expressions.
> >> Both consumers and readers can reset their position to a specific
> >> message ID or publish/event time, and can therefore be used for
> >> time-travel message processing. However, there are scenarios where more
> >> sophisticated conditions are useful, for example seeking to a particular
> >> message whose property is set to a specific value. MySQL binlog data and
> >> TiDB CDC events are two examples of these use cases.
> >> Applications can embed a GTID or TSO into a message property and use it
> >> to quickly seek to the most appropriate message when creating a
> >> streaming table from a binlog.
> >> Compared to the existing ways of seeking by position and timestamp,
> >> expression-based seeking is more flexible and easier to extend in the
> >> future to fulfill more sophisticated application scenarios. Beyond
> >> seeking, the same expressions could also power broker-side or
> >> client-side message filtering, relieving end users from writing their
> >> own business logic to process only the messages they are interested in
> >> from a topic.
> >>
> >> Public Interfaces
> >> The following changes will be introduced to the public interface:
> >> Client API
> >> public interface MessageFilterRef {
> >>     static MessageFilterRef property(String key);
> >>     static MessageFilterRef eventTime();
> >>     static MessageFilterRef publishTime();
> >> }
> >>
> >> public enum MessageFilterOperator {
> >>     EQUALS,
> >>     NOT_EQUALS,
> >>     IN,
> >>     NOT_IN,
> >>     GREATER_THAN,
> >>     GREATER_THAN_EQUALS,
> >>     LESS_THAN,
> >>     LESS_THAN_EQUALS,
> >>     STARTS_WITH,
> >>     STARTS_WITH_IGNORE_CASE,
> >>     CONTAINS,
> >>     CONTAINS_IGNORE_CASE,
> >>     DOES_NOT_CONTAIN,
> >>     DOES_NOT_CONTAIN_IGNORE_CASE,
> >>     CONTAINS_ANY,
> >>     CONTAINS_NONE,
> >> }
> >>
> >> public interface MessageFilterExprBuilder {
> >>     MessageFilterExprBuilder and(MessageFilterExprBuilder rhs,
> >>             MessageFilterExprBuilder... others);
> >>     MessageFilterExprBuilder or(MessageFilterExprBuilder rhs,
> >>             MessageFilterExprBuilder... others);
> >>     MessageFilterExprBuilder not();
> >>     MessageFilterExpr build();
> >>
> >>     static MessageFilterExprBuilder of(MessageFilterRef ref,
> >>             MessageFilterOperator op, String rhs, String... others);
> >>     static MessageFilterExprBuilder of(MessageFilterRef ref,
> >>             MessageFilterOperator op, Long rhs, Long... others);
> >>     static MessageFilterExprBuilder of(MessageFilterRef ref,
> >>             MessageFilterOperator op, Double rhs, Double... others);
> >>
> >>     static MessageFilterExprBuilder property(String key,
> >>             MessageFilterOperator op, String rhs, String... others);
> >>     static MessageFilterExprBuilder property(String key,
> >>             MessageFilterOperator op, Long rhs, Long... others);
> >>     static MessageFilterExprBuilder property(String key,
> >>             MessageFilterOperator op, Double rhs, Double... others);
> >>
> >>     static MessageFilterExprBuilder eventTime(MessageFilterRef ref,
> >>             MessageFilterOperator op, Long rhs, Long... others);
> >>     static MessageFilterExprBuilder publishTime(MessageFilterRef ref,
> >>             MessageFilterOperator op, Long rhs, Long... others);
> >> }
> >>
> >> The wire protocol
> >> New message types in protobuf
> >> message Predicate {
> >>     enum MessageRef {
> >>         PROPERTY = 0;
> >>         EVENT_TIME = 1;
> >>         PUBLISH_TIME = 2;
> >>     }
> >>
> >>     enum Operator {
> >>         EQUALS = 0;
> >>         NOT_EQUALS = 1;
> >>         IN = 2;
> >>         NOT_IN = 3;
> >>         GREATER_THAN = 4;
> >>         GREATER_THAN_EQUALS = 5;
> >>         LESS_THAN = 6;
> >>         LESS_THAN_EQUALS = 7;
> >>         STARTS_WITH = 8;
> >>         STARTS_WITH_IGNORE_CASE = 9;
> >>         CONTAINS = 10;
> >>         CONTAINS_IGNORE_CASE = 11;
> >>         DOES_NOT_CONTAIN = 12;
> >>         DOES_NOT_CONTAIN_IGNORE_CASE = 13;
> >>         CONTAINS_ANY = 14;
> >>         CONTAINS_NONE = 15;
> >>     }
> >>
> >>     enum ValueType {
> >>         STRING = 0;
> >>         LONG = 1;
> >>         DOUBLE = 2;
> >>     }
> >>
> >>     required MessageRef ref = 1;
> >>     optional string name = 2;
> >>     required Operator op = 3;
> >>     repeated string values = 4;
> >>     optional ValueType ty = 5;
> >> }
> >>
> >> message Expr {
> >>     enum ExprType {
> >>         PREDICATE = 0;
> >>         AND = 1;
> >>         OR = 2;
> >>         NOT = 3;
> >>     }
> >>
> >>     required ExprType tp = 1;
> >>     optional Predicate predicate = 2;
> >>     optional Expr op1 = 3;
> >>     optional Expr op2 = 4;
> >> }
> >>
> >> Changes to an existing message type
> >> // Reset an existing consumer to a particular message id
> >> message CommandSeek {
> >>     required uint64 consumer_id = 1;
> >>     required uint64 request_id = 2;
> >>
> >>     optional MessageIdData message_id = 3;
> >>     optional uint64 message_publish_time = 4;
> >>     optional Expr expression = 5;
> >> }
> >>
> >> Proposed Changes
> >> Add MessageFilterExprBuilder to help users build flexible expressions
> >> with a simple API. Users can specify and combine multiple predicates on
> >> message properties or other metadata (currently event time and publish
> >> time). With a MessageFilterExpr built per the user's request, users can
> >> specify where to start consuming messages through an overloaded seek
> >> method that takes a MessageFilterExpr as its argument.
> >>
> >> Compatibility, Deprecation, and Migration Plan
> >> This feature is only accessible through a new set of APIs, so it is
> >> compatible with existing code. The protobuf wire protocol changes are
> >> backward compatible with existing clients as well.
> >>
> >> Test Plan
> >> The expression logic is self-contained and has few external
> >> dependencies, so it is easy to cover with comprehensive unit tests. The
> >> actual consumer and reader seek logic, however, requires E2E tests to
> >> validate.
> >>
> >> Sincerely,
> >> Xiaoguang Sun
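The `Predicate` and `Expr` messages in this thread encode a boolean expression tree. Below is a minimal sketch of how a broker or client might evaluate such a tree against one message, in plain Java. It rests on several assumptions. The type names mirror the revised protobuf but are hypothetical, and `MessageView` is an invented stand-in for a message. Only the comparison operators are covered, not `LIKE` or the `CONTAINS_*` family. An unset `ValueType` on a property is treated as `STRING`, event and publish time are always compared as `LONG`, and `BETWEEN` is inclusive.

```java
import java.util.List;
import java.util.Map;

// Mirrors the proposed protobuf enums; only a subset of operators is sketched.
enum MessageRef { PROPERTY, EVENT_TIME, PUBLISH_TIME }

enum Operator {
    EQUALS, NOT_EQUALS, IN, NOT_IN,
    GREATER_THAN, GREATER_THAN_EQUALS, LESS_THAN, LESS_THAN_EQUALS, BETWEEN
}

enum ValueType { STRING, LONG, DOUBLE }

// Hypothetical, minimal view of the message being tested.
record MessageView(Map<String, String> properties, long publishTime, long eventTime) {}

// Corresponds to the Expr message: predicates combined with AND/OR/NOT.
interface Expr {
    boolean eval(MessageView msg);
}

record PredicateExpr(MessageRef ref, String name, Operator op,
                     List<String> values, ValueType ty) implements Expr {

    public boolean eval(MessageView msg) {
        String actual = switch (ref) {
            case PROPERTY -> msg.properties().get(name);
            case EVENT_TIME -> Long.toString(msg.eventTime());
            case PUBLISH_TIME -> Long.toString(msg.publishTime());
        };
        if (actual == null) {
            return false; // assumption: a missing property matches nothing
        }
        switch (op) {
            case EQUALS: return compare(actual, values.get(0)) == 0;
            case NOT_EQUALS: return compare(actual, values.get(0)) != 0;
            case IN: return values.stream().anyMatch(v -> compare(actual, v) == 0);
            case NOT_IN: return values.stream().noneMatch(v -> compare(actual, v) == 0);
            case GREATER_THAN: return compare(actual, values.get(0)) > 0;
            case GREATER_THAN_EQUALS: return compare(actual, values.get(0)) >= 0;
            case LESS_THAN: return compare(actual, values.get(0)) < 0;
            case LESS_THAN_EQUALS: return compare(actual, values.get(0)) <= 0;
            case BETWEEN: // assumption: inclusive on both ends
                return compare(actual, values.get(0)) >= 0
                        && compare(actual, values.get(1)) <= 0;
            default: throw new UnsupportedOperationException("not sketched: " + op);
        }
    }

    // Values travel as strings on the wire; ValueType decides how to compare them.
    private int compare(String actual, String expected) {
        ValueType type = ref == MessageRef.PROPERTY
                ? (ty == null ? ValueType.STRING : ty)
                : ValueType.LONG; // event/publish time are epoch numbers
        switch (type) {
            case LONG: return Long.compare(Long.parseLong(actual), Long.parseLong(expected));
            case DOUBLE: return Double.compare(Double.parseDouble(actual), Double.parseDouble(expected));
            default: return actual.compareTo(expected);
        }
    }
}

record And(Expr lhs, Expr rhs) implements Expr {
    public boolean eval(MessageView msg) { return lhs.eval(msg) && rhs.eval(msg); }
}

record Or(Expr lhs, Expr rhs) implements Expr {
    public boolean eval(MessageView msg) { return lhs.eval(msg) || rhs.eval(msg); }
}

record Not(Expr operand) implements Expr {
    public boolean eval(MessageView msg) { return !operand.eval(msg); }
}
```

Because `values` is a `repeated string` on the wire, `ValueType` is what decides whether "9" < "10" holds: it does when compared as `LONG` and does not when compared as `STRING`.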