Hi Stephan, Apologies, I hit send too soon on the last email.
So, while trying to debug this, I ran it multiple times on different instance types(to increase RAM available) and while digging into the logs, I found this to be the error in the task manager logs: / java.lang.RuntimeException: Emitting the record caused an I/O exception: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@12b3c49e at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:104) at org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:89) at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80) at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35) at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:90) at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490) at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) at java.lang.Thread.run(Thread.java:748) Caused by: java.io.IOException: I/O channel already closed. Could not fulfill: org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@12b3c49e at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249) at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:36) at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:26) at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:111) at org.apache.flink.runtime.io.network.partition.ResultPartition.add(ResultPartition.java:278) at org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter.writeBuffer(ResultPartitionWriter.java:72) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.writeAndClearBuffer(RecordWriter.java:223) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:121) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89) at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) ... 13 more / Any idea on a fix for this issue? I can't seem to find any further information on this in the mailing lists. Thank you. -- Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/