xingyuan cheng created FLINK-38566:
--------------------------------------

             Summary: Support CEP DSL
                 Key: FLINK-38566
                 URL: https://issues.apache.org/jira/browse/FLINK-38566
             Project: Flink
          Issue Type: New Feature
          Components: Library / CEP
            Reporter: xingyuan cheng


In IoT and crowd selection scenarios, complex CEP expressions are difficult for 
users to flexibly configure. Therefore, a simplified set of conditional 
expressions is needed to express CEP operators. This feature change aims to 
provide a convenient way to implement flexible and configurable expressions by 
configuring conditional expressions. I translated the syntax parsing logic from 
DSL to CEP operators using ALT4 syntax.
 
*Migration from Pattern API*

*Before (Pattern API)*

{code:java}
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getValue() > 100;
        }
    })
    .next("middle")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event event) {
            return event.getValue() < 50;
        }
    }); {code}


*After (DSL)*
{code:java}
PatternStream<Event> pattern = DslCompiler.compile(
    "start(value > 100) middle(value < 50)",
    dataStream
); {code}
 
 
The following is a simple demonstration use case:


 
{code:java}
package org.apache.flink.cep.dsl;


import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.dsl.api.DslCompiler;
import org.apache.flink.cep.dsl.api.EventAdapter;
import org.apache.flink.cep.dsl.util.MapEventAdapter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


import java.util.HashMap;
import java.util.List;
import java.util.Map;


/**
 * Example usage of the CEP DSL.
 *
 * <p>This class demonstrates various ways to use the DSL with different event 
types and patterns.
 * These are examples only and not executable tests.
 */
public class DslExampleUsage {


    // Example 1: Simple POJO events with basic pattern
    public static void simplePojoExample() throws Exception {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


        // Create sample data
        DataStream<SensorReading> sensorData =
                env.fromElements(
                        new SensorReading("sensor1", 95.0, 
System.currentTimeMillis()),
                        new SensorReading("sensor1", 105.0, 
System.currentTimeMillis()),
                        new SensorReading("sensor1", 110.0, 
System.currentTimeMillis()));


        // Define pattern using DSL
        PatternStream<SensorReading> pattern =
                DslCompiler.compile("HighTemp(temperature > 100)", sensorData);


        // Process matches
        pattern.select(
                        match -> {
                            SensorReading reading = 
match.get("HighTemp").get(0);
                            return String.format(
                                    "High temperature alert: Sensor %s at 
%.1f°C",
                                    reading.id, reading.temperature);
                        })
                .print();


        env.execute("Simple POJO Example");
    }


    // Example 2: Event correlation
    public static void eventCorrelationExample() throws Exception {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


        DataStream<SensorReading> sensorData =
                env.fromElements(
                        new SensorReading("sensor1", 95.0, 1000L),
                        new SensorReading("sensor1", 105.0, 2000L),
                        new SensorReading("sensor2", 110.0, 3000L));


        // Pattern with event correlation
        String dsl = "Start(id = 'sensor1' and temperature > 90) -> " +
                    "End(id = Start.id and temperature > Start.temperature)";


        PatternStream<SensorReading> pattern = DslCompiler.compile(dsl, 
sensorData);


        pattern.select(
                        match -> {
                            SensorReading start = match.get("Start").get(0);
                            SensorReading end = match.get("End").get(0);
                            return String.format(
                                    "Temperature rise detected: %.1f -> %.1f",
                                    start.temperature, end.temperature);
                        })
                .print();


        env.execute("Event Correlation Example");
    }


    // Example 3: Map-based events
    public static void mapEventExample() throws Exception {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


        // Create Map events
        Map<String, Object> event1 = new HashMap<>();
        event1.put("_eventType", "Alert");
        event1.put("severity", 7);
        event1.put("message", "High CPU usage");


        Map<String, Object> event2 = new HashMap<>();
        event2.put("_eventType", "Alert");
        event2.put("severity", 9);
        event2.put("message", "Critical error");


        DataStream<Map<String, Object>> alerts = env.fromElements(event1, 
event2);


        // Use MapEventAdapter
        PatternStream<Map<String, Object>> pattern =
                DslCompiler.compile(
                        "Alert(severity > 5)", alerts, new MapEventAdapter());


        pattern.select(match -> 
match.get("Alert").get(0).get("message")).print();


        env.execute("Map Event Example");
    }


    // Example 4: Complex pattern with quantifiers and time window
    public static void complexPatternExample() throws Exception {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


        DataStream<UserEvent> userEvents =
                env.fromElements(
                        new UserEvent("user1", "login", 1000L),
                        new UserEvent("user1", "browse", 2000L),
                        new UserEvent("user1", "browse", 3000L),
                        new UserEvent("user1", "purchase", 4000L));


        // Complex pattern: login -> multiple browses -> purchase, within 30 
seconds
        String dsl =
                "%SKIP_TO_LAST['Login'] "
                        + "Login(action = 'login') -> "
                        + "Browse{1,5}(action = 'browse' and userId = 
Login.userId) -> "
                        + "Purchase(action = 'purchase' and userId = 
Login.userId) "
                        + "within 30s";


        PatternStream<UserEvent> pattern = DslCompiler.compile(dsl, userEvents);


        pattern.select(
                        match -> {
                            UserEvent login = match.get("Login").get(0);
                            List<UserEvent> browses = match.get("Browse");
                            UserEvent purchase = match.get("Purchase").get(0);
                            return String.format(
                                    "User %s: login -> %d browses -> purchase",
                                    login.userId, browses.size());
                        })
                .print();


        env.execute("Complex Pattern Example");
    }


    // Example 5: Builder API with custom adapter
    public static void builderApiExample() throws Exception {
        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();


        DataStream<CustomEvent> events = env.fromElements(new CustomEvent());


        // Custom event adapter
        EventAdapter<CustomEvent> customAdapter =
                new EventAdapter<CustomEvent>() {
                    @Override
                    public java.util.Optional<Object> getAttribute(
                            CustomEvent event, String attributeName) {
                        return 
java.util.Optional.ofNullable(event.getField(attributeName));
                    }


                    @Override
                    public String getEventType(CustomEvent event) {
                        return event.getTypeName();
                    }
                };


        // Use builder API
        PatternStream<CustomEvent> pattern =
                DslCompiler.<CustomEvent>builder()
                        .withStrictTypeMatching()
                        .withEventAdapter(customAdapter)
                        .compile("MyEvent(value > 100)", events);


        pattern.select(match -> "Matched: " + 
match.get("MyEvent").get(0)).print();


        env.execute("Builder API Example");
    }


    // Example event classes


    /** Simple sensor reading POJO. */
    public static class SensorReading {
        public String id;
        public double temperature;
        public long timestamp;


        public SensorReading(String id, double temperature, long timestamp) {
            this.id = id;
            this.temperature = temperature;
            this.timestamp = timestamp;
        }


        public String getId() {
            return id;
        }


        public double getTemperature() {
            return temperature;
        }


        public long getTimestamp() {
            return timestamp;
        }
    }


    /** User event POJO. */
    public static class UserEvent {
        public String userId;
        public String action;
        public long timestamp;


        public UserEvent(String userId, String action, long timestamp) {
            this.userId = userId;
            this.action = action;
            this.timestamp = timestamp;
        }


        public String getUserId() {
            return userId;
        }


        public String getAction() {
            return action;
        }


        public long getTimestamp() {
            return timestamp;
        }
    }


    /** Custom event type. */
    public static class CustomEvent {
        public Object getField(String name) {
            return null; // Implementation omitted
        }


        public String getTypeName() {
            return "MyEvent";
        }
    }
}{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to