Hi Arnaud,

In a previous mail, you said:
"Note that I did not rebuild & reinstall flink, I just used a 0.10-SNAPSHOT
compiled jar submitted as a batch job using the "0.10.0" flink installation"

This will not fix the Netty version error. You need to install a new Flink
version, or submit the job to YARN with a new Flink version, to make sure
that the correct Netty version is used.
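
As a side note, in case it helps: to double-check which Netty the
TaskManagers actually pick up after redeploying, you could print the version
from a JVM started with the same classpath. This is only a sketch, it assumes
Netty sits on the classpath under its original io.netty package (i.e. not
relocated/shaded in your setup), and PrintNettyVersion is just a made-up name:

    import io.netty.util.Version;

    import java.util.Map;

    // Hypothetical helper, not part of Flink: prints the Netty artifacts and
    // versions visible on the current classpath. Running it with the same
    // classpath as the TaskManagers (or from a trivial job) shows which Netty
    // actually gets loaded.
    public class PrintNettyVersion {
        public static void main(String[] args) {
            Map<String, Version> versions = Version.identify();
            if (versions.isEmpty()) {
                System.out.println("No Netty version metadata found on the classpath.");
            }
            for (Version version : versions.values()) {
                System.out.println(version); // artifact id, version and build info
            }
        }
    }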

Best, Fabian

2016-02-03 10:44 GMT+01:00 Stephan Ewen <se...@apache.org>:

> Hi!
>
> I think the closed channel is actually an effect of the process kill.
> Before the exception, you can see "15:22:47,592 ERROR
> org.apache.flink.yarn.YarnTaskManagerRunner - RECEIVED SIGNAL 15: SIGTERM"
> in the log, which means that UNIX is killing the process.
> I assume that the first thing that happens is that UNIX closes the open
> file handles, while the JVM shutdown hooks are still in progress. Hence the
> exception.
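
As a small aside, the effect Stephan describes can be reproduced with a few
lines of toy code (this is deliberately not Flink's shutdown path, just an
illustration): a shutdown hook closes a channel while a worker thread is still
writing to it, so a SIGTERM turns into a closed-channel error in the worker.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    // Toy example (not Flink code): a JVM shutdown hook closes the channel that a
    // worker thread is still writing to. Start it, then send SIGTERM ("kill <pid>");
    // the next write typically fails with a ClosedChannelException, analogous to the
    // "I/O channel already closed" errors in the log further down in this thread.
    public class ShutdownRace {
        public static void main(String[] args) throws Exception {
            final FileChannel spill = FileChannel.open(Paths.get("spill.tmp"),
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE);

            // SIGTERM triggers the shutdown hooks, which release resources while
            // the "spilling" loop below is still running.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        spill.close();
                    } catch (IOException ignored) {
                    }
                }
            }));

            ByteBuffer segment = ByteBuffer.allocate(32 * 1024);
            while (true) {
                segment.clear();
                spill.write(segment, 0L); // keep rewriting the same 32 KB block
            }
        }
    }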
>
> So, the root cause is still the YARN memory killer.
>
> The log comes from release version 0.10.0.
> The Netty fix came into Flink after version 0.10.1 - so it is currently
> only in 0.10-SNAPSHOT (and will be in 0.10.2 in a couple of days).
>
> Greetings,
> Stephan
>
>
> On Wed, Feb 3, 2016 at 10:11 AM, LINZ, Arnaud <al...@bouyguestelecom.fr>
> wrote:
>
>> Hi,
>>
>>
>>
>> I see nothing wrong in the log of the killed container (it’s in fact
>> strange that it fails with an I/O channel closure before it is killed by
>> YARN), but I’ll post new logs with memory debug as a web download within
>> the day.
>>
>>
>>
>> In the meantime, a log extract:
>>
>>
>>
>> Container: container_e11_1453202008841_2868_01_000018 on
>> h1r1dn06.bpa.bouyguestelecom.fr_45454
>>
>>
>> ================================================================================================
>>
>>
>>
>> …
>>
>> 15:04:01,234 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> --------------------------------------------------------------------------------
>>
>> 15:04:01,236 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Starting
>> YARN TaskManager (Version: 0.10.0, Rev:ab2cca4, Date:10.11.2015 @ 13:50:14
>> UTC)
>>
>> 15:04:01,236 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Current
>> user: datcrypt
>>
>> 15:04:01,236 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM: Java
>> HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.7/24.45-b08
>>
>> 15:04:01,236 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Maximum
>> heap size: 6900 MiBytes
>>
>> 15:04:01,236 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JAVA_HOME:
>> /usr/java/default
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Hadoop
>> version: 2.6.0
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  JVM
>> Options:
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -Xms7200m
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -Xmx7200m
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -XX:MaxDirectMemorySize=7200m
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -Dlogback.configurationFile=file:logback.xml
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> -Dlog4j.configuration=file:log4j.properties
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -  Program
>> Arguments:
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> --configDir
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -     .
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> --streamingMode
>>
>> 15:04:01,238 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunner                   -     batch
>>
>> 15:04:01,238 INFO
>>  org.apache.flink.yarn.YarnTaskManagerRunner                   -
>> --------------------------------------------------------------------------------
>>
>> …
>>
>> 15:04:02,215 INFO
>> org.apache.flink.runtime.taskmanager.TaskManager              - Starting
>> TaskManager actor
>>
>> 15:04:02,224 INFO
>> org.apache.flink.runtime.io.network.netty.NettyConfig         - NettyConfig
>> [server address: bt1shlhr/172.21.125.16, server port: 47002, memory
>> segment size (bytes): 32768, transport type: NIO, number of server threads:
>> 0 (use Netty's default), number of client threads: 0 (use Netty's default),
>> server connect backlog: 0 (use Netty's default), client connect timeout
>> (sec): 120, send/receive buffer size (bytes): 0 (use Netty's default)]
>>
>> 15:04:02,226 INFO
>> org.apache.flink.runtime.taskmanager.TaskManager              - Messages
>> between TaskManager and JobManager have a max timeout of 100000 milliseconds
>>
>> …
>>
>>
>>
>> 15:04:02,970 INFO
>> org.apache.flink.runtime.io.network.buffer.NetworkBufferPool  - Allocated
>> 1024 MB for network buffer pool (number of memory segments: 32768, bytes
>> per segment: 32768).
>>
>> 15:04:03,527 INFO
>> org.apache.flink.runtime.taskmanager.TaskManager              - Using 0.7
>> of the currently free heap space for Flink managed heap memory (4099 MB).
>>
>> 15:04:06,250 INFO
>> org.apache.flink.runtime.io.disk.iomanager.IOManager          - I/O manager
>> uses directory
>> /data/1/hadoop/yarn/local/usercache/datcrypt/appcache/application_1453202008841_2868/flink-io-5cc3aa50-6723-460e-a722-800dd908e9e8
>> for spill files.
>>
>> …
>>
>> 15:04:06,429 INFO
>> org.apache.flink.yarn.YarnTaskManager                         - TaskManager
>> data connection information: h1r1dn06.bpa.bouyguestelecom.fr
>> (dataPort=47002)
>>
>> 15:04:06,430 INFO
>> org.apache.flink.yarn.YarnTaskManager                         - TaskManager
>> has 2 task slot(s).
>>
>> 15:04:06,431 INFO
>> org.apache.flink.yarn.YarnTaskManager                         - Memory
>> usage stats: [HEAP: 5186/6900/6900 MB, NON HEAP: 25/50/130 MB
>> (used/committed/max)]
>>
>> 15:04:06,438 INFO
>> org.apache.flink.yarn.YarnTaskManager                         - Trying to
>> register at JobManager akka.tcp://
>> flink@172.21.125.31:36518/user/jobmanager (attempt 1, timeout: 500
>> milliseconds)
>>
>> 15:04:06,591 INFO
>> org.apache.flink.yarn.YarnTaskManager                         - Successful
>> registration at JobManager (akka.tcp://
>> flink@172.21.125.31:36518/user/jobmanager), starting network stack and
>> library cache.
>>
>> …
>>
>>
>>
>> 15:17:22,191 INFO
>> org.apache.flink.yarn.YarnTaskManager                         -
>> Unregistering task and sending final execution state FINISHED to JobManager
>> for task DataSink (Hive Output to
>> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE)
>> (c9dc588ceb209d98fd08b5144a59adfc)
>>
>> 15:17:22,196 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - DataSink
>> (Hive Output to DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
>> switched to FINISHED
>>
>> 15:17:22,197 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - Freeing
>> task resources for DataSink (Hive Output to
>> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE) (76/95)
>>
>> 15:17:22,197 INFO
>> org.apache.flink.yarn.YarnTaskManager                         -
>> Unregistering task and sending final execution state FINISHED to JobManager
>> for task DataSink (Hive Output to
>> DEV_VOY_KBR_TRV.EX_GOS_TP_STAGES_TRANSPORT_MODE)
>> (0c1c027e2ca5111e3e54c98b6d7265d7)
>>
>> 15:22:47,592 ERROR
>> org.apache.flink.yarn.YarnTaskManagerRunner                   - RECEIVED
>> SIGNAL 15: SIGTERM
>>
>> 15:22:47,608 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (89/95)
>>
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
>> due to an exception: java.io.IOException: I/O channel already closed. Could
>> not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>
>>         at java.lang.Thread.run(Thread.java:744)
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception:
>> java.io.IOException: I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>>
>>         ... 3 more
>>
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: java.io.IOException: I/O channel already
>> closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
>> I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> Caused by: java.io.IOException: I/O channel already closed. Could not
>> fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>
>>         ... 8 more
>>
>> 15:22:47,608 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (87/95)
>>
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
>> due to an exception: java.io.IOException: I/O channel already closed. Could
>> not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>
>>         at java.lang.Thread.run(Thread.java:744)
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception:
>> java.io.IOException: I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>>
>>         ... 3 more
>>
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: java.io.IOException: I/O channel already
>> closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
>> I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> Caused by: java.io.IOException: I/O channel already closed. Could not
>> fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>
>>         ... 8 more
>>
>> 15:22:47,617 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - CHAIN
>> GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
>> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (89/95) switched to
>> FAILED with exception.
>>
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
>> due to an exception: java.io.IOException: I/O channel already closed. Could
>> not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>
>>         at java.lang.Thread.run(Thread.java:744)
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception:
>> java.io.IOException: I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>>
>>         ... 3 more
>>
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: java.io.IOException: I/O channel already
>> closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
>> I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> Caused by: java.io.IOException: I/O channel already closed. Could not
>> fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@29864ea1
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>
>>         ... 8 more
>>
>> 15:22:47,619 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - CHAIN
>> GroupReduce (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) ->
>> Map (Map at writeExternalTable(HiveHCatDAO.java:206)) (87/95) switched to
>> FAILED with exception.
>>
>> java.lang.Exception: The data preparation for task 'CHAIN GroupReduce
>> (GroupReduce at process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206))' , caused an error: Error
>> obtaining the sorted input: Thread 'SortMerger spilling thread' terminated
>> due to an exception: java.io.IOException: I/O channel already closed. Could
>> not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:465)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>
>>         at java.lang.Thread.run(Thread.java:744)
>>
>> Caused by: java.lang.RuntimeException: Error obtaining the sorted input:
>> Thread 'SortMerger spilling thread' terminated due to an exception:
>> java.io.IOException: I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1089)
>>
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.prepare(GroupReduceDriver.java:94)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:459)
>>
>>         ... 3 more
>>
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread'
>> terminated due to an exception: java.io.IOException: I/O channel already
>> closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: java.io.IOException:
>> I/O channel already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:165)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.serialize(KryoSerializer.java:194)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:247)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase.copy(TupleSerializerBase.java:73)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.NormalizedKeySorter.writeToOutput(NormalizedKeySorter.java:499)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$SpillingThread.go(UnilateralSortMerger.java:1344)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:796)
>>
>> Caused by: java.io.IOException: I/O channel already closed. Could not
>> fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest@4e81c03c
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:54)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockWriterWithCallback.writeBlock(AsynchronousBlockWriterWithCallback.java:29)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.writeSegment(ChannelWriterOutputView.java:217)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView.nextSegment(ChannelWriterOutputView.java:203)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.advance(AbstractPagedOutputView.java:140)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedOutputView.write(AbstractPagedOutputView.java:201)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>
>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>
>>         ... 8 more
>>
>> 15:22:47,627 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - Freeing
>> task resources for CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (87/95)
>>
>> 15:22:47,627 INFO
>> org.apache.flink.runtime.taskmanager.Task                     - Freeing
>> task resources for CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (89/95)
>>
>> 15:22:47,664 INFO
>> org.apache.flink.yarn.YarnTaskManager                         -
>> Unregistering task and sending final execution state FAILED to JobManager
>> for task CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (8d6b8b11714a27ac1ca83b39bdee577f)
>>
>> 15:22:47,738 INFO
>> org.apache.flink.yarn.YarnTaskManager                         -
>> Unregistering task and sending final execution state FAILED to JobManager
>> for task CHAIN GroupReduce (GroupReduce at
>> process(TransfoStage2StageOnTaz.java:106)) -> Map (Map at
>> writeExternalTable(HiveHCatDAO.java:206)) (73c6c2f15159dcb134e3899064a30f33)
>>
>> 15:22:47,841 ERROR
>> org.apache.flink.runtime.operators.BatchTask                  - Error in
>> task code:  CHAIN GroupReduce (GroupReduce at
>> calculeMinArea(TransfoStage2StageOnTaz.java:159)) -> Map (Key Extractor 1)
>> (88/95)
>>
>> com.esotericsoftware.kryo.KryoException: java.io.IOException: I/O channel
>> already closed. Could not fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
>>
>>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:148)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:74)
>>
>>         at
>> com.esotericsoftware.kryo.io.Input.readLong_slow(Input.java:756)
>>
>>         at com.esotericsoftware.kryo.io.Input.readVarLong(Input.java:690)
>>
>>         at com.esotericsoftware.kryo.io.Input.readLong(Input.java:685)
>>
>>         at
>> com.esotericsoftware.kryo.serializers.UnsafeCacheFields$UnsafeLongField.read(UnsafeCacheFields.java:160)
>>
>>         at
>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>
>>         at
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:222)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:236)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>
>>         at
>> org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator.next(ChannelReaderInputViewIterator.java:86)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.MergeIterator$HeadStream.nextHead(MergeIterator.java:151)
>>
>>         at
>> org.apache.flink.runtime.operators.sort.MergeIterator.next(MergeIterator.java:85)
>>
>>         at
>> org.apache.flink.runtime.util.ReusingKeyGroupedIterator$ValuesIterator.hasNext(ReusingKeyGroupedIterator.java:186)
>>
>>         at
>> org.apache.flink.api.java.operators.translation.TupleUnwrappingIterator.hasNext(TupleUnwrappingIterator.java:49)
>>
>>         at
>> com.bouygtel.kubera.processor.stage.TransfoStage2StageOnTaz$6.reduce(TransfoStage2StageOnTaz.java:170)
>>
>>         at
>> org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator$TupleUnwrappingNonCombinableGroupReducer.reduce(PlanUnwrappingReduceGroupOperator.java:101)
>>
>>         at
>> org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:118)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:489)
>>
>>         at
>> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:354)
>>
>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>
>>         at java.lang.Thread.run(Thread.java:744)
>>
>> Caused by: java.io.IOException: I/O channel already closed. Could not
>> fulfill:
>> org.apache.flink.runtime.io.disk.iomanager.SegmentReadRequest@26707c34
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:75)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.AsynchronousBlockReader.readBlock(AsynchronousBlockReader.java:43)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.sendReadRequest(ChannelReaderInputView.java:259)
>>
>>         at
>> org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView.nextSegment(ChannelReaderInputView.java:224)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedInputView.advance(AbstractPagedInputView.java:159)
>>
>>         at
>> org.apache.flink.runtime.memory.AbstractPagedInputView.read(AbstractPagedInputView.java:213)
>>
>>         at
>> org.apache.flink.api.java.typeutils.runtime.DataInputViewStream.read(DataInputViewStream.java:68)
>>
>>         at com.esotericsoftware.kryo.io.Input.fill(Input.java:146)
>>
>>         ... 25 more
>>
>>
>>
>>
>>
>> (...)
>>
>> ______________________
>>
>> 15:22:51,798 INFO
>> org.apache.flink.yarn.YarnJobManager                          - Container
>> container_e11_1453202008841_2868_01_000018 is completed with diagnostics:
>> Container
>> [pid=14548,containerID=container_e11_1453202008841_2868_01_000018] is
>> running beyond physical memory limits. Current usage: 12.1 GB of 12 GB
>> physical memory used; 13.0 GB of 25.2 GB virtual memory used. Killing
>> container.
>>
>> Dump of the process-tree for container_e11_1453202008841_2868_01_000018 :
>>
>>         |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
>> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>>
>>         |- 14548 14542 14548 14548 (bash) 0 0 108646400 310 /bin/bash -c
>> /usr/java/default/bin/java -Xms7200m -Xmx7200m
>> -XX:MaxDirectMemorySize=7200m
>> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
>> -Dlogback.configurationFile=file:logback.xml
>> -Dlog4j.configuration=file:log4j.properties
>> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . 1>
>> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.out
>> 2>
>> /data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.err
>> --streamingMode batch
>>
>>         |- 14558 14548 14548 14548 (java) 631070 15142 13881634816
>> 3163462 /usr/java/default/bin/java -Xms7200m -Xmx7200m
>> -XX:MaxDirectMemorySize=7200m
>> -Dlog.file=/data/2/hadoop/yarn/log/application_1453202008841_2868/container_e11_1453202008841_2868_01_000018/taskmanager.log
>> -Dlogback.configurationFile=file:logback.xml
>> -Dlog4j.configuration=file:log4j.properties
>> org.apache.flink.yarn.YarnTaskManagerRunner --configDir . --streamingMode
>> batch
>>
>>
>>
>> Container killed on request. Exit code is 143
>>
>> Container exited with a non-zero exit code 143
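
A quick back-of-the-envelope check on the numbers above (my reading, not a
statement from the thread): the container limit is 12 GB, while the JVM alone
is allowed

    7,200 MB heap (-Xmx) + up to 7,200 MB direct memory (-XX:MaxDirectMemorySize)
    = roughly 14.4 GB, before counting permgen, thread stacks and other native memory.

So once something, for example Netty's buffers, actually uses a large part of
that direct-memory allowance, the container's physical memory exceeds 12 GB
and YARN kills it even though the heap itself never runs out.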
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *From:* ewenstep...@gmail.com [mailto:ewenstep...@gmail.com] *On Behalf
>> Of* Stephan Ewen
>> *Sent:* Tuesday, February 2, 2016 20:20
>> *To:* user@flink.apache.org
>> *Subject:* Re: Left join with unbalanced dataset
>>
>>
>>
>> To make sure this discussion does not go in a wrong direction:
>>
>>
>>
>> There is no issue here with data size or memory management. The
>> MemoryManagement for sorting and hashing works, and Flink handles the
>> spilling correctly, etc.
>>
>>
>>
>> The issue here is different:
>>
>>    - One possible reason is that the network stack (specifically the
>> Netty library) allocates too much direct (= off heap) memory for buffering
>> the TCP connections.
>>
>>    - Another reason could be leaky behavior in Hadoop's HDFS code.
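
To make the first point more concrete, here is a small stand-alone demo (it
has nothing to do with Flink's or Netty's actual allocation code) showing that
direct buffers live outside the Java heap: the heap stays almost empty while
off-heap usage grows until -XX:MaxDirectMemorySize is exhausted. YARN's memory
monitor counts that off-heap usage against the container's physical memory.

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    // Stand-alone demo: allocate direct (off-heap) buffers until the JVM refuses.
    // Run with e.g.: java -Xmx256m -XX:MaxDirectMemorySize=512m DirectMemoryDemo
    public class DirectMemoryDemo {
        public static void main(String[] args) {
            List<ByteBuffer> buffers = new ArrayList<ByteBuffer>();
            long directBytes = 0;
            try {
                while (true) {
                    buffers.add(ByteBuffer.allocateDirect(16 * 1024 * 1024)); // 16 MB off-heap
                    directBytes += 16L * 1024 * 1024;
                }
            } catch (OutOfMemoryError e) {
                // Thrown as "Direct buffer memory" once MaxDirectMemorySize is used up.
                Runtime rt = Runtime.getRuntime();
                long heapUsedMb = (rt.totalMemory() - rt.freeMemory()) >> 20;
                System.out.println("direct memory allocated: " + (directBytes >> 20) + " MB");
                System.out.println("heap used: " + heapUsedMb + " MB of max "
                        + (rt.maxMemory() >> 20) + " MB");
            }
        }
    }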
>>
>>
>>
>>
>>
>> @Arnaud: We need the full log of the TaskManager that initially
>> experiences the failure; then we can look into this. Best would be with
>> activated memory logging, as suggested by Ufuk.
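
(If I remember correctly, the memory logging Ufuk referred to is switched on
in flink-conf.yaml roughly like this; please double-check the exact keys
against the 0.10 documentation:

    taskmanager.debug.memory.startLogThread: true
    taskmanager.debug.memory.logIntervalMs: 10000

This makes the TaskManager periodically log heap, non-heap and garbage
collection statistics.)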
>>
>>
>>
>> Best,
>>
>> Stephan
>>
>>
>>
>> ------------------------------
>>
>>
>> The integrity of this message cannot be guaranteed on the Internet. The
>> company that sent this message cannot therefore be held liable for its
>> content nor attachments. Any unauthorized use or dissemination is
>> prohibited. If you are not the intended recipient of this message, then
>> please delete it and notify the sender.
>>
>
>
