Hi All, I have used Flink CEP to filter some events and generate some alerts based on certain conditions. But unfortunately doesn't print any result. I have attached source code herewith, could you please help me on this.
-------------------------------- package org.monitoring.stream.analytics; import java.io.IOException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.table.shaded.org.apache.commons.lang3.StringUtils; import org.monitoring.stream.analytics.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.MultiMap; import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternSelectFunction; import org.apache.flink.cep.PatternStream; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.windowing.time.Time; public class FlinkCEP { private final static Logger LOGGER = LoggerFactory.getLogger(FlinkCEP.class); public static void main(String[] args) throws Exception { String query = FileHandler.readInputStream(FileHandler.getResourceAsStream("query.sql")); if (query == null) { LOGGER.error("***************** Can't read resources ************************"); } else { LOGGER.info("======================== " + query + " ============================="); } Properties props = FileHandler.loadResourceProperties("application.properties"); Properties kConsumer = FileHandler.loadResourceProperties("consumer.properties"); Properties kProducer = FileHandler.loadResourceProperties("producer.properties"); String hzConfig = FileHandler.readInputStream(FileHandler.getResourceAsStream("hazelcast-client.xml")); String schemaContent = FileHandler.readInputStream(FileHandler.getResourceAsStream("IRIC-schema.json")); props.setProperty("auto.offset.reset", "latest"); props.setProperty("flink.starting-position", "latest"); Map<String, String> tempMap = new HashMap<>(); for (final String name : props.stringPropertyNames()) tempMap.put(name, props.getProperty(name)); final ParameterTool params = ParameterTool.fromMap(tempMap); String jobName = props.getProperty(ApplicationConfig.JOB_NAME); LOGGER.info("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Desktop Responsibility Start %%%%%%%%%%%%%%%%%%%%%%"); LOGGER.info("$$$$$$$$$$$$$$$$$$$$$$$ Hz instance name " + props.toString()); HazelcastInstance hzInst = HazelcastUtils.getClient(hzConfig, ""); LOGGER.info("============================== schema " + schemaContent); MultiMap<String, String> distributedMap = hzInst.getMultiMap("masterDataSynch"); distributedMap.put(jobName, query); LOGGER.info("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Desktop Responsibility End %%%%%%%%%%%%%%%%%%%%%%%%%"); Collection<String> queries = distributedMap.get(jobName); Set<String> rules = new HashSet<>(queries); LOGGER.info("============================== query" + query); rules.add(query); hzInst.getLifecycleService().shutdown(); final String sourceTable = "dataTable"; String paral = props.getProperty(ApplicationConfig.FLINK_PARALLEL_TASK); String noOfOROperatorsValue = props.getProperty(ApplicationConfig.FLINK_NUMBER_OF_OR_OPERATORS); int noOfOROperators = 50; if(StringUtils.isNoneBlank(noOfOROperatorsValue)) { noOfOROperators = Integer.parseInt(noOfOROperatorsValue); } List<List<String>> subQueries = chunk(new ArrayList(rules), noOfOROperators); // define a schema // setup streaming environment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); env.enableCheckpointing(300000); // 300 seconds env.getConfig().setGlobalJobParameters(params); // env.getConfig().enableObjectReuse(); env.getConfig().setUseSnapshotCompression(true); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE); env.setParallelism(Integer.parseInt(paral)); /* env.setStateBackend(new RocksDBStateBackend(env.getStateBackend(), true)); */ FlinkKafkaConsumer010<Event> kafka = new FlinkKafkaConsumer010<>("testin", new EventDeSerializer(), kConsumer); final SourceFunction<Event> source = kafka; DataStream<Event> events = env.addSource(source); DataStream<Event> filteredEvents = events // partition on the address to make sure equal addresses // end up in the same state machine flatMap function .keyBy(Event::getTiggerID); filteredEvents.print(); //create pattern Pattern<Event, ?> pattern = Pattern.<Event>begin("first") .where(new SimpleCondition<Event>() { private static final long serialVersionUID = 1L; @Override public boolean filter(Event event) throws Exception { return event.getInterceptID().equals(4508724) && event.getProviderInfo().equals("Dhanuka") ; //&& event.getLIID().matches(".*193400835.*"); } }) .or(new SimpleCondition<Event>() { private static final long serialVersionUID = 1L; @Override public boolean filter(Event event) throws Exception { return event.getInterceptID().equals(4508724) && event.getProviderInfo().equals("Dhanuka") ; //&& event.getLIID().matches(".*193400835.*"); } }); PatternStream<Event> patternStream = CEP.pattern(filteredEvents, pattern); DataStream<Alert> result = patternStream.select(new PatternSelectFunction<Event, Alert>() { private static final long serialVersionUID = 1L; @Override public Alert select(Map<String, List<Event>> pattern) throws Exception { return parseMatch(pattern); } private Alert parseMatch(Map<String, List<Event>> pattern) { List<Event> events = pattern.get("first"); Alert alert = null; try { alert = new Alert(JSONUtils.writeListToJsonArray(events)); } catch (IOException e) { LOGGER.error(e.getMessage()); alert = new Alert("[]"); } return alert; } }); FlinkKafkaProducer010<Alert> producer = new FlinkKafkaProducer010<>("testout", new AlertSerializer(), kProducer); result.addSink(producer); result.print(); env.execute(props.getProperty(ApplicationConfig.JOB_NAME)); System.out.println("======================= Successfully Deployed ========================"); } public static class MyTime extends ScalarFunction { private static final long serialVersionUID = 1L; public MyTime(long timeMills) { super(); } public String eval(long timeMills) { return new Timestamp(timeMills).toString(); } } public static class Now extends ScalarFunction { private static final long serialVersionUID = 1L; public Now() { super(); } public String eval() { return new Timestamp(System.currentTimeMillis()).toString(); } } public static List<List<String>> chunk(final List<String> arrayList, final int chunkSize) { int rest = arrayList.size() % chunkSize; int noOfChunks = arrayList.size() / chunkSize; int start = 0; int end = 0; int count = (rest == 0 ? noOfChunks: noOfChunks +1); System.out.println("rest " + rest + " noOfChunks " + noOfChunks); List<List<String>> chunks = new ArrayList<>(); for (int index = 0; index < count; ++index) { if (index == 0 && rest > 0) { end = rest; }else { end = start + chunkSize; } List<String> sublist = arrayList.subList(start, end); start = end; chunks.add(sublist); //System.out.println(sublist); } return chunks; } public static String getOnlyConditions(final String sql) { int index = sql.indexOf("WHERE"); return sql.substring(index).replace("WHERE", "(") + ")"; } } Cheers, Dhanuka -- Nothing Impossible,Creativity is more important than knowledge.