Hi guys,

I have made some changes to the client-side API. You can find the revised version of the Java client API in this email.
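The revised builder chain is described below: builder(), then a reference such as property() or publishTime(), then an operator, then and()/or(), then build(). To make that chain easier to picture, here is a toy in-memory sketch in Java. It is not Pulsar code and does not use DefaultImplementation. MessageFilterSketch, Msg and seek() are hypothetical names, and only a few operators are included.

The sketch also makes several assumptions that the proposal does not state:
- ExprBuilder exposes and()/or() directly. In the proposal these methods live on CombinatorBuilder.
- Predicates combine strictly left to right.
- Numeric operators parse string property values.
- Seeking lands on the first matching message.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy in-memory model of the proposed chain: builder() -> ref -> operator -> and()/or() -> build().
// Hypothetical: ExprBuilder exposes and()/or() directly, predicates combine strictly left to right,
// numeric operators parse string property values, and seek() returns the first matching message.
final class MessageFilterSketch {

    // Hypothetical message model: string properties plus a publish time in milliseconds.
    static final class Msg {
        final Map<String, String> properties;
        final long publishTime;
        Msg(Map<String, String> properties, long publishTime) { this.properties = properties; this.publishTime = publishTime; }
    }

    // Stand-in for MessageFilterExpr: an immutable predicate over messages.
    static final class MessageFilterExpr {
        private final Predicate<Msg> predicate;
        MessageFilterExpr(Predicate<Msg> predicate) { this.predicate = predicate; }
        boolean matches(Msg m) { return predicate.test(m); }
    }

    static RefBuilder builder() { return new RefBuilder(null, true); }

    static final class RefBuilder {
        private final Predicate<Msg> left; // expression built so far; null at the start of the chain
        private final boolean useAnd;      // combine the next predicate with AND (true) or OR (false)
        RefBuilder(Predicate<Msg> left, boolean useAnd) { this.left = left; this.useAnd = useAnd; }

        OperatorBuilder property(String name) { return new OperatorBuilder(this, m -> m.properties.get(name)); }
        OperatorBuilder publishTime() { return new OperatorBuilder(this, m -> m.publishTime); }

        ExprBuilder combine(Predicate<Msg> next) {
            return new ExprBuilder(left == null ? next : (useAnd ? left.and(next) : left.or(next)));
        }
    }

    static final class OperatorBuilder {
        private final RefBuilder ref;
        private final Function<Msg, Object> value; // the referenced field; null if a property is absent
        OperatorBuilder(RefBuilder ref, Function<Msg, Object> value) { this.ref = ref; this.value = value; }

        ExprBuilder equals(String rhs) { return ref.combine(m -> rhs.equals(value.apply(m))); }

        ExprBuilder in(String arg1, String arg2, String... others) {
            Set<String> allowed = new HashSet<>(Arrays.asList(others));
            allowed.add(arg1);
            allowed.add(arg2);
            return ref.combine(m -> allowed.contains(value.apply(m)));
        }

        ExprBuilder greaterThanEquals(Long rhs) {
            return ref.combine(m -> { Long v = asLong(value.apply(m)); return v != null && v >= rhs; });
        }

        // Longs pass through, strings are parsed, anything else (including null) never matches.
        private static Long asLong(Object v) {
            if (v instanceof Long) return (Long) v;
            if (v instanceof String) {
                try { return Long.parseLong((String) v); } catch (NumberFormatException e) { return null; }
            }
            return null;
        }
    }

    static final class ExprBuilder {
        private final Predicate<Msg> expr;
        ExprBuilder(Predicate<Msg> expr) { this.expr = expr; }
        RefBuilder and() { return new RefBuilder(expr, true); }
        RefBuilder or() { return new RefBuilder(expr, false); }
        MessageFilterExpr build() { return new MessageFilterExpr(expr); }
    }

    // Toy seek: index of the first message in the log that matches, or -1 if none does.
    static int seek(List<Msg> log, MessageFilterExpr filter) {
        for (int i = 0; i < log.size(); i++) {
            if (filter.matches(log.get(i))) return i;
        }
        return -1;
    }

    public static void main(String[] args) {
        List<Msg> log = List.of(
            new Msg(Map.of("gtid", "uuid:1"), 1_000L),
            new Msg(Map.of("gtid", "uuid:2"), 2_000L),
            new Msg(Map.of("gtid", "uuid:3"), 3_000L));
        MessageFilterExpr expr = builder()
            .property("gtid").in("uuid:2", "uuid:3")
            .and().publishTime().greaterThanEquals(2_500L)
            .build();
        System.out.println(seek(log, expr)); // prints 2
    }
}
```

Running main prints 2. The third message is the first whose gtid is uuid:2 or uuid:3 and whose publish time is at least 2500. Combining java.util.function.Predicate instances keeps the sketch short. The proposal instead carries the expression to the broker as the Expr protobuf message in CommandSeek.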
Motivation

Support filtering messages with flexible expressions.

Both consumers and readers can reset their position to a specific message ID or publish/event time, and can therefore be used for time-travel message processing. However, there are scenarios where more sophisticated conditions are useful, for example seeking to a particular message whose property is set to a specific value. MySQL binlog data and TiDB CDC events are two examples of such use cases: applications can embed a GTID or TSO in a message property and use it to quickly seek to the most appropriate message when creating a streaming table from the binlog.

Compared to the existing ways of seeking by position and timestamp, expression-based seeking is more flexible and easier to extend in the future to support more sophisticated application scenarios. Beyond seeking, the same expressions could also be used to add broker-side or client-side message filtering. This would relieve end users from writing their own business logic to process only the messages they are interested in.

Public Interfaces

The following changes will be introduced to the public interface:

Client API

public interface MessageFilterExpr extends Cloneable {
    interface RefBuilder {
        OperatorBuilder property(String name);
        OperatorBuilder publishTime();
        OperatorBuilder eventTime();
    }

    interface OperatorBuilder {
        ExprBuilder equals(String rhs);
        ExprBuilder notEquals(String rhs);
        ExprBuilder in(String arg1, String arg2, String... others);
        ExprBuilder notIn(String arg1, String arg2, String... others);
        ExprBuilder greaterThan(String rhs);
        ExprBuilder greaterThanEquals(String rhs);
        ExprBuilder lessThan(String rhs);
        ExprBuilder lessThanEquals(String rhs);
        ExprBuilder like(String rhs);
        ExprBuilder notLike(String rhs);
        ExprBuilder containsAll(String rhs);
        ExprBuilder containsAny(String arg1, String arg2, String... others);
        ExprBuilder containsNone(String arg1, String arg2, String... others);
        ExprBuilder between(String arg1, String arg2);

        ExprBuilder equals(Long rhs);
        ExprBuilder notEquals(Long rhs);
        ExprBuilder in(Long arg1, Long arg2, Long... others);
        ExprBuilder notIn(Long arg1, Long arg2, Long... others);
        ExprBuilder greaterThan(Long rhs);
        ExprBuilder greaterThanEquals(Long rhs);
        ExprBuilder lessThan(Long rhs);
        ExprBuilder lessThanEquals(Long rhs);
        ExprBuilder between(Long arg1, Long arg2);

        ExprBuilder equals(Double rhs);
        ExprBuilder notEquals(Double rhs);
        ExprBuilder in(Double arg1, Double arg2, Double... others);
        ExprBuilder notIn(Double arg1, Double arg2, Double... others);
        ExprBuilder greaterThan(Double rhs);
        ExprBuilder greaterThanEquals(Double rhs);
        ExprBuilder lessThan(Double rhs);
        ExprBuilder lessThanEquals(Double rhs);
        ExprBuilder between(Double arg1, Double arg2);
    }

    interface CombinatorBuilder {
        RefBuilder and();
        RefBuilder or();
        ExprBuilder not();
    }

    interface ExprBuilder {
        MessageFilterExpr build();
    }

    static RefBuilder builder() {
        return DefaultImplementation.newMessageFilterExprRefBuilder();
    }
}

The wire protocol

New message types in protobuf:

message Predicate {
    enum MessageRef {
        PROPERTY = 0;
        EVENT_TIME = 1;
        PUBLISH_TIME = 2;
    }

    enum Operator {
        EQUALS = 0;
        NOT_EQUALS = 1;
        IN = 2;
        NOT_IN = 3;
        GREATER_THAN = 4;
        GREATER_THAN_EQUALS = 5;
        LESS_THAN = 6;
        LESS_THAN_EQUALS = 7;
        LIKE = 8;
        NOT_LIKE = 9;
        CONTAINS_ALL = 10;
        CONTAINS_ANY = 11;
        CONTAINS_NONE = 12;
        BETWEEN = 13;
    }

    enum ValueType {
        STRING = 0;
        LONG = 1;
        DOUBLE = 2;
    }

    required MessageRef ref = 1;
    optional string name = 2;
    required Operator op = 3;
    repeated string values = 4;
    optional ValueType ty = 5;
}

message Expr {
    enum ExprType {
        PREDICATE = 0;
        AND = 1;
        OR = 2;
        NOT = 3;
    }

    required ExprType ty = 1;
    optional Predicate predicate = 2;
    optional Expr lhs = 3;
    optional Expr rhs = 4;
}

Changes to existing message types:

// Reset an existing consumer to a particular message id
message CommandSeek {
    required uint64 consumer_id = 1;
    required uint64 request_id = 2;

    optional MessageIdData message_id = 3;
    optional uint64 message_publish_time = 4;
    optional Expr expression = 5;
}

Proposed Changes

Add a MessageFilterExpr builder (MessageFilterExpr.builder()) that lets users build flexible expressions through a simple API. Users can specify and combine multiple predicates on message properties or other metadata (currently event time and publish time). Once a MessageFilterExpr has been built, users can specify where to start consuming messages with an overloaded seek method that takes a MessageFilterExpr as its argument.

Compatibility, Deprecation, and Migration Plan

This feature is only accessible through a new set of APIs, so it is compatible with existing code. The protobuf wire protocol changes are also backward compatible with existing clients.

Test Plan

The expression logic is self-contained and has few external dependencies, so it is easy to write comprehensive unit tests for it. The actual consumer and reader seek logic, however, requires end-to-end (E2E) tests to validate.

Best regards,
Xiaoguang Sun

> On Feb 21, 2021, at 2:37 PM, Jia Zhai <zhai...@apache.org> wrote:
>
> +1. Thanks a lot for this proposal, this feature is very helpful.
>
> On Sun, Feb 21, 2021 at 2:32 PM 孙晓光-社区业务平台 (Sun Xiaoguang, Community Business Platform) <sunxiaogu...@zhihu.com> wrote:
>
>> Hi guys,
>>
>> We have application scenarios where more flexible seek capability is
>> required therefore I wrote a PIP draft about it. and looking forward to
>> hear comments, questions and suggestions from the community, Thank you.
>>
>> You may find the proposal right in the email or the original markdown
>> here:
>> https://github.com/sunxiaoguang/junkyard/blob/master/Message-filtering-expression.md
>>
>> Motivation
>> Support filtering messages with flexible expressions.
>> Both consumer and reader can reset it's position to a specific message id
>> or publish/event time, therefore can be used for time travel message
>> processing.
>> However there are scenarios where more sophisticated conditions
>> can be useful, for example seeking to a particular message with property
>> set to some specific value. MySQL binlog data or TiDB CDC events are two
>> examples for these use cases. Applications can embed GTID or TSO into
>> message property and use it to quickly seek to the most appropriate message
>> when creating a streaming table from binlog.
>> Compared to existing ways of seeking by position and timestamp, expression
>> based seeking can be more flexible and easier to extend in the future to
>> fulfill more sophisticated application scenarios. Not only is it beneficial
>> to seeking messages, we could also add broker or client side message
>> filtering for users with exactly the same feature. Therefore end users can
>> be relieved from writing their own business logic to only process it's
>> interested message from a topic.
>> Public Interfaces
>> The following changes will be introduced to public interface:
>> Client API
>> public interface MessageFilterRef {
>>     static MessageFilterRef property(String key);
>>     static MessageFilterRef eventTime();
>>     static MessageFilterRef publishTime();
>> }
>>
>> public enum MessageFilterOperator {
>>     EQUALS,
>>     NOT_EQUALS,
>>     IN,
>>     NOT_IN,
>>     GREATER_THAN,
>>     GREATER_THAN_EQUALS,
>>     LESS_THAN,
>>     LESS_THAN_EQUALS,
>>     STARTS_WITH,
>>     STARTS_WITH_IGNORE_CASE,
>>     CONTAINS,
>>     CONTAINS_IGNORE_CASE,
>>     DOES_NOT_CONTAIN,
>>     DOES_NOT_CONTAIN_IGNORE_CASE,
>>     CONTAINS_ANY,
>>     CONTAINS_NONE,
>> }
>>
>> public interface MessageFilterExprBuilder {
>>     MessageFilterExprBuilder and(MessageFilterExprBuilder rhs, MessageFilterExprBuilder... others);
>>     MessageFilterExprBuilder or(MessageFilterExprBuilder rhs, MessageFilterExprBuilder... others);
>>     MessageFilterExprBuilder not();
>>     MessageFilterExpr build();
>>
>>     static MessageFilterExprBuilder of(MessageFilterRef ref, MessageFilterOperator op, String rhs, String... others);
>>     static MessageFilterExprBuilder of(MessageFilterRef ref, MessageFilterOperator op, Long rhs, Long... others);
>>     static MessageFilterExprBuilder of(MessageFilterRef ref, MessageFilterOperator op, Double rhs, Double... others);
>>
>>     static MessageFilterExprBuilder property(String key, MessageFilterOperator op, String rhs, String... others);
>>     static MessageFilterExprBuilder property(String key, MessageFilterOperator op, Long rhs, Long... others);
>>     static MessageFilterExprBuilder property(String key, MessageFilterOperator op, Double rhs, Double... others);
>>
>>     static MessageFilterExprBuilder eventTime(MessageFilterRef ref, MessageFilterOperator op, Long rhs, Long... others);
>>     static MessageFilterExprBuilder publishTime(MessageFilterRef ref, MessageFilterOperator op, Long rhs, Long... others);
>> }
>> The wire protocol
>> New message types to protobuf
>> message Predicate {
>>     enum MessageRef {
>>         PROPERTY = 0;
>>         EVENT_TIME = 1;
>>         PUBLISH_TIME = 2;
>>     }
>>
>>     enum Operator {
>>         EQUALS = 0;
>>         NOT_EQUALS = 1;
>>         IN = 2;
>>         NOT_IN = 3;
>>         GREATER_THAN = 4;
>>         GREATER_THAN_EQUALS = 5;
>>         LESS_THAN = 6;
>>         LESS_THAN_EQUALS = 7;
>>         STARTS_WITH = 8;
>>         STARTS_WITH_IGNORE_CASE = 9;
>>         CONTAINS = 10;
>>         CONTAINS_IGNORE_CASE = 11;
>>         DOES_NOT_CONTAIN = 12;
>>         DOES_NOT_CONTAIN_IGNORE_CASE = 13;
>>         CONTAINS_ANY = 14;
>>         CONTAINS_NONE = 15;
>>     }
>>
>>     enum ValueType {
>>         STRING = 0;
>>         LONG = 1;
>>         DOUBLE = 2;
>>     }
>>
>>     required MessageRef ref = 1;
>>     optional string name = 2;
>>     required Operator op = 3;
>>     repeated string values = 4;
>>     optional ValueType ty = 5;
>> }
>>
>> message Expr {
>>     enum ExprType {
>>         PREDICATE = 0;
>>         AND = 1;
>>         OR = 2;
>>         NOT = 3;
>>     }
>>
>>     required ExprType tp = 1;
>>     optional Predicate predicate = 2;
>>     optional Expr op1 = 3;
>>     optional Expr op2 = 4;
>> }
>> Changes to existing message type
>> // Reset an existing consumer to a particular message id
>> message CommandSeek {
>>     required uint64 consumer_id = 1;
>>     required uint64 request_id = 2;
>>
>>     optional MessageIdData message_id = 3;
>>     optional uint64 message_publish_time = 4;
>>     optional Expr expression = 5;
>> }
>>
>> Proposed Changes
>> Add MessageFilterExprBuilder to help users build flexible expressions with
>> a simple API. Users can specify and combine multiple predicates to
>> properties of message or other metadata (event time and publish time at
>> this time). With MessageFilterExpr created per user's request, people can
>> specify where to start consuming messages with an overloaded seek method
>> that takes a MessageFilterExpr as argument.
>> Compatibility, Deprecation, and Migration Plan
>> Users can only access this feature with a new set of API therefore is
>> compatible with existing code. The protobuf wire protocol changes are
>> backward compatible to existing clients as well.
>> Test Plan
>> Expression logic is self contained and has little external dependencies
>> therefore easy to write comprehensive unit tests. The actual consumer and
>> reader seek logic requires E2E test code to validate however.
>>
>> Sincerely,
>> Xiaoguang Sun
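The Test Plan notes that the expression logic is self-contained and easy to unit test. As a hedged illustration, here is a minimal evaluator over plain-Java stand-ins for the revised Predicate and Expr messages (fields ty, predicate, lhs, rhs). These are not generated protobuf classes, and ExprEvalSketch, Msg, eval() and test() are hypothetical names. Only a subset of operators is covered, and numeric operators assume LONG values.

Several behaviours are assumptions rather than part of the proposal:
- NOT takes its operand from lhs.
- BETWEEN is inclusive on both ends.
- A predicate on a missing property never matches, even NOT_EQUALS.

```java
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java mirror of the revised Predicate and Expr protobuf messages (not generated
// protobuf classes). Only a few operators are implemented; numeric operators assume LONG values.
final class ExprEvalSketch {
    enum MessageRef { PROPERTY, EVENT_TIME, PUBLISH_TIME }
    enum Operator { EQUALS, NOT_EQUALS, IN, GREATER_THAN_EQUALS, LESS_THAN, BETWEEN }
    enum ExprType { PREDICATE, AND, OR, NOT }

    static final class Predicate {
        final MessageRef ref; final String name; final Operator op; final List<String> values;
        Predicate(MessageRef ref, String name, Operator op, List<String> values) {
            this.ref = ref; this.name = name; this.op = op; this.values = values;
        }
    }

    static final class Expr {
        final ExprType ty; final Predicate predicate; final Expr lhs; final Expr rhs;
        Expr(ExprType ty, Predicate predicate, Expr lhs, Expr rhs) {
            this.ty = ty; this.predicate = predicate; this.lhs = lhs; this.rhs = rhs;
        }
        static Expr pred(Predicate p) { return new Expr(ExprType.PREDICATE, p, null, null); }
        static Expr and(Expr l, Expr r) { return new Expr(ExprType.AND, null, l, r); }
        static Expr or(Expr l, Expr r) { return new Expr(ExprType.OR, null, l, r); }
        static Expr not(Expr e) { return new Expr(ExprType.NOT, null, e, null); } // assumption: NOT uses lhs
    }

    // Hypothetical message view: properties plus event and publish times in milliseconds.
    static final class Msg {
        final Map<String, String> properties; final long eventTime; final long publishTime;
        Msg(Map<String, String> properties, long eventTime, long publishTime) {
            this.properties = properties; this.eventTime = eventTime; this.publishTime = publishTime;
        }
    }

    // Recursively evaluates the expression tree against one message.
    static boolean eval(Expr e, Msg m) {
        switch (e.ty) {
            case PREDICATE: return test(e.predicate, m);
            case AND: return eval(e.lhs, m) && eval(e.rhs, m);
            case OR: return eval(e.lhs, m) || eval(e.rhs, m);
            case NOT: return !eval(e.lhs, m);
            default: throw new IllegalArgumentException("unknown expr type " + e.ty);
        }
    }

    // Evaluates a single predicate; values arrive as strings, as in `repeated string values`.
    static boolean test(Predicate p, Msg m) {
        String actual;
        switch (p.ref) {
            case PROPERTY: actual = m.properties.get(p.name); break;
            case EVENT_TIME: actual = Long.toString(m.eventTime); break;
            default: actual = Long.toString(m.publishTime); break;
        }
        if (actual == null) return false; // assumption: predicates on a missing property never match
        switch (p.op) {
            case EQUALS: return actual.equals(p.values.get(0));
            case NOT_EQUALS: return !actual.equals(p.values.get(0));
            case IN: return p.values.contains(actual);
            case GREATER_THAN_EQUALS: return Long.parseLong(actual) >= Long.parseLong(p.values.get(0));
            case LESS_THAN: return Long.parseLong(actual) < Long.parseLong(p.values.get(0));
            case BETWEEN: {
                long v = Long.parseLong(actual);
                return v >= Long.parseLong(p.values.get(0)) && v <= Long.parseLong(p.values.get(1));
            }
            default: throw new IllegalArgumentException("unsupported operator " + p.op);
        }
    }
}
```

Take a message with property tso=42, event time 1000 and publish time 2000. Expr.and(pred(tso BETWEEN "40","50"), not(pred(publishTime LESS_THAN "1500"))) evaluates to true for it. Because each node is evaluated independently, every operator and combinator can be covered by small table-driven unit tests.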