There is some skew.

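Task 646 (ID 1640, attempt 0): SUCCESS, PROCESS_LOCAL, executor 200 / xxxx,
  launched 2015/03/04 23:45:47, duration 1.1 min, GC time 6 s,
  shuffle read 198.6 MB, shuffle spill (memory) 21.1 GB,
  shuffle spill (disk) 240.8 MB
Task 596 (ID 1590, attempt 0): SUCCESS, PROCESS_LOCAL, executor 30 / xxxx,
  launched 2015/03/04 23:45:47, duration 44 s, GC time 5 s,
  shuffle read 200.7 MB, shuffle spill (memory) 4.8 GB,
  shuffle spill (disk) 154.0 MB
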
But I expect this kind of skewness to be quite common.
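
One common workaround for a skewed join key is "salting": split each hot key
on the skewed side into several sub-keys and replicate the other side once per
sub-key, so that no single group has to be held in memory as a whole. The
sketch below is plain Python, not Spark code, and is only meant to illustrate
the idea. The function name salted_join and its parameters (num_salts, seed)
are hypothetical and do not correspond to any Spark API.

```python
import random
from collections import defaultdict


def salted_join(left, right, num_salts, seed=0):
    """Inner-join two lists of (key, value) pairs using key salting.

    `left` is assumed to be the skewed side. Returns the joined records
    and the salted groups of `left`, so the group sizes can be inspected.
    """
    rng = random.Random(seed)

    # Skewed side: spread every key over `num_salts` sub-keys at random.
    left_groups = defaultdict(list)
    for key, value in left:
        left_groups[(key, rng.randrange(num_salts))].append(value)

    # Other side: replicate every record once per possible salt value,
    # so each salted sub-key still finds all of its matches.
    right_groups = defaultdict(list)
    for key, value in right:
        for salt in range(num_salts):
            right_groups[(key, salt)].append(value)

    # Join group by group on the salted key, then drop the salt again.
    joined = []
    for salted_key, left_values in left_groups.items():
        for left_value in left_values:
            for right_value in right_groups.get(salted_key, []):
                joined.append((salted_key[0], (left_value, right_value)))
    return joined, left_groups


if __name__ == "__main__":
    # One hot key with 1000 records and one cold key with a single record.
    left = [("hot", i) for i in range(1000)] + [("cold", 0)]
    right = [("hot", "a"), ("cold", "b")]
    joined, groups = salted_join(left, right, num_salts=8)
    print("joined records:", len(joined))
    print("largest group:", max(len(v) for v in groups.values()))
```

The trade-off is that the non-skewed side grows by a factor of num_salts, so
this only pays off when that side is comparatively small.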

Jianshi


On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang <jianshi.hu...@gmail.com>
wrote:

> I see. I'm using core's join. The data might have some skewness
> (checking).
>
> I understand shuffle can spill data to disk but when consuming it, say in
> cogroup or groupByKey, it still needs to read the whole group elements,
> right? I guess OOM happened there when reading very large groups.
>
> Jianshi
>
> On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai <saisai.s...@intel.com>
> wrote:
>
>>  I think you could monitor the web UI to see whether there is any skew
>> or other symptoms in shuffle write and read. For GC, you could use the
>> configuration you mentioned below.
>>
>>
>>
>> On the Spark core side, all shuffle-related operations can spill data
>> to disk, so there is no need to read the whole partition into memory. But if
>> you use SparkSQL, it depends on how SparkSQL uses these operators.
>>
>>
>>
>> CC @hao if he has some thoughts on it.
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Thursday, March 5, 2015 3:28 PM
>> *To:* Shao, Saisai
>>
>> *Cc:* user
>> *Subject:* Re: Having lots of FetchFailedException in join
>>
>>
>>
>> Hi Saisai,
>>
>>
>>
>> What are your suggested settings for monitoring shuffle? I've
>> enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.
>>
>>
>>
>> I found SPARK-3461 (Support external groupByKey using
>> repartitionAndSortWithinPartitions), which aims to make groupByKey use
>> external storage. It is still open. Does that mean that
>> groupByKey/cogroup/join (implemented as cogroup + flatmap) will still read
>> the whole group into memory when consuming it?
>>
>>
>>
>> How can I deal with the key skewness in joins? Is there a skew-join
>> implementation?
>>
>>
>>
>>
>>
>> Jianshi
>>
>>
>>
>>
>>
>>
>>
>> On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai <saisai.s...@intel.com>
>> wrote:
>>
>>  Hi Jianshi,
>>
>>
>>
>> From my understanding, it may not be a problem with NIO or Netty. Looking
>> at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap).
>> Theoretically EAOM can spill data to disk if memory is not enough,
>> but there might be some issues when the join key is skewed or the number of
>> keys is small, so you hit the OOM.
>>
>>
>>
>> Maybe you could monitor each stage's or task's shuffle and GC status, as
>> well as the system status, to identify the problem.
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
>> *Sent:* Thursday, March 5, 2015 2:32 PM
>> *To:* Aaron Davidson
>> *Cc:* user
>> *Subject:* Re: Having lots of FetchFailedException in join
>>
>>
>>
>> One really interesting thing is that when I'm using the
>> netty-based spark.shuffle.blockTransferService, there are no OOM error
>> messages (java.lang.OutOfMemoryError: Java heap space).
>>
>>
>>
>> Any idea why it's not here?
>>
>>
>>
>> I'm using Spark 1.2.1.
>>
>>
>>
>> Jianshi
>>
>>
>>
>> On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang <jianshi.hu...@gmail.com>
>> wrote:
>>
>>  I changed spark.shuffle.blockTransferService to nio and now I'm getting
>> OOM errors. I'm doing a big join operation.
>>
>>
>>
>>
>>
>> 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
>> (TID 6207)
>>
>> java.lang.OutOfMemoryError: Java heap space
>>
>>         at
>> org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
>>
>>         at
>> org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
>>
>>         at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
>>
>>         at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
>>
>>         at
>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
>>
>>         at
>> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
>>
>>         at
>> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
>>
>>         at
>> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>>
>>         at
>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
>>
>>         at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>>
>>         at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>>
>>         at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>
>>         at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>
>>         at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>
>>         at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>
>>         at
>> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>>
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>
>>         at
>> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>>
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>
>>         at
>> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>>
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>
>>         at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>
>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>
>>
>>
>> Is join/cogroup still memory bound?
>>
>>
>>
>>
>>
>> Jianshi
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang <jianshi.hu...@gmail.com>
>> wrote:
>>
>>  Hmm... ok, previous errors are still block fetch errors.
>>
>>
>>
>> 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
>> fetch of 11 outstanding blocks
>>
>> java.io.IOException: Failed to connect to host-xxxx/xxxx:55597
>>
>>         at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>>
>>         at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>>
>>         at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>
>>         at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
>>
>>         at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
>>
>>         at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
>>
>>         at
>> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
>>
>>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>
>>         at
>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>
>>         at
>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>
>>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>
>>         at
>> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>>
>>         at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>>
>>         at
>> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>>
>>         at
>> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>>
>>         at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>
>>         at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>
>>         at
>> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>>
>>         at
>> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>>
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>
>>         at
>> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>>
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>
>>         at
>> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>>
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>
>>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>
>>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>>
>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>>
>>         at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>
>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>
>>         at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
>>
>>         at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>
>>         at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>
>>         at java.lang.Thread.run(Thread.java:724)
>>
>> Caused by: java.net.ConnectException: Connection refused:
>> lvshdc5dn0518.lvs.paypal.com/10.196.244.48:55597
>>
>>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>
>>         at
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
>>
>>         at
>> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
>>
>>         at
>> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
>>
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>>
>>
>>
>> And I checked the executor on container host-xxxx; everything looks good.
>>
>>
>>
>> Jianshi
>>
>>
>>
>>
>>
>> On Wed, Mar 4, 2015 at 12:28 PM, Aaron Davidson <ilike...@gmail.com>
>> wrote:
>>
>>  Drat! That doesn't help. Could you scan from the top to see if there
>> were any fatal errors preceding these? Sometimes an OOM will cause this type
>> of issue further down.
>>
>>
>>
>> On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang <jianshi.hu...@gmail.com>
>> wrote:
>>
>>  The failed executor has the following error messages. Any hints?
>>
>>
>>
>> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
>> RpcHandler#receive() on RPC id 5711039715419258699
>>
>> java.io.FileNotFoundException:
>> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
>> (No such file or directory)
>>
>>         at java.io.FileInputStream.open(Native Method)
>>
>>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>>
>>         at
>> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>>
>>         at
>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>
>>         at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>
>>         at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>
>>         at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>
>>         at
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>
>>         at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>
>>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>
>>         at
>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>
>>         at
>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>
>>         at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>
>>         at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>
>>         at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>
>>         at
>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>
>>         at
>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>
>>         at
>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>
>>         at
>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>
>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>
>>         at
>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>>
>>         at java.lang.Thread.run(Thread.java:724)
>>
>> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
>> RpcHandler#receive() on RPC id 7941985280808455530
>>
>> java.io.FileNotFoundException:
>> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
>> (No such file or directory)
>>
>>         at java.io.FileInputStream.open(Native Method)
>>
>>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>>
>>         at
>> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>>
>>         at
>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>
>>         at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>
>>         at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>
>>         at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>
>>         at
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>
>>         at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>
>>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>
>>         at
>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>
>>         at
>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>
>>         at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>
>>         at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>
>>         at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>
>>         at
>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>
>>         at
>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>
>>         at
>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>
>>         at
>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>
>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>
>>         at
>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>>
>>         at java.lang.Thread.run(Thread.java:724)
>>
>> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
>> RpcHandler#receive() on RPC id 5413737659722448543
>>
>> java.io.FileNotFoundException:
>> /hadoop03/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-88ee/17/shuffle_0_1074_0.index
>> (No such file or directory)
>>
>>         at java.io.FileInputStream.open(Native Method)
>>
>>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>>
>>         at
>> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>>
>>         at
>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>
>>         at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>
>>         at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>
>>         at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>
>>         at
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>
>>         at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>
>>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>
>>         at
>> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>>
>>         at
>> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>>
>>         at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>>
>>         at
>> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>>
>>         at
>> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>
>>         at
>> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>
>>         at
>> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>>
>>         at
>> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>>
>>         at
>> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>>
>>         at
>> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>>
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>>
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>
>>         at
>> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>
>>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>
>>         at
>> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>>
>>         at java.lang.Thread.run(Thread.java:724)
>>
>>
>>
>>
>>
>> Jianshi
>>
>>
>>
>> On Wed, Mar 4, 2015 at 3:25 AM, Aaron Davidson <ilike...@gmail.com>
>> wrote:
>>
>>  "Failed to connect" implies that the executor at that host died, please
>> check its logs as well.
>>
>>
>>
>> On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang <jianshi.hu...@gmail.com>
>> wrote:
>>
>>  Sorry that I forgot the subject.
>>
>>
>>
>> And in the driver, I got many FetchFailedException. The error messages are
>>
>>
>>
>> 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID
>> 7943, xxxx): FetchFailed(BlockManagerId(86, xxxx, 43070), shuffleId=0,
>> mapId=24, reduceId=1220, message=
>>
>> org.apache.spark.shuffle.FetchFailedException: Failed to connect to
>> xxxx/xxxx:43070
>>
>>         at
>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>>
>>         at
>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>
>>         at
>> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>>
>>
>>
>>
>>
>> Jianshi
>>
>>
>>
>> On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang <jianshi.hu...@gmail.com>
>> wrote:
>>
>>  Hi,
>>
>>
>>
>> I got this error message:
>>
>>
>>
>> 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting
>> block fetches
>>
>> java.lang.RuntimeException: java.io.FileNotFoundException:
>> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
>> (No such file or directory)
>>
>>         at java.io.FileInputStream.open(Native Method)
>>
>>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>>
>>         at
>> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>>
>>         at
>> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>>
>>         at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>
>>         at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>
>>         at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>
>>         at
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>
>>         at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>
>>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>
>>         at
>> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>>
>>
>>
>>
>>
>> And then for the same index file and executor, I got the following errors
>> multiple times
>>
>>
>>
>> 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get
>> block(s) from host-xxxx:39534
>>
>> java.lang.RuntimeException: java.io.FileNotFoundException:
>> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
>> (No such file or directory)
>>
>>
>>
>> 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block
>> shuffle_0_13_1228, and will not retry (0 retries)
>>
>> java.lang.RuntimeException: java.io.FileNotFoundException:
>> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
>> (No such file or directory)
>>
>>
>>
>> ...
>>
>> Caused by: java.net.ConnectException: Connection refused: host-xxxx....
>>
>>
>>
>>
>>
>> What's the problem?
>>
>>
>>
>> BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Are there any
>> bug fixes related to shuffle block fetching or index files after that?
>>
>>
>>
>>
>>  Thanks,
>>
>> --
>>
>> Jianshi Huang
>>
>> LinkedIn: jianshi
>> Twitter: @jshuang
>> Github & Blog: http://huangjs.github.com/



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/
