Yes, if one key has too many values, there still has a chance to meet the OOM.

Thanks
Jerry

From: Jianshi Huang [mailto:jianshi.hu...@gmail.com]
Sent: Thursday, March 5, 2015 3:49 PM
To: Shao, Saisai
Cc: Cheng, Hao; user
Subject: Re: Having lots of FetchFailedException in join

I see. I'm using core's join. The data might have some skewness (checking).

I understand shuffle can spill data to disk but when consuming it, say in 
cogroup or groupByKey, it still needs to read the whole group elements, right? 
I guess OOM happened there when reading very large groups.

Jianshi

On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai 
<saisai.s...@intel.com<mailto:saisai.s...@intel.com>> wrote:
I think what you could do is to monitor through web UI to see if there’s any 
skew or other symptoms in shuffle write and read. For GC you could use the 
below configuration as you mentioned.

From Spark core side, all the shuffle related operations can spill the data 
into disk and no need to read the whole partition into memory. But if you uses 
SparkSQL, it depends on how SparkSQL uses this operators.

CC @hao if he has some thoughts on it.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.com<mailto:jianshi.hu...@gmail.com>]
Sent: Thursday, March 5, 2015 3:28 PM
To: Shao, Saisai

Cc: user
Subject: Re: Having lots of FetchFailedException in join

Hi Saisai,

What's your suggested settings on monitoring shuffle? I've enabled 
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.

I found SPARK-3461 (Support external groupByKey using 
repartitionAndSortWithinPartitions) want to make groupByKey using external 
storage. It's still open status. Does that mean now 
groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read the 
group as a whole during consuming?

How can I deal with the key skewness in joins? Is there a skew-join 
implementation?


Jianshi



On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai 
<saisai.s...@intel.com<mailto:saisai.s...@intel.com>> wrote:
Hi Jianshi,

From my understanding, it may not be the problem of NIO or Netty, looking at 
your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap), 
theoretically EAOM can spill the data into disk if memory is not enough, but 
there might some issues when join key is skewed or key number is smaller, so 
you will meet OOM.

Maybe you could monitor each stage or task’s shuffle and GC status also system 
status to identify the problem.

Thanks
Jerry

From: Jianshi Huang 
[mailto:jianshi.hu...@gmail.com<mailto:jianshi.hu...@gmail.com>]
Sent: Thursday, March 5, 2015 2:32 PM
To: Aaron Davidson
Cc: user
Subject: Re: Having lots of FetchFailedException in join

One really interesting is that when I'm using the netty-based 
spark.shuffle.blockTransferService, there's no OOM error messages 
(java.lang.OutOfMemoryError: Java heap space).

Any idea why it's not here?

I'm using Spark 1.2.1.

Jianshi

On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang 
<jianshi.hu...@gmail.com<mailto:jianshi.hu...@gmail.com>> wrote:
I changed spark.shuffle.blockTransferService to nio and now I'm getting OOM 
errors, I'm doing a big join operation.


15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 
6207)
java.lang.OutOfMemoryError: Java heap space
        at 
org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
        at 
org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
        at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
        at 
org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
        at 
org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
        at 
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
        at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at 
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at 
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)

Is join/cogroup still memory bound?


Jianshi



On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang 
<jianshi.hu...@gmail.com<mailto:jianshi.hu...@gmail.com>> wrote:
Hmm... ok, previous errors are still block fetch errors.

15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning fetch 
of 11 outstanding blocks
java.io.IOException: Failed to connect to host-xxxx/xxxx:55597
        at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
        at 
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
        at 
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
        at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
        at 
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
        at 
org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
        at 
org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
        at 
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
        at 
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at 
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at 
org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:56)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
Caused by: java.net.ConnectException: Connection refused: 
lvshdc5dn0518.lvs.paypal.com/10.196.244.48:55597<http://lvshdc5dn0518.lvs.paypal.com/10.196.244.48:55597>
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at 
sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
        at 
io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
        at 
io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)

And I checked executor on container host-xxxx, everything is good.

Jianshi


On Wed, Mar 4, 2015 at 12:28 PM, Aaron Davidson 
<ilike...@gmail.com<mailto:ilike...@gmail.com>> wrote:
Drat! That doesn't help. Could you scan from the top to see if there were any 
fatal errors preceding these? Sometimes a OOM will cause this type of issue 
further down.

On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang 
<jianshi.hu...@gmail.com<mailto:jianshi.hu...@gmail.com>> wrote:
The failed executor has the following error messages. Any hints?

15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking 
RpcHandler#receive() on RPC id 5711039715419258699
java.io.FileNotFoundException: 
/hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:146)
        at 
org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
        at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
        at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
        at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at java.lang.Thread.run(Thread.java:724)
15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking 
RpcHandler#receive() on RPC id 7941985280808455530
java.io.FileNotFoundException: 
/hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:146)
        at 
org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
        at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
        at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
        at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at java.lang.Thread.run(Thread.java:724)
15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking 
RpcHandler#receive() on RPC id 5413737659722448543
java.io.FileNotFoundException: 
/hadoop03/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-88ee/17/shuffle_0_1074_0.index
 (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:146)
        at 
org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
        at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
        at 
org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
        at 
org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
        at 
org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
        at java.lang.Thread.run(Thread.java:724)


Jianshi

On Wed, Mar 4, 2015 at 3:25 AM, Aaron Davidson 
<ilike...@gmail.com<mailto:ilike...@gmail.com>> wrote:
"Failed to connect" implies that the executor at that host died, please check 
its logs as well.

On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang 
<jianshi.hu...@gmail.com<mailto:jianshi.hu...@gmail.com>> wrote:
Sorry that I forgot the subject.

And in the driver, I got many FetchFailedException. The error messages are

15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID 7943, 
xxxx): FetchFailed(BlockManagerId(86, xxxx, 43070), shuffleId=0, mapId=24, 
reduceId=1220, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to 
xxxx/xxxx:43070
        at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
        at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
        at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)


Jianshi

On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang 
<jianshi.hu...@gmail.com<mailto:jianshi.hu...@gmail.com>> wrote:
Hi,

I got this error message:

15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block 
fetches
java.lang.RuntimeException: java.io.FileNotFoundException: 
/hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)
        at java.io.FileInputStream.open(Native Method)
        at java.io.FileInputStream.<init>(FileInputStream.java:146)
        at 
org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
        at 
org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
        at 
org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)


And then for the same index file and executor, I got the following errors 
multiple times

15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get block(s) 
from host-xxxx:39534
java.lang.RuntimeException: java.io.FileNotFoundException: 
/hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block 
shuffle_0_13_1228, and will not retry (0 retries)
java.lang.RuntimeException: java.io.FileNotFoundException: 
/hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
 (No such file or directory)

...
Caused by: java.net.ConnectException: Connection refused: host-xxxx....


What's the problem?

BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Is there any bug 
fixes related to shuffle block fetching or index files after that?


Thanks,
--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/



--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/




--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/




--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/



--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/



--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/



--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/



--
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Reply via email to