Hi Kostas,

Thank you for reaching out and for the suggestions. Here are the results:

1. Using an env parallelism of 1 performed similarly, with the additional
problem that there was significant lag in the Kafka topic.
2. I removed the additional keyBy(0), but that did not change anything.
3. I also tried checking only for the start pattern (see the sketch after
this list), and the result was exactly the same: I saw one of the homes
going through, but 3 others just got dropped.
4. I also tried slowing the rate into Kafka from 5000/second to about
1000/second, but I see similar results.
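
For test 3, the reduced pattern was roughly this (a minimal sketch, reusing
the condition class and keyed stream from my job quoted below):

Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> startOnly =
        Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
                .where(new OverLowThreshold());

PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
        CEP.pattern(cepMapByHomeId, startOnly);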

I was wondering if you had any other solutions to the problem. I am
especially concerned about 1 and 3. Is this library under active
development? Is there a JIRA issue open for this, and could I open one to
track it?


I was reading on Stack Overflow and found a user who had a very similar
issue in Aug '16. I contacted him to discuss the issue and learned that the
pattern of failure was exactly the same.

https://stackoverflow.com/questions/38870819/flink-cep-is-not-deterministic


Before I found the above post, I had created a post for this issue:
https://stackoverflow.com/questions/46458873/flink-cep-not-recognizing-pattern



I would really appreciate your guidance on this.

Best regards,
Ajay





On Thu, Sep 28, 2017 at 1:38 AM, Kostas Kloudas <k.klou...@data-artisans.com
> wrote:

> Hi Ajay,
>
> I will look a bit more into the issue.
>
> But in the meantime, could you run your job with a parallelism of 1, to
> see if the results are as expected?
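>
> For example (a quick sketch):
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1); // one parallel instance for every operator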
>
> Also, could you change the pattern, for example to check only for the
> start, to see if all keys pass through?
>
> As for the code, you apply keyBy(0) to the cepMap stream twice, which is
> redundant and introduces latency.
> You could remove that to also see the impact.
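>
> Something like this (a sketch based on your snippet), keying once and
> passing the already-keyed stream to CEP:
>
> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMapByHomeId =
>         cepMap.keyBy(0);
>
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
>         CEP.pattern(cepMapByHomeId, cep1); // no second keyBy(0)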
>
> Kostas
>
> On Sep 28, 2017, at 2:57 AM, Ajay Krishna <ajaykris...@gmail.com> wrote:
>
> Hi,
>
> I've only been working with Flink for the past 2 weeks on a project and am
> trying out the CEP library on sensor data. I am using Flink version 1.3.2.
> Flink has a Kafka source; I am using the FlinkKafkaConsumer09 connector. I
> am running Flink on a 3-node AWS cluster with 8 GB of RAM running Ubuntu
> 16.04. From the Flink dashboard, I see that I have 2 TaskManagers & 4 task
> slots.
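>
> The source wiring is roughly this (a sketch; the broker address, topic
> name, and group id are placeholders):
>
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "broker:9092");
> props.setProperty("group.id", "cep-test");
>
> DataStream<String> raw = env.addSource(
>         new FlinkKafkaConsumer09<>("sensor-topic", new SimpleStringSchema(), props));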
>
> What I observe is the following. The input to Kafka is a JSON string, and
> when parsed on the Flink side it looks like this:
>
> (101,Sun Sep 24 23:18:53 UTC 2017,complex event,High,37.75142,-122.39458,12.0,20.0)
>
> I use a Tuple8 to capture the parsed data. The first field is home_id. The
> time characteristic is set to EventTime, I have an
> AscendingTimestampExtractor using the timestamp field, and the parallelism
> of the execution environment is set to 4.
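>
> The timestamp assignment looks roughly like this (a minimal sketch; f1 is
> the Date field in my tuple layout):
>
> cepMap.assignTimestampsAndWatermarks(
>         new AscendingTimestampExtractor<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>() {
>             @Override
>             public long extractAscendingTimestamp(
>                     Tuple8<Integer, Date, String, String, Float, Float, Float, Float> element) {
>                 return element.f1.getTime(); // event time from the Date field
>             }
>         });
>
> I have a rather simple event that I am trying to capture: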
>
> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMapByHomeId =
>         cepMap.keyBy(0);
>
> //cepMapByHomeId.print();
>
> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>         Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>                 .where(new OverLowThreshold())
>                 .followedBy("end")
>                 .where(new OverHighThreshold());
>
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
>         CEP.pattern(cepMapByHomeId.keyBy(0), cep1);
>
> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts =
>         patternStream.select(new PackageCapturedEvents());
>
> The pattern checks whether the 7th field in the Tuple8 goes over 12 and
> then over 16. The output of the pattern looks like this:
>
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex event,Non-event,37.75837,-122.41467)
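>
> The two conditions are essentially this (a sketch of my classes; f6 is the
> 7th field):
>
> public static class OverLowThreshold
>         extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
>     @Override
>     public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> value) {
>         return value.f6 > 12.0f; // "start": 7th field goes over 12
>     }
> }
>
> public static class OverHighThreshold
>         extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
>     @Override
>     public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> value) {
>         return value.f6 > 16.0f; // "end": 7th field goes over 16
>     }
> }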
>
> On the Kafka producer side, I am trying to send simulated data for around
> 100 homes, so home_id goes from 0 to 100 and the input is keyed by
> home_id. I have about 10 partitions in Kafka. The producer just loops
> through a CSV file with a delay of about 100 ms between two rows of the
> file. The data is exactly the same for all 100 CSV files except for the
> home_id and the lat & long information. The timestamp is incremented in
> steps of 1 second. I start multiple processes to simulate data from
> different homes.
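>
> Each producer process is roughly this (a sketch; the broker address, topic
> name, file name, and key are placeholders for one home):
>
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "broker:9092");
> props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>
> try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
>      BufferedReader reader = new BufferedReader(new FileReader("home_101.csv"))) {
>     String line;
>     while ((line = reader.readLine()) != null) {
>         // key by home_id so all rows of a home land in one partition
>         producer.send(new ProducerRecord<>("sensor-topic", "101", line));
>         Thread.sleep(100); // ~100 ms between rows
>     }
> }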
>
> THE PROBLEM:
>
> Flink completely misses capturing events for a large subset of the input
> data. I barely see events for about 4-5 of the home_id values. I print the
> stream before applying the pattern and after, and I see all home_ids
> before but only a tiny subset after. Since the data is exactly the same, I
> expect all home_ids to be captured and written to my sink, which is
> Cassandra in this case. I've looked through all the available docs and
> examples but cannot seem to find a fix for the problem.
>
> I would really appreciate some guidance on how to understand and fix this.
>
>
> Thank you,
>
> Ajay
>
>
>
