Hi All,
I have used Flink CEP to filter some events and generate alerts based on
certain conditions, but unfortunately it doesn't print any results. I have
attached the source code below; could you please help me with this?
--------------------------------
package org.monitoring.stream.analytics;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.shaded.org.apache.commons.lang3.StringUtils;
import org.monitoring.stream.analytics.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MultiMap;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
public class FlinkCEP {
private final static Logger LOGGER = LoggerFactory.getLogger(FlinkCEP.class);
public static void main(String[] args) throws Exception {
String query = FileHandler.readInputStream(FileHandler.getResourceAsStream("query.sql"));
if (query == null) {
LOGGER.error("***************** Can't read resources
************************");
} else {
LOGGER.info("======================== " + query + "
=============================");
}
Properties props = FileHandler.loadResourceProperties("application.properties");
Properties kConsumer = FileHandler.loadResourceProperties("consumer.properties");
Properties kProducer = FileHandler.loadResourceProperties("producer.properties");
String hzConfig = FileHandler.readInputStream(FileHandler.getResourceAsStream("hazelcast-client.xml"));
String schemaContent = FileHandler.readInputStream(FileHandler.getResourceAsStream("IRIC-schema.json"));
props.setProperty("auto.offset.reset", "latest");
props.setProperty("flink.starting-position", "latest");
Map<String, String> tempMap = new HashMap<>();
for (final String name : props.stringPropertyNames())
tempMap.put(name, props.getProperty(name));
final ParameterTool params = ParameterTool.fromMap(tempMap);
String jobName = props.getProperty(ApplicationConfig.JOB_NAME);
LOGGER.info("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Desktop Responsibility
Start %%%%%%%%%%%%%%%%%%%%%%");
LOGGER.info("$$$$$$$$$$$$$$$$$$$$$$$ Hz instance name " +
props.toString());
HazelcastInstance hzInst = HazelcastUtils.getClient(hzConfig, "");
LOGGER.info("============================== schema " + schemaContent);
MultiMap<String, String> distributedMap = hzInst.getMultiMap("masterDataSynch");
distributedMap.put(jobName, query);
LOGGER.info("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% Desktop Responsibility End %%%%%%%%%%%%%%%%%%%%%%%%%");
Collection<String> queries = distributedMap.get(jobName);
Set<String> rules = new HashSet<>(queries);
LOGGER.info("============================== query" + query);
rules.add(query);
hzInst.getLifecycleService().shutdown();
final String sourceTable = "dataTable";
String paral = props.getProperty(ApplicationConfig.FLINK_PARALLEL_TASK);
String noOfOROperatorsValue = props.getProperty(ApplicationConfig.FLINK_NUMBER_OF_OR_OPERATORS);
int noOfOROperators = 50;
if (StringUtils.isNoneBlank(noOfOROperatorsValue)) {
noOfOROperators = Integer.parseInt(noOfOROperatorsValue);
}
List<List<String>> subQueries = chunk(new ArrayList<>(rules), noOfOROperators);
// define a schema
// setup streaming environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(300000); // 300 seconds
env.getConfig().setGlobalJobParameters(params);
// env.getConfig().enableObjectReuse();
env.getConfig().setUseSnapshotCompression(true);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
env.setParallelism(Integer.parseInt(paral));
/* env.setStateBackend(new RocksDBStateBackend(env.getStateBackend(), true)); */
FlinkKafkaConsumer010<Event> kafka = new FlinkKafkaConsumer010<>("testin", new EventDeSerializer(), kConsumer);
final SourceFunction<Event> source = kafka;
DataStream<Event> events = env.addSource(source);
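// Note (assumption): the job runs in event time (setStreamTimeCharacteristic above) but
// never assigns timestamps or watermarks, so the CEP operator keeps buffering and emits
// no matches. A minimal sketch, assuming a hypothetical Event#getEventTime() accessor
// returning epoch milliseconds (uses
// org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor):
//
// DataStream<Event> eventsWithTimestamps = events.assignTimestampsAndWatermarks(
//         new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(5)) {
//             private static final long serialVersionUID = 1L;
//             @Override
//             public long extractTimestamp(Event event) {
//                 return event.getEventTime();
//             }
//         });
// // and then key/apply CEP on eventsWithTimestamps instead of events
//
// Alternatively, use TimeCharacteristic.ProcessingTime if event time is not required.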
DataStream<Event> filteredEvents = events
// key by the trigger ID so that events with the same ID
// are handled by the same parallel CEP instance
.keyBy(Event::getTiggerID);
filteredEvents.print();
//create pattern
Pattern<Event, ?> pattern = Pattern.<Event>begin("first")
.where(new SimpleCondition<Event>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(Event event) throws Exception {
return event.getInterceptID().equals(4508724) && event.getProviderInfo().equals("Dhanuka");
//&& event.getLIID().matches(".*193400835.*");
}
})
.or(new SimpleCondition<Event>() {
private static final long serialVersionUID = 1L;
@Override
public boolean filter(Event event) throws Exception {
return event.getInterceptID().equals(4508724) && event.getProviderInfo().equals("Dhanuka");
//&& event.getLIID().matches(".*193400835.*");
}
});
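// Note (assumption): if Event#getInterceptID() returns a Long or a String rather than an
// Integer, equals(4508724) boxes the literal to an Integer and is always false, so the
// pattern never matches; compare against the matching type (e.g. 4508724L for a Long).
// Also note that the .or() branch repeats exactly the same condition as the .where()
// branch, so it does not widen the match.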
PatternStream<Event> patternStream = CEP.pattern(filteredEvents, pattern);
DataStream<Alert> result = patternStream.select(new PatternSelectFunction<Event, Alert>() {
private static final long serialVersionUID = 1L;
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
return parseMatch(pattern);
}
private Alert parseMatch(Map<String, List<Event>> pattern) {
List<Event> events = pattern.get("first");
Alert alert = null;
try {
alert = new Alert(JSONUtils.writeListToJsonArray(events));
} catch (IOException e) {
LOGGER.error(e.getMessage());
alert = new Alert("[]");
}
return alert;
}
});
FlinkKafkaProducer010<Alert> producer = new FlinkKafkaProducer010<>("testout", new AlertSerializer(), kProducer);
result.addSink(producer);
result.print();
env.execute(props.getProperty(ApplicationConfig.JOB_NAME));
System.out.println("======================= Successfully Deployed
========================");
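// Note: env.execute() blocks until the streaming job terminates, so the "Successfully
// Deployed" line above only prints after the job has finished, not at deployment time.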
}
public static class MyTime extends ScalarFunction {
private static final long serialVersionUID = 1L;
public MyTime(long timeMills) {
super();
}
public String eval(long timeMills) {
return new Timestamp(timeMills).toString();
}
}
public static class Now extends ScalarFunction {
private static final long serialVersionUID = 1L;
public Now() {
super();
}
public String eval() {
return new Timestamp(System.currentTimeMillis()).toString();
}
}
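// chunk splits the given list into sublists of at most chunkSize elements; when the size
// is not an exact multiple, the remainder forms the first, smaller chunk. The sublists are
// subList views backed by the original list.
// Example (illustrative): chunk(["a", "b", "c", "d", "e"], 2) -> [["a"], ["b", "c"], ["d", "e"]]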
public static List<List<String>> chunk(final List<String> arrayList, final int chunkSize) {
int rest = arrayList.size() % chunkSize;
int noOfChunks = arrayList.size() / chunkSize;
int start = 0;
int end = 0;
int count = (rest == 0 ? noOfChunks : noOfChunks + 1);
System.out.println("rest " + rest + " noOfChunks " + noOfChunks);
List<List<String>> chunks = new ArrayList<>();
for (int index = 0; index < count; ++index) {
if (index == 0 && rest > 0) {
end = rest;
} else {
end = start + chunkSize;
}
List<String> sublist = arrayList.subList(start, end);
start = end;
chunks.add(sublist);
//System.out.println(sublist);
}
return chunks;
}
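// getOnlyConditions extracts the WHERE clause of a query and wraps it in parentheses.
// Example (illustrative): getOnlyConditions("SELECT * FROM t WHERE a = 1 AND b = 2")
// returns "( a = 1 AND b = 2)". If the query contains no "WHERE", indexOf returns -1 and
// substring throws, so callers should only pass queries with a WHERE clause.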
public static String getOnlyConditions(final String sql) {
int index = sql.indexOf("WHERE");
return sql.substring(index).replace("WHERE", "(") + ")";
}
}
Cheers,
Dhanuka
--
Nothing Impossible, Creativity is more important than knowledge.