Hi Kostas,

Thank you for reaching out and for the suggestions. Here are the results:
1. Using an env parallelism of 1 performed similarly, with the additional
problem that there was significant lag in the Kafka topic.
2. I removed the additional keyBy(0), but that did not change anything.
3. I also tried checking only for the start pattern, and the result was
exactly the same: I saw one of the homes going through, but 3 others just
got dropped.
4. I also tried slowing down the rate into Kafka from 5000/second to about
1000/second, but I see similar results.

I was wondering if you had any other solutions to the problem. I am
especially concerned about 1 and 3. Is this library under active
development? Is there a JIRA open on this issue, and could we open one to
track it?

I was reading up on Stack Overflow and found a user who had a very, very
similar issue in Aug '16. I contacted him to discuss the issue and learned
that the pattern of failure was exactly the same:
https://stackoverflow.com/questions/38870819/flink-cep-is-not-deterministic

Before I found the above post, I had created a post for this issue:
https://stackoverflow.com/questions/46458873/flink-cep-not-recognizing-pattern

I would really appreciate your guidance on this.

Best regards,
Ajay

On Thu, Sep 28, 2017 at 1:38 AM, Kostas Kloudas
<k.klou...@data-artisans.com> wrote:

> Hi Ajay,
>
> I will look a bit more into the issue.
>
> But in the meantime, could you run your job with a parallelism of 1, to
> see if the results are as expected?
>
> Also, could you change the pattern, for example to check only for the
> start, to see if all keys pass through?
>
> As for the code, you apply keyBy(0) to the cepMap stream twice, which is
> redundant and introduces latency. You could remove that to also see the
> impact.
>
> Kostas
>
> On Sep 28, 2017, at 2:57 AM, Ajay Krishna <ajaykris...@gmail.com> wrote:
>
> Hi,
>
> I've only been working with Flink for the past 2 weeks on a project and
> am trying to use the CEP library on sensor data. I am using Flink version
> 1.3.2 with a Kafka source (FlinkKafkaConsumer09). I am running Flink on a
> 3-node AWS cluster with 8G of RAM, running Ubuntu 16.04. From the Flink
> dashboard, I see that I have 2 TaskManagers & 4 task slots.
>
> What I observe is the following. The input to Kafka is a JSON string,
> and when parsed on the Flink side it looks like this:
>
> (101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)
>
> I use a Tuple8 to capture the parsed data. The first field is home_id.
> The time characteristic is set to EventTime, and I have an
> AscendingTimestampExtractor using the timestamp field. Parallelism for
> the execution environment is set to 4. I have a rather simple event that
> I am trying to capture:
>
> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>
>         cepMapByHomeId = cepMap.keyBy(0);
>
> //cepMapByHomeId.print();
>
> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>         Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>                 .where(new OverLowThreshold())
>                 .followedBy("end")
>                 .where(new OverHighThreshold());
>
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>
>         patternStream = CEP.pattern(cepMapByHomeId.keyBy(0), cep1);
>
> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>>
>         alerts = patternStream.select(new PackageCapturedEvents());
>
> The pattern checks whether the 7th field in the Tuple8 goes over 12 and
> then over 16.
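>
> For completeness, the conditions and the select function look essentially
> like the following (a simplified sketch rather than the exact classes; it
> assumes Flink 1.3's SimpleCondition and PatternSelectFunction, that the
> reading is field f6, and that the Tuple7 fields map to home_id, the
> start/end timestamps, and the remaining string and location fields):
>
> public static class OverLowThreshold
>         extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
>     @Override
>     public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
>         // "start": the reading (7th field, f6) goes over the low threshold of 12
>         return event.f6 > 12.0f;
>     }
> }
>
> public static class OverHighThreshold
>         extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
>     @Override
>     public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
>         // "end": the reading goes over the high threshold of 16
>         return event.f6 > 16.0f;
>     }
> }
>
> public static class PackageCapturedEvents
>         implements PatternSelectFunction<
>                 Tuple8<Integer, Date, String, String, Float, Float, Float, Float>,
>                 Tuple7<Integer, Date, Date, String, String, Float, Float>> {
>     @Override
>     public Tuple7<Integer, Date, Date, String, String, Float, Float> select(
>             Map<String, List<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>> pattern) {
>         Tuple8<Integer, Date, String, String, Float, Float, Float, Float> start = pattern.get("start").get(0);
>         Tuple8<Integer, Date, String, String, Float, Float, Float, Float> end = pattern.get("end").get(0);
>         // home_id, start timestamp, end timestamp, event type, label, lat, long
>         return new Tuple7<>(start.f0, start.f1, end.f1, start.f2, start.f3, start.f4, start.f5);
>     }
> }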
>
> The output of the pattern looks like this:
>
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
>
> On the Kafka producer side, I am trying to send simulated data for around
> 100 homes, so the home_id goes from 0-100, and the input is keyed by
> home_id. I have about 10 partitions in Kafka. The producer just loops
> through a CSV file with a delay of about 100 ms between two rows (a sketch
> of this loop follows after this message). The data is exactly the same for
> all 100 CSV files except for the home_id and the lat & long information.
> The timestamp is incremented in steps of 1 sec. I start multiple processes
> to simulate data from different homes.
>
> THE PROBLEM:
>
> Flink completely misses capturing events for a large subset of the input
> data. I barely see events for about 4-5 of the home_id values. I do a
> print before applying the pattern and after, and I see all home_ids
> before but only a tiny subset after. Since the data is exactly the same,
> I expect all home_ids to be captured and written to my sink, which is
> Cassandra in this case. I've looked through all the available docs and
> examples but cannot seem to find a fix for the problem.
>
> I would really appreciate some guidance on how to understand and fix this.
>
> Thank you,
>
> Ajay
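
For reference, each simulator process described above is essentially the
following loop (a sketch under assumptions: the topic name, file naming, and
bootstrap address are illustrative, rows are sent as-is, and it uses the
plain kafka-clients KafkaProducer):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

public class HomeSimulator {
    public static void main(String[] args) throws Exception {
        String homeId = args[0];  // e.g. "101"; one process per simulated home

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // illustrative
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             BufferedReader reader =
                     new BufferedReader(new FileReader("home_" + homeId + ".csv"))) {
            String row;
            while ((row = reader.readLine()) != null) {
                // key by home_id so all events for one home stay in one partition
                producer.send(new ProducerRecord<>("sensor-events", homeId, row));
                Thread.sleep(100);  // ~100 ms between consecutive rows
            }
        }
    }
}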