[ https://issues.apache.org/jira/browse/FLINK-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198175#comment-16198175 ]

Ajay edited comment on FLINK-7782 at 10/10/17 5:15 AM:
-------------------------------------------------------

Hi Kostas,

My stream is effectively unbounded: the producer runs in an endless loop and the
pattern repeats, so there is a continuous stream of data. I have implemented an
ascending timestamp extractor (a sketch follows below). However, I do see some
issues with state. Here is a quick snapshot of the checkpoint statistics from
the Flink job.

            End to End Duration    State Size    Buffered During Alignment
Minimum     9ms                    260 KB        0 B
Average     57ms                   988 KB        2.26 KB
Maximum     3s                     15.3 MB       271 KB


ID:                         514
Status:                     Failed
Acknowledged:               11/12 (92%)
Trigger Time:               22:15:12
Latest Acknowledgement:     22:15:13
Failure Time:               22:15:15
End to End Duration:        2s
State Size:                 4.11 MB
Buffered During Alignment:  0 B
Failure Message:            Checkpoint failed: Checkpoint Coordinator is suspending.



I have tried the same job with a parallelism of 1; while there is significant
lag in Kafka, the behavior of the CEP library is exactly the same.
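
For reference, the ascending timestamp extractor is along these lines (a minimal
sketch rather than my exact code; the class name is a placeholder, and f1 is
assumed to be the Tuple8's Date field):

{code:java}
import java.util.Date;

import org.apache.flink.api.java.tuple.Tuple8;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

// Uses each event's Date field (f1) as its event-time timestamp in milliseconds.
// AscendingTimestampExtractor generates watermarks under the assumption that
// timestamps are monotonically increasing within each parallel source instance.
public class EventTimestampExtractor
        extends AscendingTimestampExtractor<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {

    @Override
    public long extractAscendingTimestamp(
            Tuple8<Integer, Date, String, String, Float, Float, Float, Float> element) {
        return element.f1.getTime();
    }
}
{code}

It is attached to the stream with cepMap.assignTimestampsAndWatermarks(new EventTimestampExtractor()) before the keyBy.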



was (Author: ajkrishna):
Hi Kostas,

My stream is effectively unbounded: the producer runs in an endless loop and the
pattern repeats, so there is a continuous stream of data. Also, I was monitoring
my Kafka manager and the Flink streaming dashboard: Kafka shows almost no lag,
and I see watermarks advancing in Flink. I have implemented an ascending
timestamp extractor. Here is a quick snapshot of the checkpoint statistics from
the Flink job.

            End to End Duration    State Size    Buffered During Alignment
Minimum     9ms                    260 KB        0 B
Average     18ms                   481 KB        1 B
Maximum     1s                     709 KB        437 B

I have tried the same job with a parallelism of 1; while there is significant
lag in Kafka, the behavior of the CEP library is exactly the same.


> Flink CEP not recognizing pattern
> ---------------------------------
>
>                 Key: FLINK-7782
>                 URL: https://issues.apache.org/jira/browse/FLINK-7782
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ajay
>
> I am using Flink version 1.3.2 with a Kafka source (KafkaSource9). I am 
> running Flink on a 3-node AWS cluster with 8 GB of RAM, running Ubuntu 16.04. 
> From the Flink dashboard, I see that I have 2 TaskManagers and 4 task slots.
> What I observe is the following. The input to Kafka is a JSON string, and 
> when parsed on the Flink side it looks like this:
> {code:java}
> (101,Sun Sep 24 23:18:53 UTC 2017,complex 
> event,High,37.75142,-122.39458,12.0,20.0)
> {code}
> I use a Tuple8 to capture the parsed data; the first field is home_id. The 
> time characteristic is set to EventTime and I have an 
> AscendingTimestampExtractor using the timestamp field. Parallelism for the 
> execution environment is set to 4 (see the setup sketch after the code 
> below). I have a rather simple event that I am trying to capture:
> {code:java}
> DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMapByHomeId =
>         cepMap.keyBy(0);
> //cepMapByHomeId.print();
>
> Pattern<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>, ?> cep1 =
>         Pattern.<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>begin("start")
>                 .where(new OverLowThreshold())
>                 .followedBy("end")
>                 .where(new OverHighThreshold());
>
> PatternStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> patternStream =
>         CEP.pattern(cepMapByHomeId, cep1);
>
> DataStream<Tuple7<Integer, Date, Date, String, String, Float, Float>> alerts =
>         patternStream.select(new PackageCapturedEvents());
> {code}
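> Roughly, the surrounding job setup is wired as follows (a sketch rather than 
> my actual code; the topic name, consumer properties, and the ParseJsonToTuple8 
> parser are hypothetical placeholders):
> {code:java}
> import java.util.Date;
> import java.util.Properties;
>
> import org.apache.flink.api.java.tuple.Tuple8;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>
> public class CepJobSketch {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>         env.setParallelism(4);
>
>         Properties props = new Properties();
>         props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
>         props.setProperty("group.id", "cep-sketch");              // placeholder group
>
>         DataStream<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> cepMap = env
>                 .addSource(new FlinkKafkaConsumer09<>("home-events", new SimpleStringSchema(), props))
>                 .map(new ParseJsonToTuple8()) // hypothetical MapFunction: JSON string -> Tuple8
>                 .assignTimestampsAndWatermarks(
>                         new AscendingTimestampExtractor<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>() {
>                             @Override
>                             public long extractAscendingTimestamp(
>                                     Tuple8<Integer, Date, String, String, Float, Float, Float, Float> e) {
>                                 return e.f1.getTime(); // the Date field (f1) drives event time
>                             }
>                         });
>
>         // ... then keyBy(0), CEP.pattern(...), select(...) as shown above, and env.execute();
>     }
> }
> {code}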
> The pattern checks whether the 7th field in the Tuple8 goes over 12 and then 
> over 16 (sketches of the condition and select classes follow the output 
> below). The output of the pattern looks like this:
> {code:java}
> (201,Tue Sep 26 14:56:09 UTC 2017,Tue Sep 26 15:11:59 UTC 2017,complex 
> event,Non-event,37.75837,-122.41467)
> {code}
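> The condition and select classes are along these lines (a sketch; the field 
> index f6 and the exact Tuple7 mapping are assumptions based on the 
> description, and the bodies are simplified):
> {code:java}
> import java.util.Date;
> import java.util.List;
> import java.util.Map;
>
> import org.apache.flink.api.java.tuple.Tuple7;
> import org.apache.flink.api.java.tuple.Tuple8;
> import org.apache.flink.cep.PatternSelectFunction;
> import org.apache.flink.cep.pattern.conditions.SimpleCondition;
>
> // Matches when the 7th field (f6, zero-based) rises above the low threshold of 12.
> class OverLowThreshold
>         extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
>     @Override
>     public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
>         return event.f6 > 12.0f;
>     }
> }
>
> // Matches when the same field rises above the high threshold of 16.
> class OverHighThreshold
>         extends SimpleCondition<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>> {
>     @Override
>     public boolean filter(Tuple8<Integer, Date, String, String, Float, Float, Float, Float> event) {
>         return event.f6 > 16.0f;
>     }
> }
>
> // Packages a matched start/end pair into the Tuple7 output shown above:
> // (home_id, start time, end time, event type, severity, lat, long).
> class PackageCapturedEvents implements PatternSelectFunction<
>         Tuple8<Integer, Date, String, String, Float, Float, Float, Float>,
>         Tuple7<Integer, Date, Date, String, String, Float, Float>> {
>     @Override
>     public Tuple7<Integer, Date, Date, String, String, Float, Float> select(
>             Map<String, List<Tuple8<Integer, Date, String, String, Float, Float, Float, Float>>> match) {
>         Tuple8<Integer, Date, String, String, Float, Float, Float, Float> start = match.get("start").get(0);
>         Tuple8<Integer, Date, String, String, Float, Float, Float, Float> end = match.get("end").get(0);
>         return new Tuple7<>(start.f0, start.f1, end.f1, start.f2, start.f3, start.f4, start.f5);
>     }
> }
> {code}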
> On the Kafka producer side, I am trying to send simulated data for around 100 
> homes, so the home_id goes from 0 to 100, and the input is keyed by home_id. 
> I have about 10 partitions in Kafka. The producer just loops through a csv 
> file with a delay of about 100 ms between two rows. The data is exactly the 
> same for all 100 csv files except for the home_id and the lat/long 
> information. The timestamp is incremented in steps of 1 second. I start 
> multiple processes to simulate data from different homes.
> THE PROBLEM:
> Flink completely misses capturing events for a large subset of the input 
> data. I barely see events for about 4-5 of the home_id values. I print the 
> stream before and after applying the pattern: I see all home_ids before, but 
> only a tiny subset after. Since the data is exactly the same for every home, 
> I expect all home_ids to be captured and written to my sink.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
