Hello,

I have defined a CEP pattern (a -> b -> c -> d -> e -> f -> g) that must complete within 10 minutes. When I check the job in the Flink Web UI, the CEP operator quickly becomes busy at 100% utilization. Increasing the parallelism of the CEP operator does not help, and the checkpoint state size keeps growing over time, which suggests a data backlog.

I consume four Kafka topics in a single source using the setTopics method, and their data rates differ significantly. To compensate, I increased the periodic watermark interval and set the watermark to the minimum of the latest timestamps seen across the four topics. The problem persists: the CEP operator is still busy after several hours.

My watermark generator:
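One possible explanation, offered as a hypothesis rather than a diagnosis: in event time, Flink's CEP operator buffers each incoming event in keyed state and only hands events to the pattern matcher once the watermark has passed their timestamps. If a slow input holds the min-based watermark far behind the fast topics, events from the fast topics pile up in that buffer. That would fit both the growing checkpoint size and the permanently busy operator. The stdlib-only Java model below is a deliberate simplification, not Flink's code; the class EventTimeBufferDemo and its methods are hypothetical. It shows how a lagging watermark leaves most events buffered.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Simplified model (hypothetical, not Flink's implementation) of an
 * event-time operator that holds events until the watermark passes them.
 */
public class EventTimeBufferDemo {

    // Buffered event timestamps; the oldest is always at the head.
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();

    /** Events are only buffered on arrival, regardless of order. */
    public void add(long timestamp) {
        buffer.add(timestamp);
    }

    /**
     * Releases, in timestamp order, every buffered event whose timestamp is
     * not after the new watermark; all later events stay buffered.
     */
    public List<Long> advanceWatermark(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= watermark) {
            released.add(buffer.poll());
        }
        return released;
    }

    /** Number of events still waiting for the watermark. */
    public int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        EventTimeBufferDemo cep = new EventTimeBufferDemo();
        // A fast topic delivers events with timestamps 1_000 .. 10_000 ms ...
        for (long t = 1_000; t <= 10_000; t += 1_000) {
            cep.add(t);
        }
        // ... but a slow input keeps the min-based watermark at 2_000 ms.
        List<Long> released = cep.advanceWatermark(2_000);
        System.out.println("released=" + released + " still buffered=" + cep.buffered());
    }
}
```

Running main prints `released=[1000, 2000] still buffered=8`: only two of the ten events reach the matcher, and the other eight stay in state until the slow input catches up.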
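import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class MinTimestampWatermarkGenerator implements WatermarkGenerator<LobbyPathData> {

    private final long outOfOrdernessMillis;
    // Highest event timestamp seen so far per key (project + event name).
    private final Map<String, Long> maxTimePerTopic = new HashMap<>();

    public MinTimestampWatermarkGenerator(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
    }

    @Override
    public void onEvent(LobbyPathData lobbyPathData, long l, WatermarkOutput watermarkOutput) {
        // Key meant to identify the source topic; it is built from project + event name.
        String key = lobbyPathData.getProject() + lobbyPathData.getEvent_name();
        maxTimePerTopic.merge(key, l, Long::max);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
        // Watermark follows the slowest key: minimum of the per-key maxima.
        OptionalLong min = maxTimePerTopic.values().stream().mapToLong(Long::longValue).min();
        min.ifPresent(t -> watermarkOutput.emitWatermark(new Watermark(t - outOfOrdernessMillis - 1)));
    }
}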
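A related detail in this generator: the map key is getProject() + getEvent_name(), not the topic name, so the watermark is the minimum over every project/event-name pair. A single rarely emitted event name can therefore hold back the watermark for the whole stream. The stdlib-only sketch below reproduces the min-of-per-key-max logic and adds one assumed change: keys that have not produced an event for idleTimeoutMillis of wall-clock time are left out of the minimum. MinAcrossKeysWatermark, idleTimeoutMillis and the explicit nowMillis argument (passed in to keep the logic testable) are hypothetical. In Flink itself, WatermarkStrategy#withIdleness(Duration) and WatermarkOutput#markIdle() are the built-in ways to handle idle inputs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical sketch: watermark = min over active keys of the max event
 * timestamp per key, minus an out-of-orderness bound. Keys silent for at
 * least idleTimeoutMillis (wall-clock) are ignored.
 */
public class MinAcrossKeysWatermark {

    private final long outOfOrdernessMillis;
    private final long idleTimeoutMillis;
    // Highest event timestamp seen per key.
    private final Map<String, Long> maxTimestampPerKey = new HashMap<>();
    // Wall-clock time at which each key last produced an event.
    private final Map<String, Long> lastSeenPerKey = new HashMap<>();

    public MinAcrossKeysWatermark(long outOfOrdernessMillis, long idleTimeoutMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        this.idleTimeoutMillis = idleTimeoutMillis;
    }

    /** Mirrors onEvent: keep the per-key maximum and record activity. */
    public void onEvent(String key, long eventTimestamp, long nowMillis) {
        maxTimestampPerKey.merge(key, eventTimestamp, Long::max);
        lastSeenPerKey.put(key, nowMillis);
    }

    /** Mirrors onPeriodicEmit; empty when no key is currently active. */
    public OptionalLong currentWatermark(long nowMillis) {
        OptionalLong min = maxTimestampPerKey.entrySet().stream()
                .filter(e -> nowMillis - lastSeenPerKey.get(e.getKey()) < idleTimeoutMillis)
                .mapToLong(e -> e.getValue())
                .min();
        return min.isPresent()
                ? OptionalLong.of(min.getAsLong() - outOfOrdernessMillis - 1)
                : OptionalLong.empty();
    }

    public static void main(String[] args) {
        MinAcrossKeysWatermark wm = new MinAcrossKeysWatermark(5_000, 60_000);
        // Hypothetical keys: a busy event name and a rarely emitted one.
        wm.onEvent("projA:login", 100_000, 0);
        wm.onEvent("projA:rare", 20_000, 0);
        // Both keys active: watermark = 20_000 - 5_000 - 1.
        System.out.println(wm.currentWatermark(30_000).getAsLong());
        // Only the busy key keeps producing events.
        wm.onEvent("projA:login", 400_000, 90_000);
        // The rare key has been silent for 90 s (>= 60 s), so it is ignored.
        System.out.println(wm.currentWatermark(90_000).getAsLong());
    }
}
```

main prints 14999 and then 394999: while both keys are active the rare key pins the watermark, and once it has been silent past the timeout the watermark follows the busy key. The trade-off is that events from a key that resumes after being ignored may arrive behind the watermark and be treated as late.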