+1. Thanks a lot for this proposal; this feature is very helpful.

On Sun, Feb 21, 2021 at 2:32 PM 孙晓光-社区业务平台 <sunxiaogu...@zhihu.com> wrote:

> Hi guys,
>
> We have application scenarios that require a more flexible seek
> capability, so I wrote a PIP draft for it. I look forward to hearing
> comments, questions, and suggestions from the community. Thank you.
>
> You can find the proposal below in this email, or read the original
> markdown here:
> https://github.com/sunxiaoguang/junkyard/blob/master/Message-filtering-expression.md
>
>
> Motivation
> Support filtering messages with flexible expressions.
> Both consumers and readers can reset their position to a specific message
> ID or publish/event time, which enables time-travel message processing.
> However, there are scenarios where more sophisticated conditions are
> useful, for example seeking to a message whose property is set to a
> specific value. MySQL binlog data and TiDB CDC events are two examples:
> applications can embed the GTID or TSO in a message property and use it
> to quickly seek to the most appropriate message when creating a streaming
> table from the binlog.
> Compared to the existing ways of seeking by position and timestamp,
> expression-based seeking is more flexible and easier to extend to more
> sophisticated application scenarios in the future. Beyond seeking, the
> same expressions could also drive broker-side or client-side message
> filtering, which would relieve end users from writing their own business
> logic to process only the messages they are interested in from a topic.
> Public Interfaces
> The following changes will be introduced to the public interfaces:
> Client API
> public interface MessageFilterRef {
>     static MessageFilterRef property(String key);
>     static MessageFilterRef eventTime();
>     static MessageFilterRef publishTime();
> }
>
> public enum MessageFilterOperator {
>     EQUALS,
>     NOT_EQUALS,
>     IN,
>     NOT_IN,
>     GREATER_THAN,
>     GREATER_THAN_EQUALS,
>     LESS_THAN,
>     LESS_THAN_EQUALS,
>     STARTS_WITH,
>     STARTS_WITH_IGNORE_CASE,
>     CONTAINS,
>     CONTAINS_IGNORE_CASE,
>     DOES_NOT_CONTAIN,
>     DOES_NOT_CONTAIN_IGNORE_CASE,
>     CONTAINS_ANY,
>     CONTAINS_NONE,
> }
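
The operator list does not pin down semantics, so here is a minimal sketch of how these operators might be evaluated against a single string property value. All of the semantics are assumptions on my part: IN/NOT_IN test equality against the value list, CONTAINS_ANY/CONTAINS_NONE test substrings, single-value operators use only the first value, a missing property never matches, and the ordering operators compare lexicographically with String.compareTo. `StringOperatorSketch` and `matches` are hypothetical names; the enum is a local copy of the proposed MessageFilterOperator so the block compiles on its own.

```java
import java.util.List;
import java.util.Locale;

// Local copy of the proposed MessageFilterOperator, so this sketch compiles standalone.
enum MessageFilterOperator {
    EQUALS, NOT_EQUALS, IN, NOT_IN,
    GREATER_THAN, GREATER_THAN_EQUALS, LESS_THAN, LESS_THAN_EQUALS,
    STARTS_WITH, STARTS_WITH_IGNORE_CASE,
    CONTAINS, CONTAINS_IGNORE_CASE, DOES_NOT_CONTAIN, DOES_NOT_CONTAIN_IGNORE_CASE,
    CONTAINS_ANY, CONTAINS_NONE
}

// Hypothetical evaluator; the operator semantics here are assumptions, not part of the PIP.
final class StringOperatorSketch {
    private StringOperatorSketch() {}

    // Returns true if `actual` satisfies `op` against the non-empty operand list `values`.
    static boolean matches(MessageFilterOperator op, String actual, List<String> values) {
        if (actual == null) {
            return false; // assumption: a missing property never matches
        }
        String first = values.get(0);
        String lower = actual.toLowerCase(Locale.ROOT);
        switch (op) {
            case EQUALS: return actual.equals(first);
            case NOT_EQUALS: return !actual.equals(first);
            case IN: return values.contains(actual);
            case NOT_IN: return !values.contains(actual);
            // Lexicographic ordering; typed LONG/DOUBLE operands would compare numerically.
            case GREATER_THAN: return actual.compareTo(first) > 0;
            case GREATER_THAN_EQUALS: return actual.compareTo(first) >= 0;
            case LESS_THAN: return actual.compareTo(first) < 0;
            case LESS_THAN_EQUALS: return actual.compareTo(first) <= 0;
            case STARTS_WITH: return actual.startsWith(first);
            case STARTS_WITH_IGNORE_CASE:
                return actual.regionMatches(true, 0, first, 0, first.length());
            case CONTAINS: return actual.contains(first);
            case CONTAINS_IGNORE_CASE: return lower.contains(first.toLowerCase(Locale.ROOT));
            case DOES_NOT_CONTAIN: return !actual.contains(first);
            case DOES_NOT_CONTAIN_IGNORE_CASE:
                return !lower.contains(first.toLowerCase(Locale.ROOT));
            // Substring tests against every operand in the list.
            case CONTAINS_ANY: return values.stream().anyMatch(actual::contains);
            case CONTAINS_NONE: return values.stream().noneMatch(actual::contains);
            default: throw new IllegalArgumentException("unsupported operator: " + op);
        }
    }
}
```

Under lexicographic ordering "10" is not GREATER_THAN "9", which is one reason typed LONG/DOUBLE comparison matters for values such as a TSO.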
>
> public interface MessageFilterExprBuilder {
>     MessageFilterExprBuilder and(MessageFilterExprBuilder rhs,
>             MessageFilterExprBuilder... others);
>     MessageFilterExprBuilder or(MessageFilterExprBuilder rhs,
>             MessageFilterExprBuilder... others);
>     MessageFilterExprBuilder not();
>     MessageFilterExpr build();
>
>     static MessageFilterExprBuilder of(MessageFilterRef ref,
>             MessageFilterOperator op, String rhs, String... others);
>     static MessageFilterExprBuilder of(MessageFilterRef ref,
>             MessageFilterOperator op, Long rhs, Long... others);
>     static MessageFilterExprBuilder of(MessageFilterRef ref,
>             MessageFilterOperator op, Double rhs, Double... others);
>
>     static MessageFilterExprBuilder property(String key,
>             MessageFilterOperator op, String rhs, String... others);
>     static MessageFilterExprBuilder property(String key,
>             MessageFilterOperator op, Long rhs, Long... others);
>     static MessageFilterExprBuilder property(String key,
>             MessageFilterOperator op, Double rhs, Double... others);
>
>     static MessageFilterExprBuilder eventTime(MessageFilterOperator op,
>             Long rhs, Long... others);
>     static MessageFilterExprBuilder publishTime(MessageFilterOperator op,
>             Long rhs, Long... others);
> }
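
To make the composition rules concrete, here is a minimal in-memory sketch of how and/or/not could combine leaf conditions. It assumes a leaf can be modelled as a java.util.function.Predicate over a simplified message that carries only properties and a publish time; `ExprSketch`, `MessageView`, `propertyEquals` and `publishTimeAtLeast` are hypothetical stand-ins, not the proposed MessageFilterExprBuilder.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical simplified message: only the fields this sketch needs.
final class MessageView {
    final Map<String, String> properties;
    final long publishTime;

    MessageView(Map<String, String> properties, long publishTime) {
        this.properties = properties;
        this.publishTime = publishTime;
    }
}

// Hypothetical stand-in for MessageFilterExprBuilder; each node wraps a predicate.
final class ExprSketch {
    private final Predicate<MessageView> test;

    private ExprSketch(Predicate<MessageView> test) {
        this.test = test;
    }

    // Leaf: property `key` equals `value` (a missing property never matches).
    static ExprSketch propertyEquals(String key, String value) {
        return new ExprSketch(m -> value.equals(m.properties.get(key)));
    }

    // Leaf: the message was published at or after `millis`.
    static ExprSketch publishTimeAtLeast(long millis) {
        return new ExprSketch(m -> m.publishTime >= millis);
    }

    // Conjunction of this node with one or more further operands.
    ExprSketch and(ExprSketch rhs, ExprSketch... others) {
        Predicate<MessageView> combined = test.and(rhs.test);
        for (ExprSketch other : others) {
            combined = combined.and(other.test);
        }
        return new ExprSketch(combined);
    }

    // Disjunction of this node with one or more further operands.
    ExprSketch or(ExprSketch rhs, ExprSketch... others) {
        Predicate<MessageView> combined = test.or(rhs.test);
        for (ExprSketch other : others) {
            combined = combined.or(other.test);
        }
        return new ExprSketch(combined);
    }

    // Logical negation of this node.
    ExprSketch not() {
        return new ExprSketch(test.negate());
    }

    boolean evaluate(MessageView message) {
        return test.test(message);
    }
}
```

For example, `ExprSketch.propertyEquals("table", "orders").and(ExprSketch.publishTimeAtLeast(1_000L))` matches only `orders` messages published at or after 1000 ms. The `rhs, others...` parameter shape forces at least one operand at compile time, which is presumably why the proposal prefers it over a bare varargs parameter.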
> The wire protocol
> New protobuf message types
> message Predicate {
>     enum MessageRef {
>         PROPERTY = 0;
>         EVENT_TIME = 1;
>         PUBLISH_TIME = 2;
>     }
>
>     enum Operator {
>         EQUALS = 0;
>         NOT_EQUALS = 1;
>         IN = 2;
>         NOT_IN = 3;
>         GREATER_THAN = 4;
>         GREATER_THAN_EQUALS = 5;
>         LESS_THAN = 6;
>         LESS_THAN_EQUALS = 7;
>         STARTS_WITH = 8;
>         STARTS_WITH_IGNORE_CASE = 9;
>         CONTAINS = 10;
>         CONTAINS_IGNORE_CASE = 11;
>         DOES_NOT_CONTAIN = 12;
>         DOES_NOT_CONTAIN_IGNORE_CASE = 13;
>         CONTAINS_ANY = 14;
>         CONTAINS_NONE = 15;
>     }
>
>     enum ValueType {
>         STRING = 0;
>         LONG = 1;
>         DOUBLE = 2;
>     }
>
>     required MessageRef ref = 1;
>     optional string name = 2;
>     required Operator op = 3;
>     repeated string values = 4;
>     optional ValueType ty = 5;
> }
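
Since Predicate carries every operand in `repeated string values`, the `ty` field is what tells the receiver how to compare them. A minimal sketch under that reading: the builder's rhs/others arguments are stringified with String.valueOf, and comparison parses them back according to `ty`. `TypedValueSketch`, `encode` and `compare` are hypothetical names, and the local ValueType mirrors Predicate.ValueType.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for Predicate.values (repeated string) plus Predicate.ty.
final class TypedValueSketch {
    // Local mirror of Predicate.ValueType.
    enum ValueType { STRING, LONG, DOUBLE }

    private TypedValueSketch() {}

    // Stringifies the builder's rhs/others operands for the `values` field.
    static List<String> encode(Object rhs, Object... others) {
        List<String> values = new ArrayList<>();
        values.add(String.valueOf(rhs));
        for (Object other : others) {
            values.add(String.valueOf(other));
        }
        return values;
    }

    // Compares a message value with one encoded operand, interpreting both per `ty`.
    static int compare(ValueType ty, String actual, String encoded) {
        switch (ty) {
            case LONG:
                return Long.compare(Long.parseLong(actual), Long.parseLong(encoded));
            case DOUBLE:
                return Double.compare(Double.parseDouble(actual), Double.parseDouble(encoded));
            default: // STRING: plain lexicographic order
                return actual.compareTo(encoded);
        }
    }
}
```

In proto2 an unset `ty` reads back as the first enum value (STRING), so a client that forgets to set it would silently get lexicographic comparison.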
>
> message Expr {
>     enum ExprType {
>         PREDICATE = 0;
>         AND = 1;
>         OR = 2;
>         NOT = 3;
>     }
>
>     required ExprType tp = 1;
>     optional Predicate predicate = 2;
>     optional Expr op1 = 3;
>     optional Expr op2 = 4;
> }
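
Expr is binary (op1/op2), while the builder's and/or accept any number of operands, so the client has to fold them into nested nodes. A minimal sketch, assuming a left fold and assuming NOT stores its operand in op1 and leaves op2 unset: `ExprNode` is a hypothetical plain-Java mirror of the message with the Predicate reduced to a string label; real code would use the protoc-generated builders instead.

```java
// Hypothetical plain-Java mirror of the proposed protobuf Expr message.
final class ExprNode {
    enum ExprType { PREDICATE, AND, OR, NOT }

    final ExprType tp;
    final String predicate; // stand-in for the Predicate message
    final ExprNode op1;
    final ExprNode op2;

    private ExprNode(ExprType tp, String predicate, ExprNode op1, ExprNode op2) {
        this.tp = tp;
        this.predicate = predicate;
        this.op1 = op1;
        this.op2 = op2;
    }

    static ExprNode predicate(String label) {
        return new ExprNode(ExprType.PREDICATE, label, null, null);
    }

    // Assumption: NOT keeps its operand in op1 and leaves op2 unset.
    static ExprNode not(ExprNode operand) {
        return new ExprNode(ExprType.NOT, null, operand, null);
    }

    // Left-folds two or more operands into nested binary nodes: ((a OP b) OP c) ...
    static ExprNode fold(ExprType tp, ExprNode first, ExprNode rhs, ExprNode... others) {
        if (tp != ExprType.AND && tp != ExprType.OR) {
            throw new IllegalArgumentException("only AND/OR are binary: " + tp);
        }
        ExprNode acc = new ExprNode(tp, null, first, rhs);
        for (ExprNode next : others) {
            acc = new ExprNode(tp, null, acc, next);
        }
        return acc;
    }

    // Infix rendering, handy for debugging and unit tests.
    String render() {
        switch (tp) {
            case PREDICATE:
                return predicate;
            case NOT:
                return "NOT " + op1.render();
            default:
                return "(" + op1.render() + " " + tp + " " + op2.render() + ")";
        }
    }
}
```

A left fold produces a tree whose depth grows linearly with the number of operands. protobuf-java's parser enforces a default recursion limit of 100 nested messages, so a balanced fold (or a `repeated Expr` operand list) may be worth considering for expressions with many operands.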
> Changes to an existing message type
> // Reset an existing consumer to a message id, publish time or expression
> message CommandSeek {
>     required uint64 consumer_id = 1;
>     required uint64 request_id  = 2;
>
>     optional MessageIdData message_id = 3;
>     optional uint64 message_publish_time = 4;
>     optional Expr expression = 5;
> }
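
The proposal does not say how the broker turns the expression into a cursor position. A minimal sketch, assuming the broker picks the first entry in publish order that matches: a linear scan works for arbitrary expressions, while a binary search is only valid when the referenced value never decreases in publish order (a TSO usually behaves this way, arbitrary properties do not). `SeekSketch`, `firstMatch` and `firstAtLeast` are hypothetical, and the in-memory list and array stand in for the managed ledger.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical broker-side resolution of an expression-based seek, over in-memory data.
final class SeekSketch {
    private SeekSketch() {}

    // General case: index of the first entry (in publish order) matching `expr`, or -1.
    static <T> int firstMatch(List<T> entries, Predicate<? super T> expr) {
        for (int i = 0; i < entries.size(); i++) {
            if (expr.test(entries.get(i))) {
                return i;
            }
        }
        return -1;
    }

    // Monotonic case only (e.g. a TSO property that never decreases): binary search
    // for the first index whose value is >= target, or -1 if all values are smaller.
    static int firstAtLeast(long[] nonDecreasing, long target) {
        int lo = 0;
        int hi = nonDecreasing.length; // search window is [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (nonDecreasing[mid] < target) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo == nonDecreasing.length ? -1 : lo;
    }
}
```

For TSO values `{100, 200, 200, 300}`, `firstAtLeast(tso, 250)` returns 3. For non-monotonic properties only the linear scan gives the right answer.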
>
> Proposed Changes
> Add MessageFilterExprBuilder to help users build flexible expressions
> with a simple API. Users can specify and combine multiple predicates on
> message properties or other metadata (currently event time and publish
> time). Once a MessageFilterExpr has been built, users can specify where
> to start consuming messages by passing it to a new overload of the seek
> method.
> Compatibility, Deprecation, and Migration Plan
> This feature is only accessible through a new set of APIs, so it is
> compatible with existing code. The protobuf wire protocol changes are also
> backward compatible with existing clients, because the only change to an
> existing message is a new optional field in CommandSeek.
> Test Plan
> The expression logic is self-contained and has few external dependencies,
> so it is easy to cover with comprehensive unit tests. The actual consumer
> and reader seek logic, however, requires end-to-end (E2E) tests to
> validate.
>
> Sincerely,
> Xiaoguang Sun