Hi guys,

We have application scenarios where more flexible seek capability is required 
therefore I wrote a PIP draft about it. and looking forward to hear comments, 
questions and suggestions from the community, Thank you.

You may find the proposal right in the email or the original markdown here: 
https://github.com/sunxiaoguang/junkyard/blob/master/Message-filtering-expression.md
 
<https://github.com/sunxiaoguang/junkyard/blob/master/Message-filtering-expression.md>

  
Motivation
Support filtering messages with flexible expressions.
Both consumer and reader can reset it's position to a specific message id or 
publish/event time, therefore can be used for time travel message processing. 
However there are scenarios where more sophisticated conditions can be useful, 
for example seeking to a particular message with property set to some specific 
value. MySQL binlog data or TiDB CDC events are two examples for these use 
cases. Applications can embed GTID or TSO into message property and use it to 
quickly seek to the most appropriate message when creating a streaming table 
from binlog.
Compared to existing ways of seeking by position and timestamp, expression 
based seeking can be more flexible and easier to extend in the future to 
fulfill more sophisticated application scenarios. Not only is it beneficial to 
seeking messages, we could also add broker or client side message filtering for 
users with exactly the same feature. Therefore end users can be relieved from 
writing their own business logic to only process it's interested message from a 
topic.
Public Interfaces
The following changes will be introduced to public interface:
Client API
public interface MessageFilterRef {
    static MessageFilterRef property(String key);
    static MessageFilterRef eventTime();
    static MessageFilterRef publishTime();
}

public enum MessageFilterOperator {
    EQUALS,
    NOT_EQUALS,
    IN,
    NOT_IN,
    GREATER_THAN,
    GREATER_THAN_EQUALS,
    LESS_THAN,
    LESS_THAN_EQUALS,
    STARTS_WITH,
    STARTS_WITH_IGNORE_CASE,
    CONTAINS,
    CONTAINS_IGNORE_CASE,
    DOES_NOT_CONTAIN, 
    DOES_NOT_CONTAIN_IGNORE_CASE, 
    CONTAINS_ANY,
    CONTAINS_NONE,
}

public interface MessageFilterExprBuilder {
    MessageFilterExprBuilder and(MessageFilterExprBuilder rhs, 
MessageFilterExprBuilder... others);
    MessageFilterExprBuilder or(MessageFilterExprBuilder rhs, 
MessageFilterExprBuilder... others);
    MessageFilterExprBuilder not();
    MessageFilterExpr build();
    
    static MessageFilterExprBuilder of(MessageFilterRef ref, 
MessageFilterOperator op, String rhs, String... others);
    static MessageFilterExprBuilder of(MessageFilterRef ref, 
MessageFilterOperator op, Long rhs, Long... others);
    static MessageFilterExprBuilder of(MessageFilterRef ref, 
MessageFilterOperator op, Double rhs, Double... others);
  
    static MessageFilterExprBuilder property(String key, MessageFilterOperator 
op, String rhs, String... others);
    static MessageFilterExprBuilder property(String key, MessageFilterOperator 
op, Long rhs, Long... others);
    static MessageFilterExprBuilder property(String key, MessageFilterOperator 
op, Double rhs, Double... others);
  
    static MessageFilterExprBuilder eventTime(MessageFilterRef ref, 
MessageFilterOperator op, Long rhs, Long... others);
    static MessageFilterExprBuilder publishTime(MessageFilterRef ref, 
MessageFilterOperator op, Long rhs, Long... others);
}
The wire protocol
New message types to protobuf
message Predicate {
    enum MessageRef {
        PROPERTY = 0;
        EVENT_TIME = 1;
        PUBLISH_TIME = 2;
    }

    enum Operator {
        EQUALS = 0;
        NOT_EQUALS = 1;
        IN = 2;
        NOT_IN = 3;
        GREATER_THAN = 4;
        GREATER_THAN_EQUALS = 5;
        LESS_THAN = 6;
        LESS_THAN_EQUALS = 7;
        STARTS_WITH = 8;
        STARTS_WITH_IGNORE_CASE = 9;
        CONTAINS = 10;
        CONTAINS_IGNORE_CASE = 11;
        DOES_NOT_CONTAIN = 12;
        DOES_NOT_CONTAIN_IGNORE_CASE = 13;
        CONTAINS_ANY = 14;
        CONTAINS_NONE = 15;
    }
    
    enum ValueType {
        STRING = 0;
        LONG = 1;
        DOUBLE = 2;
    }

    required MessageRef ref = 1;
    optional string name = 2;
    required Operator op = 3;
    repeated string values = 4;
    optional ValueType ty = 5;
}

message Expr {
    enum ExprType {
        PREDICATE = 0;
        AND = 1;
        OR = 2;
        NOT = 3;
    }

    required ExprType tp = 1;
    optional Predicate predicate = 2;
    optional Expr op1 = 3;
    optional Expr op2 = 4;
}
Changes to existing message type
// Reset an existing consumer to a particular message id
message CommandSeek {
    required uint64 consumer_id = 1;
    required uint64 request_id  = 2;

    optional MessageIdData message_id = 3;
    optional uint64 message_publish_time = 4;
    optional Expr expression = 5;
}

Proposed Changes
Add MessageFilterExprBuilder to help users build flexible expressions with a 
simple API. Users can specify and combine multiple predicates to properties of 
message or other metadata (event time and publish time at this time). With 
MessageFilterExpr created per user's request, people can specify where to start 
consuming messages with an overloaded seek method that takes a 
MessageFilterExpr as argument.
Compatibility, Deprecation, and Migration Plan
Users can only access this feature with a new set of API therefore is 
compatible with existing code. The protobuf wire protocol changes are backward 
compatible to existing clients as well.
Test Plan
Expression logic is self contained and has little external dependencies 
therefore easy to write comprehensive unit tests. The actual consumer and 
reader seek logic requires E2E test code to validate however.

Sincerely,
Xiaoguang Sun



Reply via email to