Hi there,

With S3 as the state backend, and with a large chunk of user state kept on the
heap, I see the TaskManager start to fail without throwing an OOM exception.
Instead, it shows a generic error message (below) when a checkpoint is
triggered. I assume this has something to do with how the state is kept in a
buffer and flushed to S3 when a checkpoint is triggered.

Further, to keep a large key/value space, the wiki suggests using RocksDB as the
backend. My understanding is that with RocksDB, state is written to the local
file system instead of being synced to S3. Does Flink support a
memory -> RocksDB (local disk) -> S3 split for checkpoint state yet? Or would
implementing the KvState interface make Flink take care of the large-state
problem?
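The memory -> local disk -> S3 split asked about above can be modeled in a few
lines of plain Java. This is a conceptual sketch only, not Flink code: the class
`TieredStateSketch`, its methods, and the `memTableLimit` threshold are
hypothetical, a plain local directory stands in for S3, and a `Properties` file
stands in for RocksDB's on-disk store. It shows the property the question is
after: heap usage is bounded by a small in-memory buffer, the full working state
lives on local disk, and the remote tier is written only at checkpoint time.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Conceptual model of a memory -> local disk -> remote checkpoint split.
// NOT Flink code: the class, its methods and memTableLimit are hypothetical.
class TieredStateSketch {
    // Tier 1: small, bounded in-memory write buffer.
    private final Map<String, String> memTable = new HashMap<>();
    private final int memTableLimit;
    // Tier 2: full working state on local disk (stands in for RocksDB).
    private final Path localFile;
    // Tier 3: checkpoint target (a local directory standing in for S3).
    private final Path remoteDir;

    TieredStateSketch(Path localDir, Path remoteDir, int memTableLimit) throws IOException {
        Files.createDirectories(localDir);
        Files.createDirectories(remoteDir);
        this.localFile = localDir.resolve("state.properties");
        this.remoteDir = remoteDir;
        this.memTableLimit = memTableLimit;
    }

    // Writes go to memory; once the buffer reaches the limit it is spilled
    // to local disk, so heap usage does not grow with total state size.
    void put(String key, String value) throws IOException {
        memTable.put(key, value);
        if (memTable.size() >= memTableLimit) {
            flushToLocal();
        }
    }

    // Reads check the in-memory buffer first, then the local on-disk store.
    String get(String key) throws IOException {
        String value = memTable.get(key);
        return value != null ? value : loadLocal().getProperty(key);
    }

    int inMemorySize() {
        return memTable.size();
    }

    // Merges the in-memory buffer into the local file and clears the buffer.
    void flushToLocal() throws IOException {
        Properties local = loadLocal();
        local.putAll(memTable);
        try (OutputStream out = Files.newOutputStream(localFile)) {
            local.store(out, null);
        }
        memTable.clear();
    }

    // A checkpoint flushes the buffer, then copies the local file to the
    // remote tier; the remote tier is never touched between checkpoints.
    Path checkpoint(long checkpointId) throws IOException {
        flushToLocal();
        Path target = remoteDir.resolve("chk-" + checkpointId + ".properties");
        Files.copy(localFile, target, StandardCopyOption.REPLACE_EXISTING);
        return target;
    }

    private Properties loadLocal() throws IOException {
        Properties props = new Properties();
        if (Files.exists(localFile)) {
            try (InputStream in = Files.newInputStream(localFile)) {
                props.load(in);
            }
        }
        return props;
    }
}
```

A real engine would upload incrementally and restore from the remote copy after
a failure; this sketch copies one whole file per checkpoint only to keep the
three tiers visible.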

Chen

java.lang.Exception: The slot in which the task was executed has been released. Probably loss of TaskManager eddbcda03a61f61210063a7cd2148b36 @ 10.163.98.18 - 24 slots - URL: akka.tcp://flink@10.163.98.18:6124/user/taskmanager
	at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:151)
	at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:547)
	at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:119)
	at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:156)
	at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:215)
	at org.apache.flink.runtime.jobmanager.JobManager$anonfun$handleMessage$1.applyOrElse(JobManager.scala:847)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LeaderSessionMessageFilter$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:33)
	at org.apache.flink.runtime.LogMessages$anon$1.apply(LogMessages.scala:28)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.flink.runtime.LogMessages$anon$1.applyOrElse(LogMessages.scala:28)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:106)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
	at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:369)
	at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:501)
	at akka.actor.ActorCell.invoke(ActorCell.scala:486)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
