Hello,
I have defined a CEP pattern (a->b->c->d->e->f->g) that should match within
10 minutes. In the Flink WebUI, however, the CEP operator quickly becomes
busy at 100% utilization. Increasing the parallelism of the CEP operator does
not help, and the checkpoint state size keeps growing over time, which
suggests a data backlog.

I consume four Kafka topics at once using the setTopics method, and the data
rates differ significantly between topics. To address this, I increased the
watermark emission interval and set the watermark to the minimum of the
per-topic maximum timestamps. The problem persists: the CEP operator is still
busy after several hours. My watermark generator is below.




@Override
public void onEvent(LobbyPathData lobbyPathData, long eventTimestamp, WatermarkOutput watermarkOutput) {
    // One key per source topic (project + event name).
    String key = lobbyPathData.getProject() + lobbyPathData.getEvent_name();
    // Track the largest event timestamp seen so far for this key.
    maxTimePerTopic.merge(key, eventTimestamp, Math::max);
}

@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
    // Emit the minimum of the per-key maximum timestamps, so the slowest
    // key bounds the watermark. Nothing is emitted before the first event.
    maxTimePerTopic.values().stream()
            .min(Long::compare)
            .ifPresent(t -> watermarkOutput.emitWatermark(
                    new Watermark(t - outOfOrdernessMillis - 1)));
}
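
To make the effect of this generator easier to reason about, here is a
minimal sketch of the same per-key minimum logic without any Flink
dependencies. The class name MinWatermarkSketch, the key names "fast" and
"slow", and all timestamps are hypothetical and chosen only for illustration.
It shows that the watermark cannot advance past the slowest key, so events
from the faster keys would have to be buffered downstream until the slow key
catches up.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical stand-in for the watermark generator above: no Flink types,
// only the "minimum of per-key maximum timestamps" bookkeeping.
public class MinWatermarkSketch {
    private final Map<String, Long> maxTimePerKey = new HashMap<>();
    private final long outOfOrdernessMillis;

    public MinWatermarkSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    // Mirrors onEvent: remember the largest timestamp seen per key.
    public void onEvent(String key, long timestamp) {
        maxTimePerKey.merge(key, timestamp, Math::max);
    }

    // Mirrors onPeriodicEmit: the watermark that would be emitted, if any.
    public OptionalLong currentWatermark() {
        OptionalLong min = maxTimePerKey.values().stream()
                .mapToLong(Long::longValue)
                .min();
        return min.isPresent()
                ? OptionalLong.of(min.getAsLong() - outOfOrdernessMillis - 1)
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        MinWatermarkSketch sketch = new MinWatermarkSketch(1_000L);

        // The fast key is already 10 minutes in; the slow key is at 10 seconds.
        sketch.onEvent("fast", 600_000L);
        sketch.onEvent("slow", 10_000L);
        // The slow key holds the watermark back.
        System.out.println(sketch.currentWatermark().getAsLong()); // prints 8999

        // Only when the slow key advances does the watermark move.
        sketch.onEvent("slow", 500_000L);
        System.out.println(sketch.currentWatermark().getAsLong()); // prints 498999
    }
}
```

In this sketch the gap between the fast key's timestamps and the watermark
is roughly 591 seconds after the first two events. Whether a gap like this
explains the growing CEP state in the real job is an assumption that would
need to be checked against the actual per-topic event times.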
