Thanks for your answer! I'm not calling .collect() to trigger the execution; I call it because I need the RDD's contents on the driver. The RDD is not huge, and it's no larger than the one returned for the 50GB input.
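One way to check that the collected result really is small is to add up the per-task result sizes the executors report in their logs. The following is a minimal sketch. It assumes Spark 1.x executor lines of the form "Finished task ... N bytes result sent to driver", like the ones in the log in this message. `summarize_task_results` is a hypothetical helper that only reads log text and never talks to the cluster.

```python
import re

# Matches Spark 1.x executor lines such as
# "Finished task 872.0 in stage 1.0 (TID 1745). 1414 bytes result sent to driver"
TASK_RESULT = re.compile(
    r"Finished task (?P<task>\S+) in stage (?P<stage>\S+) "
    r"\(TID (?P<tid>\d+)\)\. (?P<size>\d+) bytes result sent to driver"
)


def summarize_task_results(log_text):
    """Hypothetical helper: count finished tasks and sum their result sizes.

    Returns (number_of_tasks, total_bytes) over every matching line in log_text.
    """
    sizes = [int(m.group("size")) for m in TASK_RESULT.finditer(log_text)]
    return len(sizes), sum(sizes)


if __name__ == "__main__":
    # Three "Finished task" lines copied from the executor log in this message.
    excerpt = "\n".join([
        "15/03/30 10:38:25 INFO executor.Executor: Finished task 872.0 in stage 1.0 "
        "(TID 1745). 1414 bytes result sent to driver",
        "15/03/30 10:38:25 INFO executor.Executor: Finished task 867.0 in stage 1.0 "
        "(TID 1740). 1440 bytes result sent to driver",
        "15/03/30 10:38:25 INFO executor.Executor: Finished task 868.0 in stage 1.0 "
        "(TID 1741). 1422 bytes result sent to driver",
    ])
    tasks, total = summarize_task_results(excerpt)
    print(f"{tasks} tasks, {total} bytes")  # prints: 3 tasks, 4276 bytes
```

Only three task results are visible here, so anything beyond them is an extrapolation. If all 892 tasks of the collect stage report similar sizes, the driver would receive roughly 892 × 1.4 KB ≈ 1.2 MB in total.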
Here is the end of the stack trace. The two IPs belong to the two worker nodes; I think they can't connect to the driver after they finish their part of the collect().

15/03/30 10:38:25 INFO executor.Executor: Finished task 872.0 in stage 1.0 (TID 1745). 1414 bytes result sent to driver
15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(200) called with curMem=405753, maxMem=4883742720
15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_867 stored as values in memory (estimated size 200.0 B, free 4.5 GB)
15/03/30 10:38:25 INFO storage.MemoryStore: ensureFreeSpace(80) called with curMem=405953, maxMem=4883742720
15/03/30 10:38:25 INFO storage.MemoryStore: Block rdd_4_868 stored as values in memory (estimated size 80.0 B, free 4.5 GB)
15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_867
15/03/30 10:38:25 INFO executor.Executor: Finished task 867.0 in stage 1.0 (TID 1740). 1440 bytes result sent to driver
15/03/30 10:38:25 INFO storage.BlockManagerMaster: Updated info of block rdd_4_868
15/03/30 10:38:25 INFO executor.Executor: Finished task 868.0 in stage 1.0 (TID 1741). 1422 bytes result sent to driver
15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:42026
java.io.IOException: Connection timed out
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
	at java.lang.Thread.run(Thread.java:745)
15/03/30 10:53:45 WARN server.TransportChannelHandler: Exception in connection from /10.102.129.251:41703
java.io.IOException: Connection timed out
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
	at java.lang.Thread.run(Thread.java:745)
15/03/30 10:53:46 WARN server.TransportChannelHandler: Exception in connection from /10.99.144.92:49021
java.io.IOException: Connection timed out
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
	at java.lang.Thread.run(Thread.java:745)
15/03/30 10:53:51 WARN server.TransportChannelHandler: Exception in connection from /10.99.144.92:49033
java.io.IOException: Connection timed out
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:225)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
	at java.lang.Thread.run(Thread.java:745)

2015-03-29 17:01 GMT+02:00 Akhil Das <ak...@sigmoidanalytics.com>:

> Don't call .collect if your data size is huge; you can simply do a count() to
> trigger the execution.
>
> Can you paste your exception stack trace so that we'll know what's
> happening?
>
> Thanks
> Best Regards
>
> On Fri, Mar 27, 2015 at 9:18 PM, Zsolt Tóth <toth.zsolt....@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a simple Spark application: it creates an input RDD with
>> sc.textFile, and it calls flatMapToPair, reduceByKey and map on it. The
>> output RDD is small, a few MBs. Then I call collect() on the output.
>>
>> If the text file is ~50GB, it finishes in a few minutes. However, if it's
>> larger (~100GB), the execution hangs at the end of the collect() stage. The
>> UI shows one active job (collect), one completed (flatMapToPair) and one
>> active stage (collect).
>> The collect stage has 880/892 tasks succeeded, so I
>> think the issue happens at the very end of the job (every task on
>> the UI is either in SUCCESS or in RUNNING state).
>> The driver and the containers don't log anything for 15 minutes; then I get
>> "Connection timed out".
>>
>> I run the job in yarn-cluster mode on Amazon EMR with Spark 1.2.1 and
>> Hadoop 2.4.0.
>>
>> This happens every time I run the process with larger input data, so I
>> don't think this is just a connection issue or something like that. Is this a
>> Spark bug, or is something wrong with my setup?
>>
>> Zsolt
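The roughly 15-minute silence before the "Connection timed out" warnings can be measured directly from the log timestamps. This is a minimal sketch. It assumes Spark 1.x log lines that begin with a "yy/MM/dd HH:mm:ss" timestamp, while stack-trace lines carry none. `longest_silence` is a hypothetical helper written for illustration.

```python
import re
from datetime import datetime

# Spark 1.x log lines start with a "yy/MM/dd HH:mm:ss" timestamp, e.g.
# "15/03/30 10:38:25 INFO executor.Executor: ...". Stack-trace lines have none.
TIMESTAMP = re.compile(r"^(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) ", re.MULTILINE)


def longest_silence(log_text):
    """Hypothetical helper: find the largest gap between consecutive log lines.

    Returns (gap_seconds, last_stamp_before_gap, first_stamp_after_gap), or
    (0.0, None, None) when fewer than two timestamped lines are present.
    """
    stamps = [datetime.strptime(s, "%y/%m/%d %H:%M:%S")
              for s in TIMESTAMP.findall(log_text)]
    if len(stamps) < 2:
        return 0.0, None, None
    # Compare each timestamp with the next one and keep the widest gap.
    return max(((b - a).total_seconds(), a, b) for a, b in zip(stamps, stamps[1:]))


if __name__ == "__main__":
    # Lines copied from the executor log in this thread (stack frames omitted).
    excerpt = "\n".join([
        "15/03/30 10:38:25 INFO executor.Executor: Finished task 868.0 in stage 1.0 "
        "(TID 1741). 1422 bytes result sent to driver",
        "15/03/30 10:53:45 WARN server.TransportChannelHandler: "
        "Exception in connection from /10.102.129.251:42026",
        "java.io.IOException: Connection timed out",
        "15/03/30 10:53:51 WARN server.TransportChannelHandler: "
        "Exception in connection from /10.99.144.92:49033",
    ])
    gap, before, after = longest_silence(excerpt)
    print(f"longest gap: {gap:.0f} s, from {before} to {after}")
    # prints: longest gap: 920 s, from 2015-03-30 10:38:25 to 2015-03-30 10:53:45
```

On the executor log in this thread, the gap is 920 seconds (15 min 20 s). The last "Finished task" line and the first timeout warning bound that silence, which matches the 15 minutes described in the original question.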