Hi,

I've only been working with Flink for the past 2 weeks on a project and am
trying to use the CEP library on sensor data. I am using Flink version
1.3.2 with a Kafka source (KafkaSource9). I am running Flink on a 3-node
AWS cluster with 8G of RAM, running Ubuntu 16.04. From the Flink dashboard,
I see that I have 2 TaskManagers & 4 task slots.

What I observe is the following. The input to Kafka is a JSON string, and
when parsed on the Flink side it looks like this:

(101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)
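
That Tuple8 comes out of a plain map over the Kafka string stream, roughly
like the sketch below (the JSON field names, the kafkaStream variable, and
the Jackson-based parsing are placeholders for illustration, not my exact
code):

    // Sketch only: JSON field names are placeholders (uses com.fasterxml.jackson.databind.ObjectMapper).
    DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> parsedStream =
            kafkaStream.map(
                new MapFunction<String, Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>() {
                    @Override
                    public Tuple8<Integer, Date, String, String, Float, Float, Float, Float> map(String json)
                            throws Exception {
                        JsonNode n = new ObjectMapper().readTree(json);
                        return new Tuple8<>(
                                n.get("home_id").asInt(),               // f0: home_id
                                new Date(n.get("timestamp").asLong()),  // f1: event timestamp
                                n.get("event_type").asText(),           // f2
                                n.get("severity").asText(),             // f3
                                (float) n.get("lat").asDouble(),        // f4
                                (float) n.get("lon").asDouble(),        // f5
                                (float) n.get("reading1").asDouble(),   // f6: field the pattern thresholds on
                                (float) n.get("reading2").asDouble());  // f7
                    }
                });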

I use a Tuple8 to capture the parsed data. The first field is home_id. The
time characteristic is set to EventTime and I have an
AscendingTimestampExtractor using the timestamp field (I've sketched this
setup after the pattern code below). Parallelism for the execution
environment is set to 4. I have a rather simple event that I am trying to
capture:

DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMapByHomeId =
        cepMap.keyBy(0);

//cepMapByHomeId.print();

Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
        Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
                .where(new OverLowThreshold())
                .followedBy("end")
                .where(new OverHighThreshold());

PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
        CEP.pattern(cepMapByHomeId.keyBy(0), cep1);

DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts =
        patternStream.select(new PackageCapturedEvents());
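
For context, the event-time setup I mentioned above is roughly the
following (a sketch from memory; treat the parsedStream/cepMap variable
names as placeholders):

    // Sketch of the event-time setup (uses org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor).
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(4);

    DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMap =
            parsedStream.assignTimestampsAndWatermarks(
                new AscendingTimestampExtractor<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>() {
                    @Override
                    public long extractAscendingTimestamp(
                            Tuple8<Integer, Date, String, String, Float, Float, Float, Float> element) {
                        return element.f1.getTime();   // f1 is the event timestamp
                    }
                });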

The pattern checks whether the 7th field in the Tuple8 goes over 12 and
then over 16 (the two condition classes are sketched after the sample
output below). The output of the pattern looks like this:

(201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
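
The two conditions are along these lines (a sketch; I'm assuming the 7th
field is f6 of the Tuple8 and using SimpleCondition from
org.apache.flink.cep.pattern.conditions):

    // OverLowThreshold: 7th field (f6) crosses the low threshold of 12.
    public static class OverLowThreshold
            extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
        @Override
        public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> value) {
            return value.f6 > 12.0f;
        }
    }

    // OverHighThreshold: 7th field (f6) crosses the high threshold of 16.
    public static class OverHighThreshold
            extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
        @Override
        public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> value) {
            return value.f6 > 16.0f;
        }
    }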

On the Kafka producer side, I am trying to send simulated data for around
100 homes, so the home_id goes from 0-100 and the input is keyed by
home_id. I have about 10 partitions in Kafka. The producer just loops
through a CSV file, with a delay of about 100 ms between two rows of the
file. The data is exactly the same for all 100 of the CSV files except for
the home_id and the lat & long information. The timestamp is incremented by
a step of 1 sec. I start multiple processes to simulate data from different
homes (one simulator process is sketched below).
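
Roughly, each simulator process does this (the topic name, file name, Kafka
key, and the toJson() helper are placeholders for illustration, not my
actual producer code):

    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class HomeSimulator {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            List<String> rows = Files.readAllLines(Paths.get("home_101.csv"));
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                while (true) {
                    for (String row : rows) {
                        String json = toJson(row);   // maps the CSV columns to the JSON fields shown earlier
                        producer.send(new ProducerRecord<>("sensor-readings", "101", json));
                        Thread.sleep(100);           // ~100 ms between consecutive rows
                    }
                }
            }
        }

        // Placeholder: the real code builds the JSON string for one row;
        // the timestamps in the data step by 1 second per row.
        private static String toJson(String csvRow) {
            return csvRow;
        }
    }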

THE PROBLEM:

Flink completely misses capturing events for a large subset of the input
data. I barely see events for about 4-5 of the home_id values. I do a
print before applying the pattern and after, and I see all home_ids before
but only a tiny subset after. Since the data is exactly the same for every
home, I expect all home_ids to be captured and written to my sink, which is
Cassandra in this case. I've looked through all the available docs and
examples but cannot seem to find a fix for the problem.

I would really appreciate some guidance on how to understand and fix this.


Thank you,

Ajay
