+CC zhouye...@gmail.com

On Mon, May 23, 2022 at 7:11 AM Han Altae-Tran <alta...@mit.edu> wrote:

> Hi,
>
> First of all, I am very thankful for all of the amazing work that goes
> into this project! It has opened up so many doors for me! I am a
> long-time Spark user and was very excited to start working with the
> push-based shuffle service for an academic paper we are working on, but
> I ran into some difficulties along the way and am wondering if someone
> could help me resolve an issue with this new feature. I was able to get
> push-based shuffle running on my YARN setup: I am using Dataproc, but I
> added a Spark 3.2 installation on top of the Dataproc base installation
> via a custom image, removed the old 3.1.2 Spark YARN shuffle jar, and
> replaced it with the one from Spark 3.2. However, whenever I actually
> run shuffles with push-based shuffle enabled, I consistently encounter
> errors of the following sort (a minimal sketch of the failing pattern
> follows the trace):
>
> 22/05/23 05:45:01 WARN org.apache.spark.scheduler.TaskSetManager: Lost
> task 163.0 in stage 3.1 (TID 16729)
> (cluster-fast-w-0.c.altaeth-biolux.internal executor 1):
> FetchFailed(BlockManagerId(2, cluster-fast-w-1.c.altaeth-biolux.internal,
> 7337, None), shuffleId=0, mapIndex=171, mapId=11287, reduceId=808, message=
> org.apache.spark.shuffle.FetchFailedException
>         at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
>         at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
>         at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
>         at
> org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>         at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>         at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>         at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>         at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown
> Source)
>         at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown
> Source)
>         at
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown
> Source)
>         at
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
>         at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>         at
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
>         at
> org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>         at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>         at org.apache.spark.scheduler.Task.run(Task.scala:131)
>         at
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>         at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:750)
> Caused by: java.io.IOException: Failed to send RPC
> StreamChunkId[streamId=1514743314249,chunkIndex=59] to
> cluster-fast-w-1.c.altaeth-biolux.internal/10.128.0.39:7337:
> java.io.IOException: Connection reset by peer
>         at
> org.apache.spark.network.client.TransportClient$1.handleFailure(TransportClient.java:146)
>         at
> org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
>         at
> io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
>         at
> io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
>         at
> io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
>         at
> io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
>         at
> io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
>         at
> io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
>         at
> org.apache.spark.network.client.TransportClient.fetchChunk(TransportClient.java:151)
>         at
> org.apache.spark.network.shuffle.OneForOneBlockFetcher$1.onSuccess(OneForOneBlockFetcher.java:297)
>         at
> org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:196)
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
>         at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>         at
> io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>         at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>         at
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>         at
> io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>         at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>         at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>         at
> io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>         at
> io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>         at
> io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> Caused by: java.io.IOException: Connection reset by peer
>         at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>         at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>         at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>         at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>         at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
>         at
> io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:408)
>         at
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949)
>         at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
>         at
> io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913)
>         at
> io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
>         at
> io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
>         at
> io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
> 22/05/23 05:48:24 WARN org.apache.spark.scheduler.DAGScheduler: Exception
> encountered when trying to finalize shuffle merge on
> cluster-fast-w-0.c.altaeth-biolux.internal for shuffle 1
> java.lang.RuntimeException: java.lang.UnsupportedOperationException:
> Cannot handle shuffle block merge
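>
> For context, the failures occur during ordinary shuffle-producing
> aggregations (the trace above is in generated aggregation code). A
> minimal sketch of the kind of pattern that triggers them, with sizes,
> column names, and the output path purely illustrative rather than from
> our actual job:
>
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
>
> spark = SparkSession.builder.getOrCreate()
>
> # groupBy forces a shuffle; with spark.shuffle.push.enabled=true the
> # map outputs are also pushed to the remote merger services
> df = spark.range(0, 10_000_000).withColumn("key", F.col("id") % 10811)
> counts = df.groupBy("key").agg(F.count("*").alias("n"))
> counts.write.mode("overwrite").parquet("/tmp/push_shuffle_repro")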
>
> These errors arise with the following configuration:
> PYSPARK_DRIVER_PYTHON=`which ipython` \
> PYSPARK_PYTHON=/custom_install/packages/anaconda/envs/biolux/bin/python \
> pyspark --master yarn \
> --deploy-mode client \
> --driver-memory 50g \
> --conf spark.executor.memory=114000m \
> --conf spark.task.cpus=1 \
> --conf spark.executor.cores=32 \
> --conf spark.dynamicAllocation.enabled=true \
> --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=10h \
> --num-executors 500 \
> --conf spark.task.maxFailures=30 \
> --conf spark.storage.replication.active=true \
> --conf spark.scheduler.listenerbus.eventqueue.capacity=4000000 \
> --conf spark.executor.memoryOverhead=2048m \
> --conf spark.stage.maxConsecutiveAttempts=1000 \
> --conf spark.default.parallelism=10811 \
> --conf spark.sql.shuffle.partitions=10811 \
> --conf spark.sql.sources.partitionOverwriteMode="dynamic" \
> --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
> --conf spark.hadoop.dfs.replication=1 \
> --conf spark.shuffle.io.numConnectionsPerPeer=5 \
> --conf spark.locality.wait=3s \
> --conf spark.shuffle.push.enabled=true \
> --conf spark.shuffle.push.maxRetainedMergerLocations=1500 \
> --conf spark.shuffle.service.enabled=true \
> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
> --conf spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.RemoteBlockPushResolver \
> --conf spark.yarn.shuffle.stopOnFailure=false \
> --conf spark.shuffle.push.mergersMinThresholdRatio=0.01 \
> --conf spark.shuffle.push.mergersMinStaticThreshold=1
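>
> For reference, the push-related options are copied verbatim from the
> command above; expressed programmatically via SparkSession.builder they
> look like the sketch below (just a restatement of the same settings,
> not a recommendation):
>
> from pyspark.sql import SparkSession
>
> spark = (
>     SparkSession.builder
>     .master("yarn")
>     .config("spark.shuffle.service.enabled", "true")
>     .config("spark.shuffle.push.enabled", "true")
>     .config("spark.shuffle.push.maxRetainedMergerLocations", "1500")
>     .config("spark.shuffle.push.mergersMinThresholdRatio", "0.01")
>     .config("spark.shuffle.push.mergersMinStaticThreshold", "1")
>     .config("spark.shuffle.push.server.mergedShuffleFileManagerImpl",
>             "org.apache.spark.network.shuffle.RemoteBlockPushResolver")
>     .getOrCreate()
> )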
>
> These failures cause the stage to be retried multiple times. Do you
> know if there is something obvious that might be wrong with this setup?
> Thank you so much for your time and consideration!
>
> Best,
> Han
>
