[ https://issues.apache.org/jira/browse/FLINK-4341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15440877#comment-15440877 ]
ASF GitHub Bot commented on FLINK-4341:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/2414

    Minus @rmetzger's comment this looks good to merge! Thanks for fixing this @tzulitai!

> Kinesis connector does not emit maximum watermark properly
> ----------------------------------------------------------
>
>                 Key: FLINK-4341
>                 URL: https://issues.apache.org/jira/browse/FLINK-4341
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0, 1.1.1
>            Reporter: Scott Kidder
>            Assignee: Robert Metzger
>            Priority: Blocker
>             Fix For: 1.2.0, 1.1.2
>
>
> **Previously reported as "Checkpoint state size grows unbounded when task
> parallelism not uniform"**
>
> This issue was first encountered with Flink release 1.1.0 (commit 45f7825). I
> was previously using a 1.1.0 snapshot (commit 18995c8) which performed as
> expected. This issue was introduced somewhere between those commits.
>
> I've got a Flink application that uses the Kinesis Stream Consumer to read
> from a Kinesis stream with 2 shards. I've got 2 task managers with 2 slots
> each, providing a total of 4 slots. When running the application with a
> parallelism of 4, the Kinesis consumer uses 2 slots (one per Kinesis shard)
> and 4 slots for subsequent tasks that process the Kinesis stream data. I use
> an in-memory store for checkpoint data.
>
> Yesterday I upgraded to Flink 1.1.0 (45f7825) and noticed that checkpoint
> states were growing unbounded when running with a parallelism of 4,
> checkpoint interval of 10 seconds:
> {code}
> ID  State Size
> 1   11.3 MB
> 2   20.9 MB
> 3   30.6 MB
> 4   41.4 MB
> 5   52.6 MB
> 6   62.5 MB
> 7   71.5 MB
> 8   83.3 MB
> 9   93.5 MB
> {code}
> The first 4 checkpoints generally succeed, but then fail with an exception
> like the following:
> {code}
> java.lang.RuntimeException: Error triggering a checkpoint as the result of receiving checkpoint barrier
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:768)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:758)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:203)
>   at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:129)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:183)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:66)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:266)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=12105407 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
>   at org.apache.flink.runtime.state.memory.MemoryStateBackend.checkSize(MemoryStateBackend.java:146)
>   at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetBytes(MemoryStateBackend.java:200)
>   at org.apache.flink.runtime.state.memory.MemoryStateBackend$MemoryCheckpointOutputStream.closeAndGetHandle(MemoryStateBackend.java:190)
>   at org.apache.flink.runtime.state.AbstractStateBackend$CheckpointStateOutputView.closeAndGetHandle(AbstractStateBackend.java:447)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.snapshotOperatorState(WindowOperator.java:879)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:598)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask$2.onEvent(StreamTask.java:762)
>   ... 8 more
> {code}
> Or:
> {code}
> 2016-08-09 17:44:43,626 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - Restoring checkpointed state to task Fold: property_id, player -> 10-minute Sliding-Window Percentile Aggregation -> Sink: InfluxDB (2/4)
> 2016-08-09 17:44:51,236 ERROR akka.remote.EndpointWriter - Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent to Actor[akka.tcp://flink@10.55.2.212:6123/user/jobmanager#510517238]: max allowed size 10485760 bytes, actual size of encoded class org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint was 10891825 bytes.
> {code}
> This can be fixed by simply submitting the job with a parallelism of 2. I
> suspect there was a regression introduced relating to assumptions about the
> number of sub-tasks associated with a job stage (e.g. assuming 4 instead of a
> value ranging from 1-4). This is currently preventing me from using all
> available Task Manager slots.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
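
As a rough check on the "growing unbounded" observation in the quoted report, the sketch below computes the size increase between consecutive checkpoints from the table of state sizes. The sizes and the 10-second interval are copied from the report. The variable and function names are illustrative only and do not come from Flink. Note that the table appears to list total checkpoint size, so these figures are not directly comparable to the per-handle `maxSize=5242880` limit in the exception.

```python
# State sizes in MB for checkpoints 1..9, copied from the table in the report.
sizes_mb = [11.3, 20.9, 30.6, 41.4, 52.6, 62.5, 71.5, 83.3, 93.5]

# Checkpoint interval stated in the report, in seconds.
CHECKPOINT_INTERVAL_S = 10


def growth_per_checkpoint(sizes):
    """Return the size increase between each pair of consecutive checkpoints."""
    return [round(b - a, 1) for a, b in zip(sizes, sizes[1:])]


deltas = growth_per_checkpoint(sizes_mb)
mean_delta = sum(deltas) / len(deltas)
rate_mb_per_s = mean_delta / CHECKPOINT_INTERVAL_S

print("growth per checkpoint (MB):", deltas)
print("mean growth: %.2f MB per checkpoint, about %.2f MB/s"
      % (mean_delta, rate_mb_per_s))
```

Every increment is positive and close to 10 MB, so over these nine checkpoints the state grows roughly linearly with no sign of levelling off. That is consistent with the reporter's description, though nine samples cannot by themselves establish the cause.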