On further investigation, it seems to me that the I/O exception I posted previously is not the cause of the TM being lost; it's an after-effect of the TM being shut down and the channel being closed after a record is emitted but before it is processed.
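The ordering I have in mind can be sketched with a toy model. The class below is hypothetical and only mimics the closed-channel check that Flink's `AsynchronousFileIOChannel.addRequest` performs; it is not Flink code. Once the shutdown path closes the channel, any write request still in flight fails with "channel already closed":

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model (not Flink code): an async channel that rejects requests once
// closed, mirroring the "I/O channel already closed" check in
// AsynchronousFileIOChannel.addRequest.
class ToyAsyncChannel {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    void addRequest(String request) throws IOException {
        if (closed.get()) {
            throw new IOException("I/O channel already closed. Could not fulfill: " + request);
        }
        // in the real channel, the request would be queued for the writer thread here
    }

    void close() {
        closed.set(true); // SIGTERM -> IOManager shutdown closes the channel
    }
}

public class ChannelRaceSketch {
    public static void main(String[] args) {
        ToyAsyncChannel channel = new ToyAsyncChannel();
        channel.close(); // the shutdown hook wins the race
        try {
            channel.addRequest("BufferWriteRequest"); // task thread is still emitting a record
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If the shutdown lands between the emit and the write, the task thread sees exactly the IOException from my logs, which is why I think it's a symptom rather than the cause.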
Previously, the logs didn't show this error, and I'm also unable to reproduce it every time (I've come across the I/O exception twice so far). Most of the time, the logs contain neither the I/O exception nor any other exception/error message. This is what the logs usually look like (without the I/O exception):

Job Manager:

2017-10-12 22:22:41,857 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_1507845873691_0001_01_000008 failed. Exit status: -100
2017-10-12 22:22:41,858 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_1507845873691_0001_01_000008 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 22:22:41,858 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 1
2017-10-12 22:22:41,858 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 22000 megabytes memory. Pending requests: 1
2017-10-12 22:22:42,096 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@ip-172-31-43-115:43404/user/taskmanager terminated.
2017-10-12 22:22:42,210 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:85)) (39/96) (530ca4789a921cab363f241176dac7a8) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed: container_1507845873691_0001_01_000008 @ ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
    at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
    at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
    at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
    at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
    at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
    at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 22:22:42,451 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Output (0c45ba62b56fefd1c1e7bfd68923411d) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: container_1507845873691_0001_01_000008 @ ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
    ... [identical stack trace as above, trimmed for readability]
2017-10-12 22:22:42,907 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:85)) (1/96) (8cf2869e9786809d1b9b9d12b9467e40) switched from RUNNING to CANCELING.
Task Manager:

2017-10-12 22:22:38,570 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369], [G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
2017-10-12 22:22:38,631 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 5973/17600/17600 MB, NON HEAP: 72/73/-1 MB (used/committed/max)]
2017-10-12 22:22:38,631 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 3150, Total Capacity: 17138363, Used Memory: 17138364
2017-10-12 22:22:38,631 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 18/18/240 MB (used/committed/max)], [Metaspace: 47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 22:22:38,631 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369], [G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
2017-10-12 22:22:38,691 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 6101/17600/17600 MB, NON HEAP: 72/73/-1 MB (used/committed/max)]
2017-10-12 22:22:38,691 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 3234, Total Capacity: 17139035, Used Memory: 17139036
2017-10-12 22:22:38,691 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 18/18/240 MB (used/committed/max)], [Metaspace: 47/48/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 22:22:38,691 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 9950, GC COUNT: 369], [G1 Old Generation, GC TIME (ms): 688, GC COUNT: 2]
*2017-10-12 22:22:38,709 INFO org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.*
2017-10-12 22:22:38,713 INFO org.apache.flink.runtime.blob.BlobCache - Shutting down BlobCache
2017-10-12 22:22:38,719 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /mnt/yarn/usercache/hadoop/appcache/application_1507845873691_0001/flink-io-a5aace05-73ed-4cea-ad07-db86f9f8ce21
2017-10-12 22:22:38,719 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /mnt1/yarn/usercache/hadoop/appcache/application_1507845873691_0001/flink-io-cb34ffbe-879f-47d4-9df3-6ed2b0dcd799

This is what the logs sometimes look like (with the I/O exception):

Job Manager:

2017-10-12 19:40:37,669 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@ip-172-31-11-129:43340] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
2017-10-12 19:40:37,922 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_1507836035753_0001_01_000015 failed. Exit status: -100
2017-10-12 19:40:37,922 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_1507836035753_0001_01_000015 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,922 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 1
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_1507836035753_0001_01_000013 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_1507836035753_0001_01_000013 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 2
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_1507836035753_0001_01_000002 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_1507836035753_0001_01_000002 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 3
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_1507836035753_0001_01_000003 failed. Exit status: -100
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_1507836035753_0001_01_000003 in state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 4
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 22000 megabytes memory. Pending requests: 1
2017-10-12 19:40:37,923 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 22000 megabytes memory. Pending requests: 2
2017-10-12 19:40:37,924 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 22000 megabytes memory. Pending requests: 3
2017-10-12 19:40:37,924 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@ip-172-31-1-178:33620/user/taskmanager terminated.
2017-10-12 19:40:37,924 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 22000 megabytes memory. Pending requests: 4
2017-10-12 19:40:37,925 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:87)) (40/136) (748d815623ff13e6357f351d5aa7b0f4) switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed: container_1507836035753_0001_01_000015 @ ip-172-31-1-178.us-west-2.compute.internal (dataPort=35861)
    ... [identical "TaskManager was lost/killed" stack trace as in the first excerpt, trimmed for readability]
2017-10-12 19:40:37,931 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Output (11771c44eace0a1e32de1c3ca1c60b09) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed: container_1507836035753_0001_01_000015 @ ip-172-31-1-178.us-west-2.compute.internal (dataPort=35861)
    ... [identical stack trace, trimmed for readability]

Task Manager:

2017-10-12 19:40:34,959 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 1347/17600/17600 MB, NON HEAP: 73/74/-1 MB (used/committed/max)]
2017-10-12 19:40:34,959 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 162, Total Capacity: 17111387, Used Memory: 17111388
2017-10-12 19:40:34,959 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 19/19/240 MB (used/committed/max)], [Metaspace: 48/49/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 19:40:34,959 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 8020, GC COUNT: 363], [G1 Old Generation, GC TIME (ms): 695, GC COUNT: 2]
2017-10-12 19:40:35,019 INFO org.apache.flink.runtime.taskmanager.TaskManager - Memory usage stats: [HEAP: 1467/17600/17600 MB, NON HEAP: 73/74/-1 MB (used/committed/max)]
2017-10-12 19:40:35,019 INFO org.apache.flink.runtime.taskmanager.TaskManager - Direct memory stats: Count: 196, Total Capacity: 17111659, Used Memory: 17111660
2017-10-12 19:40:35,019 INFO org.apache.flink.runtime.taskmanager.TaskManager - Off-heap pool stats: [Code Cache: 19/19/240 MB (used/committed/max)], [Metaspace: 48/49/-1 MB (used/committed/max)], [Compressed Class Space: 5/6/1024 MB (used/committed/max)]
2017-10-12 19:40:35,019 INFO org.apache.flink.runtime.taskmanager.TaskManager - Garbage collector stats: [G1 Young Generation, GC TIME (ms): 8020, GC COUNT: 363], [G1 Old Generation, GC TIME (ms): 695, GC COUNT: 2]
*2017-10-12 19:40:35,033 INFO org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.*
2017-10-12 19:40:35,043 ERROR org.apache.flink.runtime.operators.BatchTask - Error in task code: CHAIN Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter at main(FindOutput.java:87)) (86/136)
java.lang.RuntimeException: Emitting the record caused an I/O exception: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@21d84696
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:104)
    at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:89)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:90)
    at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
    at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@21d84696
    at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
    at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:36)
    at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:26)
    at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:111)
    at org.apache.flink.runtime.io.network.partition.ResultPartition.add(ResultPartition.java:278)
    at org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter.writeBuffer(ResultPartitionWriter.java:72)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.writeAndClearBuffer(RecordWriter.java:223)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:121)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    ... 13 more
2017-10-12 19:40:35,050 INFO org.apache.flink.runtime.io.disk.iomanager.IOManager - I/O manager removed spill file directory /mnt/yarn/usercache/hadoop/appcache/application_1507836035753_0001/flink-io-81d98a3a-7a40-438f-93fa-3b1f9dfc1e1d

I still can't figure out why the TM shuts down or how to avoid it; it seems like a memory/GC issue. Previously I was able to get the job to complete by increasing the parallelism (i.e., the number of task managers), but as my dataset size has increased I'm running into this issue again, and increasing the parallelism no longer helps. Any help would be greatly appreciated!

Thanks

--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
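P.S. Since I suspect memory pressure, one thing I'm considering trying is giving the 22000 MB YARN container more non-heap headroom via the containerized heap cutoff settings. The key names below are from the Flink configuration docs of this era (1.2/1.3); the values are illustrative guesses on my part, not tested recommendations:

```yaml
# flink-conf.yaml -- illustrative values only, not tested recommendations.
# Reserve a larger slice of each YARN container for non-heap memory
# (direct buffers, metaspace, code cache) so that JVM heap plus off-heap
# usage stays under the container limit.
containerized.heap-cutoff-ratio: 0.3   # fraction of container memory kept off the heap (default 0.25)
containerized.heap-cutoff-min: 1024    # minimum cutoff in MB
```

I'd still welcome other ideas, since the exit status -100 / "lost node" diagnostics make me unsure this is purely a container-memory problem.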