There's some skew. Two task rows from the web UI stage page:

Index  ID    Status   Locality       Executor / Host  Launch Time          Duration  GC Time  Shuffle Read  Shuffle Spill (Memory)  Shuffle Spill (Disk)
646    1640  SUCCESS  PROCESS_LOCAL  200 / xxxx       2015/03/04 23:45:47  1.1 min   6 s      198.6 MB      21.1 GB                 240.8 MB
596    1590  SUCCESS  PROCESS_LOCAL  30 / xxxx        2015/03/04 23:45:47  44 s      5 s      200.7 MB      4.8 GB                  154.0 MB

But I expect this kind of skewness to be quite common.
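In case anyone else hits the same thing, here is a rough sketch of the salting workaround I'm considering for the skewed join. This is only an illustration, not anything from Spark itself -- the names saltedJoin and numSalts are mine. The idea is to append a random salt to the keys on the skewed side and replicate the other side once per salt value, so one huge group gets split across several reducers instead of landing on a single task:

  import org.apache.spark.SparkContext._   // pair-RDD implicits (Spark 1.2.x style)
  import org.apache.spark.rdd.RDD
  import scala.reflect.ClassTag
  import scala.util.Random

  // Join a skewed RDD against a smaller one by salting the hot keys.
  def saltedJoin[K: ClassTag, V: ClassTag, W: ClassTag](
      skewed: RDD[(K, V)],
      small: RDD[(K, W)],
      numSalts: Int): RDD[(K, (V, W))] = {
    // Spread each key of the skewed side over numSalts sub-keys.
    val saltedLeft = skewed.map { case (k, v) => ((k, Random.nextInt(numSalts)), v) }
    // Replicate the other side once per salt so every sub-key can still match.
    val saltedRight = small.flatMap { case (k, w) =>
      (0 until numSalts).map(s => ((k, s), w))
    }
    // Join on the salted key, then drop the salt again.
    saltedLeft.join(saltedRight).map { case ((k, _), pair) => (k, pair) }
  }

Note the trade-off: the replicated side grows by a factor of numSalts, so this only seems worthwhile when that side is comparatively small and the skew on the other side is severe.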
Jianshi On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang <jianshi.hu...@gmail.com> wrote: > I see. I'm using core's join. The data might have some skewness > (checking). > > I understand shuffle can spill data to disk, but when consuming it, say in > cogroup or groupByKey, it still needs to read the whole group of elements, > right? I guess the OOM happened there when reading very large groups. > > Jianshi > > On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai <saisai.s...@intel.com> > wrote: > >> I think what you could do is to monitor through the web UI to see if >> there's any skew or other symptoms in shuffle write and read. For GC you >> could use the configuration below, as you mentioned. >> >> >> >> From the Spark core side, all the shuffle-related operations can spill >> data to disk and don't need to read the whole partition into memory. But if >> you use SparkSQL, it depends on how SparkSQL uses these operators. >> >> >> >> CC @hao if he has some thoughts on it. >> >> >> >> Thanks >> >> Jerry >> >> >> >> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] >> *Sent:* Thursday, March 5, 2015 3:28 PM >> *To:* Shao, Saisai >> >> *Cc:* user >> *Subject:* Re: Having lots of FetchFailedException in join >> >> >> >> Hi Saisai, >> >> >> >> What are your suggested settings for monitoring shuffle? I've >> enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging. >> >> >> >> I found SPARK-3461 (Support external groupByKey using >> repartitionAndSortWithinPartitions), which wants to make groupByKey use external >> storage. It's still in open status. Does that mean >> groupByKey/cogroup/join (implemented as cogroup + flatMap) will still read >> each group as a whole while consuming it? >> >> >> >> How can I deal with the key skewness in joins? Is there a skew-join >> implementation? >> >> >> >> >> >> Jianshi >> >> >> >> >> >> >> >> On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai <saisai.s...@intel.com> >> wrote: >> >> Hi Jianshi, >> >> >> >> From my understanding, it may not be a problem of NIO or Netty. Looking >> at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap); >> theoretically EAOM can spill data to disk if memory is not enough, >> but there might be some issues when the join key is skewed or the number of keys is >> small, so you may still hit OOM. >> >> >> >> Maybe you could monitor each stage or task's shuffle and GC status, as well as >> system status, to identify the problem. >> >> >> >> Thanks >> >> Jerry >> >> >> >> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com] >> *Sent:* Thursday, March 5, 2015 2:32 PM >> *To:* Aaron Davidson >> *Cc:* user >> *Subject:* Re: Having lots of FetchFailedException in join >> >> >> >> One really interesting thing is that when I'm using the >> netty-based spark.shuffle.blockTransferService, there are no OOM error >> messages (java.lang.OutOfMemoryError: Java heap space). >> >> >> >> Any idea why they don't show up there? >> >> >> >> I'm using Spark 1.2.1. >> >> >> >> Jianshi >> >> >> >> On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang <jianshi.hu...@gmail.com> >> wrote: >> >> I changed spark.shuffle.blockTransferService to nio and now I'm getting >> OOM errors; I'm doing a big join operation. 
>> >> >> >> >> >> 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 >> (TID 6207) >> >> java.lang.OutOfMemoryError: Java heap space >> >> at >> org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142) >> >> at >> org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74) >> >> at >> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179) >> >> at >> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178) >> >> at >> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122) >> >> at >> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121) >> >> at >> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138) >> >> at >> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32) >> >> at >> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130) >> >> at >> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) >> >> at >> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) >> >> at >> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) >> >> at >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >> >> at >> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> >> at >> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) >> >> at >> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) >> >> at >> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) >> >> at >> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) >> >> at >> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) >> >> at org.apache.spark.scheduler.Task.run(Task.scala:56) >> >> >> >> Is join/cogroup still memory bound? >> >> >> >> >> >> Jianshi >> >> >> >> >> >> >> >> On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang <jianshi.hu...@gmail.com> >> wrote: >> >> Hmm... ok, previous errors are still block fetch errors. 
>> >> >> >> 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning >> fetch of 11 outstanding blocks >> >> java.io.IOException: Failed to connect to host-xxxx/xxxx:55597 >> >> at >> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191) >> >> at >> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156) >> >> at >> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78) >> >> at >> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140) >> >> at >> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120) >> >> at >> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87) >> >> at >> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149) >> >> at >> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289) >> >> at >> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53) >> >> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) >> >> at >> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) >> >> at >> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) >> >> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> >> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) >> >> at >> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125) >> >> at >> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160) >> >> at >> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159) >> >> at >> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) >> >> at >> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) >> >> at >> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) >> >> at >> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) >> >> at >> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159) >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) >> >> at >> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31) >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) >> >> at >> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31) >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) >> >> at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31) >> >> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263) >> >> at org.apache.spark.rdd.RDD.iterator(RDD.scala:230) >> >> at >> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) >> >> at org.apache.spark.scheduler.Task.run(Task.scala:56) >> >> at >> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197) >> >> at >> 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) >> >> at >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) >> >> at java.lang.Thread.run(Thread.java:724) >> >> Caused by: java.net.ConnectException: Connection refused: >> lvshdc5dn0518.lvs.paypal.com/10.196.244.48:55597 >> >> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) >> >> at >> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735) >> >> at >> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208) >> >> at >> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287) >> >> at >> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) >> >> >> >> And I checked executor on container host-xxxx, everything is good. >> >> >> >> Jianshi >> >> >> >> >> >> On Wed, Mar 4, 2015 at 12:28 PM, Aaron Davidson <ilike...@gmail.com> >> wrote: >> >> Drat! That doesn't help. Could you scan from the top to see if there >> were any fatal errors preceding these? Sometimes a OOM will cause this type >> of issue further down. >> >> >> >> On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang <jianshi.hu...@gmail.com> >> wrote: >> >> The failed executor has the following error messages. Any hints? >> >> >> >> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking >> RpcHandler#receive() on RPC id 5711039715419258699 >> >> java.io.FileNotFoundException: >> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index >> (No such file or directory) >> >> at java.io.FileInputStream.open(Native Method) >> >> at java.io.FileInputStream.<init>(FileInputStream.java:146) >> >> at >> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) >> >> at >> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) >> >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> >> at >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >> >> at >> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) >> >> at >> scala.collection.TraversableLike$class.map(TraversableLike.scala:244) >> >> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) >> >> at >> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) >> >> at >> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) >> >> at >> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) >> >> at >> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) >> >> at >> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >> >> at >> 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >> >> at >> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >> >> at >> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >> >> at >> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) >> >> at >> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) >> >> at >> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) >> >> at >> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) >> >> at >> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) >> >> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) >> >> at >> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) >> >> at java.lang.Thread.run(Thread.java:724) >> >> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking >> RpcHandler#receive() on RPC id 7941985280808455530 >> >> java.io.FileNotFoundException: >> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index >> (No such file or directory) >> >> at java.io.FileInputStream.open(Native Method) >> >> at java.io.FileInputStream.<init>(FileInputStream.java:146) >> >> at >> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) >> >> at >> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) >> >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> >> at >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >> >> at >> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) >> >> at >> scala.collection.TraversableLike$class.map(TraversableLike.scala:244) >> >> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) >> >> at >> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) >> >> at >> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) >> >> at >> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) >> >> at >> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) >> >> at >> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) >> >> at >> 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >> >> at >> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >> >> at >> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >> >> at >> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) >> >> at >> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) >> >> at >> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) >> >> at >> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) >> >> at >> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) >> >> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) >> >> at >> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) >> >> at java.lang.Thread.run(Thread.java:724) >> >> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking >> RpcHandler#receive() on RPC id 5413737659722448543 >> >> java.io.FileNotFoundException: >> /hadoop03/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-88ee/17/shuffle_0_1074_0.index >> (No such file or directory) >> >> at java.io.FileInputStream.open(Native Method) >> >> at java.io.FileInputStream.<init>(FileInputStream.java:146) >> >> at >> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) >> >> at >> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) >> >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> >> at >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >> >> at >> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) >> >> at >> scala.collection.TraversableLike$class.map(TraversableLike.scala:244) >> >> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) >> >> at >> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124) >> >> at >> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97) >> >> at >> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91) >> >> at >> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44) >> >> 
at >> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >> >> at >> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >> >> at >> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) >> >> at >> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) >> >> at >> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) >> >> at >> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) >> >> at >> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) >> >> at >> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) >> >> at >> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) >> >> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) >> >> at >> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) >> >> at java.lang.Thread.run(Thread.java:724) >> >> >> >> >> >> Jianshi >> >> >> >> On Wed, Mar 4, 2015 at 3:25 AM, Aaron Davidson <ilike...@gmail.com> >> wrote: >> >> "Failed to connect" implies that the executor at that host died, please >> check its logs as well. >> >> >> >> On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang <jianshi.hu...@gmail.com> >> wrote: >> >> Sorry that I forgot the subject. >> >> >> >> And in the driver, I got many FetchFailedException. 
The error messages are >> >> >> 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID >> 7943, xxxx): FetchFailed(BlockManagerId(86, xxxx, 43070), shuffleId=0, >> mapId=24, reduceId=1220, message= >> >> org.apache.spark.shuffle.FetchFailedException: Failed to connect to >> xxxx/xxxx:43070 >> >> at >> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) >> >> at >> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) >> >> at >> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) >> >> >> >> >> >> Jianshi >> >> On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang <jianshi.hu...@gmail.com> >> wrote: >> >> Hi, >> >> >> >> I got this error message: >> >> >> >> 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting >> block fetches >> >> java.lang.RuntimeException: java.io.FileNotFoundException: >> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index >> (No such file or directory) >> >> at java.io.FileInputStream.open(Native Method) >> >> at java.io.FileInputStream.<init>(FileInputStream.java:146) >> >> at >> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109) >> >> at >> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57) >> >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> >> at >> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) >> >> at >> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) >> >> at >> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) >> >> at >> scala.collection.TraversableLike$class.map(TraversableLike.scala:244) >> >> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) >> >> at >> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57) >> >> >> >> >> >> And then for the same index file and executor, I got the following errors >> multiple times: >> >> >> >> 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get >> block(s) from host-xxxx:39534 >> >> java.lang.RuntimeException: java.io.FileNotFoundException: >> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index >> (No such file or directory) >> >> >> >> 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block >> shuffle_0_13_1228, and will not retry (0 retries) >> >> java.lang.RuntimeException: java.io.FileNotFoundException: >> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index >> (No such file or directory) >> >> >> >> ... >> >> Caused by: java.net.ConnectException: Connection refused: host-xxxx.... >> >> >> >> >> >> What's the problem? >> >> >> >> BTW, I'm using a Spark 1.2.1-SNAPSHOT that I built around Dec. 20. Are there any >> bug fixes related to shuffle block fetching or index files after that? 
>> >> >> >> >> Thanks, >> -- >> Jianshi Huang >> LinkedIn: jianshi >> Twitter: @jshuang >> Github & Blog: http://huangjs.github.com/

-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/