Hi Sridhar,
From looking at your code:
1) Is “KafkaDataSource” a custom source that you implemented? Does this
source buffer anything?
2) getStreamSource2 also seems to return a “new
KafkaDataSource<MyMessage1>”. Could this be a problem?
3) You are working in processing time and are simply detecting whether 2
messages of the same type arrive within 15 minutes, right?
I suppose this could also be implemented using the times()
quantifier, but that is just a matter of taste.
Could you reduce this to a smaller duration and see if you still get a
corrupted stream exception?
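For question 3, this is how I read Rule1, written as a plain-Java model with no Flink dependencies. Per (field1, field2) key, it raises an alert whenever a message arrives less than 15 minutes (processing time) after the previous message for the same key. Both pattern states use subtype(MyMessage1.class) on a MyMessage1 stream, so every message can act as both "first" and "second". The class and method names are hypothetical. Treating the 15-minute bound as exclusive is an assumption of this model, not a claim about the exact boundary of Flink's within().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-Java model of Rule1: begin("first").next("second").within(15 min)
// on a stream keyed by (field1, field2). This is NOT Flink's CEP implementation.
final class ConsecutivePairDetector {

    // Minimal stand-in for MyMessage1: the two key fields plus a processing-time arrival in ms.
    static final class Event {
        final String field1;
        final String field2;
        final long arrivalMillis;

        Event(String field1, String field2, long arrivalMillis) {
            this.field1 = field1;
            this.field2 = field2;
            this.arrivalMillis = arrivalMillis;
        }
    }

    private final long windowMillis;
    // Latest arrival per key: with strict contiguity (next) only the key's previous event can pair with the current one.
    private final Map<String, Long> lastArrivalByKey = new HashMap<>();

    ConsecutivePairDetector(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Returns an alert if this event and the key's previous event are less than the window apart, else null. */
    String onEvent(Event e) {
        String key = e.field1 + "|" + e.field2;
        Long previous = lastArrivalByKey.put(key, e.arrivalMillis);
        if (previous != null && e.arrivalMillis - previous < windowMillis) {
            return "Cep Alert. Rule ID : Rule1 (key " + key + ")";
        }
        return null;
    }

    /** Runs the detector over events ordered by arrival time and collects the alerts. */
    static List<String> detect(long windowMillis, List<Event> events) {
        ConsecutivePairDetector detector = new ConsecutivePairDetector(windowMillis);
        List<String> alerts = new ArrayList<>();
        for (Event e : events) {
            String alert = detector.onEvent(e);
            if (alert != null) {
                alerts.add(alert);
            }
        }
        return alerts;
    }

    static List<Event> sampleEvents() {
        long min = TimeUnit.MINUTES.toMillis(1);
        return Arrays.asList(
                new Event("a", "x", 0),         // first event for key a|x
                new Event("a", "x", 10 * min),  // 10 min later: pairs with the previous a|x event
                new Event("b", "y", 11 * min),  // different key: no pair
                new Event("a", "x", 30 * min)); // 20 min after the previous a|x event: outside 15 min
    }

    public static void main(String[] args) {
        detect(TimeUnit.MINUTES.toMillis(15), sampleEvents()).forEach(System.out::println);
    }
}
```

With the sample input, a 15-minute window yields one alert for key a|x. In this model, the per-key map keeps one entry for every key ever seen, regardless of the window length.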
Thanks,
Kostas
> On Sep 27, 2017, at 5:42 AM, Sridhar Chellappa <[email protected]> wrote:
>
> One more point to add.
>
> I disabled checkpoints (by commenting out the code that calls
> enableCheckpointing()) and re-ran the job, this time giving the JobManager
> plenty of memory:
>
> ~/flink-1.3.2/bin/yarn-session.sh -n 4 -jm 24576 -tm 24576 -s 2 -d
>
> At the JobManager, I am still hitting:
>
> 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Starting YARN ApplicationMaster / ResourceManager / JobManager (Version: 1.3.2, Rev:0399bee, Date:03.08.2017 @ 10:23:11 UTC)
> 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Current user: flink
> 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM: OpenJDK 64-Bit Server VM - Oracle Corporation - 1.8/25.131-b11
> 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Maximum heap size: 16384 MiBytes
> 2017-09-25 06:46:44,066 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JAVA_HOME: /usr/lib/jvm/java-8-openjdk-amd64
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Hadoop version: 2.7.2
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - JVM Options:
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Xmx18432m
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlog.file=/var/log/hadoop-yarn/userlogs/application_1506317793012_0001/container_1506317793012_0001_01_000001/jobmanager.log
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlogback.configurationFile=file:logback.xml
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - -Dlog4j.configuration=file:log4j.properties
> 2017-09-25 06:46:44,067 INFO org.apache.flink.yarn.YarnApplicationMasterRunner - Program Arguments: (none)
>
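On the memory settings: the -Xmx18432m in this log is consistent with Flink reserving part of each YARN container as a cutoff for non-heap memory. The following assumes the 1.3 defaults as I remember them: containerized.heap-cutoff-ratio = 0.25 and containerized.heap-cutoff-min = 600 MB. Under those defaults, a 24576 MB container leaves 24576 - 6144 = 18432 MB of heap. The same arithmetic applies to the TaskManagers started with -tm 24576. This matters because the tasks that fail below (the source chain and KeyedCEPPatternOperator) run inside TaskManagers, even though their failures are reported in the JobManager log. The helper name is illustrative, and the formula is a sketch of that assumption, not copied from Flink's source.

```java
// Assumed YARN container heap arithmetic (illustrative, not copied from Flink's source):
// heap = container - max(container * cutoffRatio, cutoffMin), all values in MB.
final class HeapCutoff {

    static long heapMb(long containerMb, double cutoffRatio, long cutoffMinMb) {
        long cutoffMb = Math.max((long) (containerMb * cutoffRatio), cutoffMinMb);
        return containerMb - cutoffMb;
    }

    public static void main(String[] args) {
        // -jm 24576 (and -tm 24576) from the yarn-session.sh command, with the assumed defaults 0.25 and 600 MB.
        System.out.println("-Xmx" + heapMb(24576, 0.25, 600) + "m");
    }
}
```

This prints -Xmx18432m, matching the JVM option logged above. For small containers, the 600 MB minimum dominates instead of the ratio.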
>
> .
>
> .
>
> 2017-09-25 06:50:51,925 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from DEPLOYING to RUNNING.
> 2017-09-25 13:38:54,175 INFO org.apache.flink.runtime.blob.BlobCache - Created BLOB cache storage directory /tmp/blobStore-3e0b96a1-904b-4acb-b0d3-9d88f2073e97
> 2017-09-25 13:38:54,187 INFO org.apache.flink.runtime.blob.BlobCache - Downloading 49efe0ad58b727ba145b86df6088111c9a90ddd6 from localhost/127.0.0.1:55550
> 2017-09-25 16:30:39,974 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map (2/2) (e464ec796cd239a7b7fa225aaf86309a) switched from RUNNING to CANCELED.
> 2017-09-25 16:30:39,975 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> (Filter -> Map -> Map, Filter -> Map -> Map, Filter -> Map -> Map, Map -> (Filter, Filter)) (2/2) (e27860984c858738f044931e4b6a86a6) switched from RUNNING to FAILED.
> java.lang.OutOfMemoryError: GC overhead limit exceeded
> at sun.reflect.GeneratedConstructorAccessor10.newInstance(Unknown Source)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at com.twitter.chill.Instantiators$$anonfun$normalJava$1.apply(KryoBase.scala:160)
> at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:123)
> at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.createCopy(FieldSerializer.java:620)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:624)
> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
> at org.apache.flink.api.java.typeutils.runtime.PojoSerializer.copy(PojoSerializer.java:239)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:526)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536)
> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>
>
> On Wed, Sep 27, 2017 at 8:34 AM, Sridhar Chellappa <[email protected]> wrote:
> Here is the snippet:
>
> public interface Rule {
>     DataStream<Alert> run();
> }
>
> public class Rule1 implements Rule {
>
>     private static final String RULE_ID = "Rule1";
>
>     private final DataStream<MyMessage1> MyMessage1DataStream;
>
>     public Rule1(DataStream<MyMessage1> MyMessage1DataStream) {
>         this.MyMessage1DataStream = MyMessage1DataStream;
>     }
>
>     @Override
>     public DataStream<Alert> run() {
>         // Two consecutive MyMessage1 events for the same (field1, field2) key within 15 minutes.
>         Pattern<MyMessage1, ?> MyMessage1Pattern =
>             Pattern.<MyMessage1>begin("first")
>                 .subtype(MyMessage1.class)
>                 .next("second")
>                 .subtype(MyMessage1.class)
>                 .within(Time.minutes(15));
>
>         PatternStream<MyMessage1> MyMessage1PatternStream = CEP.pattern(
>             MyMessage1DataStream.keyBy("field1", "field2"),
>             MyMessage1Pattern);
>
>         return MyMessage1PatternStream.select(
>             new PatternSelectFunction<MyMessage1, Alert>() {
>                 @Override
>                 public Alert select(Map<String, List<MyMessage1>> pattern) throws Exception {
>                     String alertMessage = String.format("Cep Alert. Rule ID : %s", RULE_ID);
>                     return new CEPAlert(alertMessage);
>                 }
>             });
>     }
> }
>
>
>
> private static List<Rule> getStream1RulesToExecute(DataStream<MyMessage1> MyMessage1DataStream) {
>     List<Rule> rules = new ArrayList<>();
>     rules.add(new Rule1(MyMessage1DataStream));
>     return rules;
> }
>
> private static List<Rule> getStream2RulesToExecute(DataStream<MyMessage2> MyMessage2DataStream) {
>     List<Rule> rules = new ArrayList<>();
>     rules.add(new Rule2(MyMessage2DataStream));
>     return rules;
> }
>
>
> private static RichParallelSourceFunction<MyMessage1> getStreamSource1(
>         StreamExecutionEnvironment env, ParameterTool parameterTool) {
>     env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
>     env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
>     KafkaDataSource<MyMessage1> flinkCepConsumer =
>         new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage1SerDeSchema());
>     return flinkCepConsumer;
> }
>
>
> private static RichParallelSourceFunction<MyMessage2> getStreamSource2(
>         StreamExecutionEnvironment env, ParameterTool parameterTool) {
>     env.enableCheckpointing(parameterTool.getInt(CHECKPOINT_INTERVAL, DEFAULT_CHECKPOINT_INTERVAL));
>     env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>     env.getCheckpointConfig().setMinPauseBetweenCheckpoints(DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS);
>     env.getCheckpointConfig().setCheckpointTimeout(CheckpointConfig.DEFAULT_TIMEOUT);
>     env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
>     KafkaDataSource<T> flinkCepConsumer =
>         new KafkaDataSource<MyMessage1>(parameterTool, new MyMessage2SerDeSchema());
>     return flinkCepConsumer;
> }
>
>
> public static void main(String[] args) throws Exception {
>     ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[PROPS_FILE_ARG_INDEX]);
>
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env.getConfig().setGlobalJobParameters(parameterTool);
>
>     DataStream<MyMessage1> message1Stream = env.addSource(getStreamSource1(env, parameterTool));
>     DataStream<MyMessage2> message2Stream = env.addSource(getStreamSource2(env, parameterTool));
>
>     getStream1RulesToExecute(message1Stream).forEach(rule -> rule.run().print());
>     getStream2RulesToExecute(message2Stream).forEach(rule -> rule.run().print());
>
>     env.execute(STREAMING_JOB_NAME);
> }
>
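On point 2, getStreamSource2 declares RichParallelSourceFunction<MyMessage2> while constructing a KafkaDataSource<MyMessage1>. As quoted, that would normally be rejected by the compiler. If the real code only compiles through raw types or unchecked casts, however, Java's type erasure means nothing fails where the source is created. The mismatch surfaces later, as a ClassCastException at the first place a record is used as the declared type. The sketch below shows this with plain JDK code. Source, MyMessage1 and MyMessage2 are hypothetical placeholders, and I have not checked how Flink's own serializers would react to such a mismatch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins: Source<T> plays the role of RichParallelSourceFunction<T>;
// MyMessage1 and MyMessage2 are empty placeholders, not the real message classes.
final class RawTypeMismatch {

    static final class MyMessage1 {}

    static final class MyMessage2 {}

    static final class Source<T> {
        private final List<Object> emitted = new ArrayList<>();

        void emit(Object record) {
            emitted.add(record);
        }

        @SuppressWarnings("unchecked")
        T next(int i) {
            return (T) emitted.get(i); // unchecked: T is erased, so no type check happens here
        }
    }

    // Mirrors the shape of getStreamSource2: declared for MyMessage2, built for MyMessage1.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static Source<MyMessage2> getStreamSource2() {
        Source<MyMessage1> source = new Source<>();
        source.emit(new MyMessage1());
        return (Source) source; // raw cast: compiles with a warning instead of an error
    }

    /** True if the mismatch only surfaces as a ClassCastException where the record is used. */
    static boolean failsAtUse() {
        Source<MyMessage2> source = getStreamSource2(); // no exception: generics are erased at runtime
        try {
            MyMessage2 record = source.next(0); // compiler-inserted cast to MyMessage2 fails here
            return record == null;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("fails only at use: " + failsAtUse());
    }
}
```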
>
>
>
>
>
> On Mon, Sep 25, 2017 at 3:13 PM, Tzu-Li (Gordon) Tai <[email protected]> wrote:
> I talked a bit with Kostas about what may be happening here.
>
> It could be that your patterns are not closing, which depends on how the
> patterns in your CEP job are constructed.
> Could you perhaps provide an overview or a code snippet of what your CEP job
> is doing?
>
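To illustrate what patterns that do not close can mean for state size, here is a toy model in plain Java. It does not reproduce Flink's actual NFA or shared-buffer bookkeeping. In the model, every event opens a partial match for its key, and that partial match stays in state until it either completes or is pruned for being older than the window. The assumed workload is one event per second, with a fresh key for every event, so no match ever completes. Under that workload, retained state is bounded by the window, and without a window it grows with every event. The class name, rate and key distribution are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Toy model of pending partial matches; it does not reproduce Flink's NFA or shared buffer.
// Each key holds the start time of its open "first" match until the window prunes it.
final class PendingMatchStore {

    private final long windowMillis; // Long.MAX_VALUE models a pattern that never times out
    private final Map<String, Long> pendingSince = new HashMap<>();

    PendingMatchStore(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    void onEvent(String key, long nowMillis) {
        pruneExpired(nowMillis);
        pendingSince.put(key, nowMillis); // the new event opens (or restarts) a partial match for its key
    }

    private void pruneExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = pendingSince.entrySet().iterator();
        while (it.hasNext()) {
            if (nowMillis - it.next().getValue() >= windowMillis) {
                it.remove(); // a partial match older than the window can no longer complete
            }
        }
    }

    int size() {
        return pendingSince.size();
    }

    /** One event per second, each with a key never seen before, so no partial match ever completes. */
    static int retainedAfter(long windowMillis, int events) {
        PendingMatchStore store = new PendingMatchStore(windowMillis);
        for (int i = 0; i < events; i++) {
            store.onEvent("key-" + i, i * 1000L);
        }
        return store.size();
    }

    public static void main(String[] args) {
        int events = 3600; // one hour of input in this toy workload
        System.out.println("15 min window: " + retainedAfter(15 * 60_000L, events));
        System.out.println("5 min window: " + retainedAfter(5 * 60_000L, events));
        System.out.println("no window: " + retainedAfter(Long.MAX_VALUE, events));
    }
}
```

In this model, shrinking the window from 15 to 5 minutes cuts the retained entries from 900 to 300 after one hour of input. Without a window, all 3600 entries are kept.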
> I'm also looping Kostas (in CC) into this thread, as he may have a better
> idea of what is happening here.
>
> Cheers,
> Gordon
>
> On 22 September 2017 at 4:09:07 PM, Sridhar Chellappa ([email protected]) wrote:
>
>> Thanks for the reply. Well, tracing back to the root cause, I see the
>> following:
>>
>> 1. At the JobManager, checkpoint times are getting progressively worse:
>>
>> 2017-09-16 05:05:50,813 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 1 @ 1505538350809
>> 2017-09-16 05:05:51,396 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 1 (11101233 bytes in 586 ms).
>> 2017-09-16 05:07:30,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 2 @ 1505538450809
>> 2017-09-16 05:07:31,657 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 2 (18070955 bytes in 583 ms).
>>
>> .
>> .
>> .
>> 2017-09-16 07:32:58,117 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 89 (246125113 bytes in 27194 ms).
>> 2017-09-16 07:34:10,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 90 @ 1505547250809
>> 2017-09-16 07:34:44,932 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 90 (248272325 bytes in 34012 ms).
>> 2017-09-16 07:35:50,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 91 @ 1505547350809
>> 2017-09-16 07:36:37,058 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 91 (250348812 bytes in 46136 ms).
>> 2017-09-16 07:37:30,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 92 @ 1505547450809
>> 2017-09-16 07:38:18,076 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 92 (252399724 bytes in 47152 ms).
>> 2017-09-16 07:39:10,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 93 @ 1505547550809
>> 2017-09-16 07:40:13,494 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 93 (254374636 bytes in 62573 ms).
>> 2017-09-16 07:40:50,809 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 94 @ 1505547650809
>> 2017-09-16 07:42:42,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 94 (256386533 bytes in 111898 ms).
>> 2017-09-16 07:42:42,850 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 95 @ 1505547762850
>> 2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 95 (258441766 bytes in 203268 ms).
>> 2017-09-16 07:46:06,241 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 96 @ 1505547966241
>> 2017-09-16 07:48:42,069 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - KeyedCEPPatternOperator -> Map (1/4) (ff835faa9eb9182ed2f2230a1e5cc56d) switched from RUNNING to FAILED.
>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).}
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 96 for operator KeyedCEPPatternOperator -> Map (1/4).
>> ... 6 more
>> Caused by: java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError: GC overhead limit exceeded
>> at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>> at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>> at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>> ... 5 more
>>
>>
>> So it looks like the JobManager ran out of memory because of the
>> progressively worsening checkpoints. Any ideas on how to make the
>> checkpoints faster?
>>
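The checkpoint log lines above can be summarized with a little arithmetic:

- Checkpoints are triggered every 100 s.
- State grows by roughly 2 MB per checkpoint between checkpoints 89 and 95 without levelling off, from about 11 MB at checkpoint 1 to about 258 MB at checkpoint 95.
- From checkpoint 94 on, each checkpoint takes longer than the interval, so the next one is triggered as soon as the previous one completes.

That steady growth is at least consistent with partial matches that are never cleaned up. The helper below only recomputes those figures from the logged values, and its names are illustrative.

```java
import java.util.Locale;

// Figures copied from the CheckpointCoordinator log lines above (checkpoints 89-95).
final class CheckpointGrowth {

    // Trigger interval: checkpoint 2 @ 1505538450809 minus checkpoint 1 @ 1505538350809.
    static final long INTERVAL_MS = 1505538450809L - 1505538350809L;

    static final int[] IDS = {89, 90, 91, 92, 93, 94, 95};
    static final long[] BYTES = {246125113L, 248272325L, 250348812L, 252399724L,
            254374636L, 256386533L, 258441766L};
    static final long[] DURATION_MS = {27194L, 34012L, 46136L, 47152L, 62573L, 111898L, 203268L};

    /** Average size increase in bytes per checkpoint, from the first to the last listed checkpoint. */
    static double averageGrowthBytes() {
        return (double) (BYTES[BYTES.length - 1] - BYTES[0]) / (BYTES.length - 1);
    }

    /** First listed checkpoint whose duration exceeds the trigger interval, or -1 if none does. */
    static int firstSlowerThanInterval() {
        for (int i = 0; i < IDS.length; i++) {
            if (DURATION_MS[i] > INTERVAL_MS) {
                return IDS[i];
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.printf(Locale.ROOT, "interval: %d ms%n", INTERVAL_MS);
        System.out.printf(Locale.ROOT, "average growth: %.0f bytes per checkpoint%n", averageGrowthBytes());
        System.out.println("first checkpoint slower than the interval: " + firstSlowerThanInterval());
    }
}
```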
>>
>>
>>
>>
>>
>> On Thu, Sep 21, 2017 at 7:29 PM, Tzu-Li (Gordon) Tai <[email protected]> wrote:
>> Hi Sridhar,
>>
>> Sorry that this didn't get a response earlier.
>>
>> According to the trace, it seems that the job failed at some point, and when
>> it tried to automatically restore from a checkpoint, deserialization of a
>> CEP `IterativeCondition` object failed. As far as I can tell, CEP operators
>> simply use Java serialization for CEP `IterativeCondition` objects, so this
>> should not be related to the protobuf serializer that you are using.
>>
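Gordon's point about Java serialization can be illustrated with the JDK alone. A condition object is written with ObjectOutputStream and read back with ObjectInputStream, which is roughly what happens when such an object is stored in and restored from a checkpoint. Bytes that do not form a valid serialization stream are rejected with java.io.StreamCorruptedException. FieldEqualsCondition is a hypothetical stand-in, not Flink's IterativeCondition. The sketch only shows the mechanism; it says nothing about why the bytes were damaged in this job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.StreamCorruptedException;

final class ConditionSerialization {

    // Hypothetical serializable condition; plain Java, it does not extend Flink's IterativeCondition.
    static final class FieldEqualsCondition implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String expected;

        FieldEqualsCondition(String expected) {
            this.expected = expected;
        }

        boolean filter(String value) {
            return expected.equals(value);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        // The ObjectInputStream constructor validates the stream header before any object is read.
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new FieldEqualsCondition("field1"));
        FieldEqualsCondition copy = (FieldEqualsCondition) deserialize(data);
        System.out.println("round trip ok: " + copy.filter("field1"));

        data[0] = 0; // damage the stream header (magic number 0xACED)
        try {
            deserialize(data);
        } catch (StreamCorruptedException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```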
>> Is this still constantly happening for you?
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>
>