[ https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15433850#comment-15433850 ]
Tzu-Li (Gordon) Tai commented on FLINK-4341:
--------------------------------------------

I agree. I've been experimenting with this for the past few days, and I think this is the only way to go for now. I'll see if there's an easy way to deactivate this and add {{Long.MAX_VALUE}} back in the current code without a full revert.

I'm also quite interested in working on the low watermark service for the JobManager. However, since there seems to be some ongoing effort to rework the JobManager, I wonder whether it makes sense to implement this service in the current JobManager now?

> Kinesis connector does not emit maximum watermark properly
> ----------------------------------------------------------
>
>                 Key: FLINK-4341
>                 URL: https://issues.apache.org/jira/browse/FLINK-4341
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0, 1.1.1
>            Reporter: Scott Kidder
>            Assignee: Robert Metzger
>            Priority: Blocker
>             Fix For: 1.2.0, 1.1.2
>
>
> **Previously reported as "Checkpoint state size grows unbounded when task parallelism not uniform"**
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I was previously using a 1.1.0 snapshot (commit 18995c8) which performed as expected. This issue was introduced somewhere between those commits.
> I've got a Flink application that uses the Kinesis Stream Consumer to read from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots each, providing a total of 4 slots. When running the application with a parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard) and 4 slots for subsequent tasks that process the Kinesis stream data. I use an in-memory store for checkpoint data.
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint states were growing unbounded when running with a parallelism of 4, checkpoint interval of 10 seconds:
> {code}
> ID   State Size
> 1    11.3 MB
> 2    20.9 MB
> 3    30.6 MB
> 4    41.4 MB
> 5    52.6 MB
> 6    62.5 MB
> 7    71.5 MB
> 8    83.3 MB
> 9    93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
> 	at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
> 	at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
> 	at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
> 	at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
> 	at org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
> 	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
> 	... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Restoring checkpointed state to task Fold: property_id, player -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 10891825 bytes.
> {code}
> This can be fixed by simply submitting the job with a parallelism of 2. I suspect there was a regression introduced relating to assumptions about the number of sub-tasks associated with a job stage (e.g. assuming 4 instead of a value ranging from 1 to 4). This is currently preventing me from using all available Task Manager slots.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
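
For reference, a minimal sketch of the idea the comment above refers to: source subtasks that end up with no Kinesis shards emit a {{Long.MAX_VALUE}} watermark so they do not hold back downstream event-time operators. The class and helper names below ({{IdleAwareKinesisSource}}, {{assignedShards}}, {{fetchRecords}}) are hypothetical and are not the actual {{FlinkKinesisConsumer}} internals.

{code}
// Sketch only: a parallel source that emits the maximum watermark from
// subtasks that received no shards. Names marked "hypothetical" are not
// part of the real Kinesis connector.
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.util.List;

public abstract class IdleAwareKinesisSource extends RichParallelSourceFunction<String> {

    private volatile boolean running = true;

    // Hypothetical: the shards this parallel subtask was assigned.
    protected abstract List<String> assignedShards();

    // Hypothetical: blocking fetch-and-emit loop for the assigned shards.
    protected abstract void fetchRecords(SourceContext<String> ctx) throws Exception;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        if (assignedShards().isEmpty()) {
            // No shards for this subtask: emit the maximum watermark so the
            // downstream combined watermark is not stuck at its initial value,
            // then just wait until the job is cancelled.
            ctx.emitWatermark(new Watermark(Long.MAX_VALUE));
            while (running) {
                Thread.sleep(1000);
            }
        } else {
            fetchRecords(ctx);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
{code}

In the reported setup (a 2-shard stream read at parallelism 4), the subtasks without shards would otherwise pin the combined watermark at its initial value, so downstream windows never fire and their state keeps accumulating, which would explain the unbounded checkpoint sizes above.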
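And a rough sketch of the reporter's workaround combined with the mitigation suggested by the exception message: run the Kinesis source at a parallelism equal to the shard count (2) and checkpoint to a file-system state backend instead of the memory-backed one. Stream name, AWS region, and checkpoint path are placeholders, and credentials configuration is omitted.

{code}
import java.util.Properties;

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KinesisJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 10 seconds, as in the report, but write state to a
        // file system so its size is not limited by MemoryStateBackend's cap.
        env.enableCheckpointing(10_000);
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));

        Properties kinesisProps = new Properties();
        kinesisProps.setProperty("aws.region", "us-east-1"); // placeholder; credential settings omitted

        DataStream<String> stream = env
                .addSource(new FlinkKinesisConsumer<>("my-stream", new SimpleStringSchema(), kinesisProps))
                // Match the source parallelism to the 2 Kinesis shards so no
                // subtask is left without a shard (the reporter's workaround).
                .setParallelism(2);

        // Downstream operators can still run at the full parallelism of 4.
        stream.print().setParallelism(4);

        env.execute("Kinesis watermark workaround sketch");
    }
}
{code}

The second error above (the oversized {{AcknowledgeCheckpoint}} message) is another symptom of the same oversized state: with the memory-backed backend the state is shipped inline to the JobManager, while a file-system backend only sends handles, so it also avoids the Akka payload limit.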