Hello All,

I'm running the simple WordCount example from the Flink (0.10.1) quickstart
package on a 500MB input dataset. The dataset consists of randomly
generated words of length 8.
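For a sense of scale, here is a rough Java estimate of how many words an input
like this holds and how many of them are distinct. It assumes things the post
does not state: 500MB means 500,000,000 bytes, every word is followed by one
separator byte, and letters are drawn uniformly from a-z. The class and method
names are hypothetical.

```java
// Back-of-envelope size of a random-word input. Assumptions (not from the post):
// 500MB = 500,000,000 bytes, one separator byte after each word, letters uniform over a-z.
final class WordEstimate {

    // Words in the input: each takes wordLength characters plus one separator byte.
    static long estimateWords(long inputBytes, int wordLength) {
        return inputBytes / (wordLength + 1);
    }

    // Number of possible distinct words: alphabetSize^wordLength.
    static double keySpace(int alphabetSize, int wordLength) {
        return Math.pow(alphabetSize, wordLength);
    }

    // Expected distinct values after n uniform draws from k possibilities:
    // k * (1 - (1 - 1/k)^n), computed with log1p/expm1 to avoid precision loss.
    static double expectedDistinct(long n, double k) {
        return -k * Math.expm1(n * Math.log1p(-1.0 / k));
    }

    public static void main(String[] args) {
        long words = estimateWords(500_000_000L, 8);
        double space = keySpace(26, 8);
        System.out.printf("words ~ %,d%n", words);
        System.out.printf("key space ~ %.3e%n", space);
        System.out.printf("expected distinct words ~ %,.0f%n", expectedDistinct(words, space));
    }
}
```

Under those assumptions the input holds roughly 55.6 million words, and almost
all of them are distinct, because the key space (26^8, about 2.1e11) is
thousands of times larger than the word count.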

Cluster Configuration:

Number of machines: 7
Total cores: 25
Memory per machine: 64GB

I'm interested in comparing performance between Batch and Streaming modes,
so I'm running the WordCount example for a number of iterations (up to 10)
on datasets ranging in size from 100MB to 50GB, consisting of random words
of length 4 and 8.

In Batch mode all iterations ran fine, but in Streaming mode the job fails
with the following error:

Caused by: java.lang.OutOfMemoryError: Java heap space
        at java.util.HashMap.resize(HashMap.java:580)
        at java.util.HashMap.addEntry(HashMap.java:879)
        at java.util.HashMap.put(HashMap.java:505)
        at org.apache.flink.runtime.state.AbstractHeapKvState.update(AbstractHeapKvState.java:98)
        at org.apache.flink.streaming.api.operators.StreamGroupedReduce.processElement(StreamGroupedReduce.java:59)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:166)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:218)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
        at java.lang.Thread.run(Thread.java:745)
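As far as I can read the trace, the error is thrown while StreamGroupedReduce
updates its keyed state, which AbstractHeapKvState keeps in an on-heap
java.util.HashMap. The Java sketch below is a simplified model of that
pattern, not Flink's code (the class and its methods are hypothetical): it
keeps one map entry per distinct key and never evicts any, so with random
words the state grows roughly with the number of input records rather than
with a small vocabulary.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of a keyed running reduce over heap state (hypothetical, NOT Flink's code):
// one HashMap entry per distinct key, never evicted.
final class HeapKeyedCountModel {
    private final Map<String, Long> state = new HashMap<>();

    // Handles one (word, 1) record: adds 1 to the word's running count and
    // returns the updated count, which a rolling reduce would emit downstream.
    long processElement(String word) {
        return state.merge(word, 1L, Long::sum);
    }

    // Number of keys currently held in state.
    int numKeys() {
        return state.size();
    }

    // Convenience helper: feeds all words through a fresh operator.
    static HeapKeyedCountModel runAll(String... words) {
        HeapKeyedCountModel op = new HeapKeyedCountModel();
        for (String w : words) {
            op.processElement(w);
        }
        return op;
    }

    public static void main(String[] args) {
        HeapKeyedCountModel op = new HeapKeyedCountModel();
        for (String w : "to be or not to be".split(" ")) {
            System.out.println(w + " -> " + op.processElement(w));
        }
        // With a repeating vocabulary the state stays small (4 keys here); with random
        // 8-letter words nearly every record adds a new key.
        System.out.println("keys held in state: " + op.numKeys());
    }
}
```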

I investigated and found two possible solutions: (1) increasing
taskmanager.heap.mb and (2) reducing taskmanager.memory.fraction.

I therefore set taskmanager.heap.mb: 1024 and taskmanager.memory.fraction:
0.5 (the default is 0.7).
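As a rough check on these numbers, the Java sketch below splits a 1024MB heap
by taskmanager.memory.fraction. As I understand the configuration docs, the
fraction is reserved for Flink's managed memory (sorting, hash tables,
caching) and the rest is left for objects created by user functions, such as
the keyed state in the trace above. The sketch simplifies by ignoring network
buffers and JVM overhead and applying the fraction to the whole heap; the
helper names are hypothetical.

```java
// Rough split of one TaskManager heap by taskmanager.memory.fraction.
// Simplifying assumptions: network buffers and JVM overhead are ignored, and the
// fraction is applied to the whole configured heap (taskmanager.heap.mb).
final class HeapSplit {

    // Memory reserved for Flink's managed memory (sorting, hash tables, caching).
    static long managedMb(long heapMb, double fraction) {
        return Math.round(heapMb * fraction);
    }

    // Remainder left for objects created by user functions, e.g. heap keyed state.
    static long userMb(long heapMb, double fraction) {
        return heapMb - managedMb(heapMb, fraction);
    }

    public static void main(String[] args) {
        long heapMb = 1024;
        for (double fraction : new double[] {0.7, 0.5}) {
            System.out.printf("fraction %.1f: managed ~%d MB, user objects ~%d MB%n",
                    fraction, managedMb(heapMb, fraction), userMb(heapMb, fraction));
        }
    }
}
```

Under that simplification, lowering the fraction from 0.7 to 0.5 raises the
memory left for user objects from about 307MB to 512MB per TaskManager, and
that amount is shared by all slots on the TaskManager.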

With these settings, I lose TaskManagers one by one during job execution,
with the following cause:

Caused by: java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager 831a72dad6fbb533b193820f45bdc5bc @ vm-10-155-208-138 - 4 slots - URL: akka.tcp://flink@10.155.208.138:42222/user/taskmanager
        at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:153)
        at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
        at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
        at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
        at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:696)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
        at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
        at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:100)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
        at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
        at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
        at akka.actor.ActorCell.invoke(ActorCell.scala:486)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        ... 2 more


When I look at the results generated on each TaskManager, they look fine.
The logs also don't show any reason for the job being cancelled.


Could anyone kindly guide me here?

Kind Regards,
Ravinder Kaur.
