+CC zhouye...@gmail.com
On Mon, May 23, 2022 at 7:11 AM Han Altae-Tran <alta...@mit.edu> wrote:
> Hi,
>
> First of all, I am very thankful for all of the amazing work that goes
> into this project! It has opened up so many doors for me! I am a
> long-time Spark user and was very excited to start working with the
> push-based shuffle service for an academic paper we are working on, but
> I encountered some difficulties along the way and am wondering if
> someone could help me resolve an issue with this new feature. I was able
> to get push-based shuffle running on my YARN setup (I am using Dataproc,
> but I added an additional Spark 3.2 installation on top of the Dataproc
> base installation using a custom image, then removed the old Spark 3.1.2
> YARN shuffle jar and replaced it with the new one for Spark 3.2).
> However, the main issue is that when I actually run shuffles with
> push-based shuffle enabled, I consistently encounter errors of the
> following sort:
>
> 22/05/23 05:45:01 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 163.0 in stage 3.1 (TID 16729) (cluster-fast-w-0.c.altaeth-biolux.internal executor 1): FetchFailed(BlockManagerId(2, cluster-fast-w-1.c.altaeth-biolux.internal, 7337, None), shuffleId=0, mapIndex=171, mapId=11287, reduceId=808, message=
> org.apache.spark.shuffle.FetchFailedException
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:1167)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:903)
>     at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:84)
>     at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
>     at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithoutKey_0$(Unknown Source)
>     at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
>     at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>     at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
>     at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
>     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
>     at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
>     at org.apache.spark.scheduler.Task.run(Task.scala:131)
>     at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:750)
> Caused by: java.io.IOException: Failed to send RPC StreamChunkId[streamId=1514743314249,chunkIndex=59] to cluster-fast-w-1.c.altaeth-biolux.internal/10.128.0.39:7337: java.io.IOException: Connection reset by peer
>     at org.apache.spark.network.client.TransportClient$1.handleFailure(TransportClient.java:146)
>     at org.apache.spark.network.client.TransportClient$StdChannelListener.operationComplete(TransportClient.java:369)
>     at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
>     at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)
>     at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
>     at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:184)
>     at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
>     at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
>     at org.apache.spark.network.client.TransportClient.fetchChunk(TransportClient.java:151)
>     at org.apache.spark.network.shuffle.OneForOneBlockFetcher$1.onSuccess(OneForOneBlockFetcher.java:297)
>     at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:196)
>     at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
>     at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
>     at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
>     at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
>     at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
> Caused by: java.io.IOException: Connection reset by peer
>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>     at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>     at sun.nio.ch.IOUtil.write(IOUtil.java:51)
>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:470)
>     at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:408)
>     at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:949)
>     at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354)
>     at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:913)
>     at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
>     at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
>     at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
>
> 22/05/23 05:48:24 WARN org.apache.spark.scheduler.DAGScheduler: Exception encountered when trying to finalize shuffle merge on cluster-fast-w-0.c.altaeth-biolux.internal for shuffle 1
> java.lang.RuntimeException: java.lang.UnsupportedOperationException: Cannot handle shuffle block merge
>
> This arises using the following conf:
>
> PYSPARK_DRIVER_PYTHON=`which ipython` \
> PYSPARK_PYTHON=/custom_install/packages/anaconda/envs/biolux/bin/python \
> pyspark --master yarn \
>   --deploy-mode client \
>   --driver-memory 50g \
>   --conf spark.executor.memory=114000m \
>   --conf spark.task.cpus=1 \
>   --conf spark.executor.cores=32 \
>   --conf spark.dynamicAllocation.enabled=true \
>   --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=10h \
>   --num-executors 500 \
>   --conf spark.task.maxFailures=30 \
>   --conf spark.storage.replication.active=true \
>   --conf spark.scheduler.listenerbus.eventqueue.capacity=4000000 \
>   --conf spark.executor.memoryOverhead=2048m \
>   --conf spark.stage.maxConsecutiveAttempts=1000 \
>   --conf spark.default.parallelism=10811 \
>   --conf spark.sql.shuffle.partitions=10811 \
>   --conf spark.sql.sources.partitionOverwriteMode="dynamic" \
>   --conf spark.sql.execution.arrow.maxRecordsPerBatch=1000000 \
>   --conf spark.hadoop.dfs.replication=1 \
>   --conf spark.shuffle.io.numConnectionsPerPeer=5 \
>   --conf spark.locality.wait=3s \
>   --conf spark.shuffle.push.enabled=true \
>   --conf spark.shuffle.push.maxRetainedMergerLocations=1500 \
>   --conf spark.shuffle.service.enabled=true \
>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>   --conf spark.shuffle.push.server.mergedShuffleFileManagerImpl=org.apache.spark.network.shuffle.RemoteBlockPushResolver \
>   --conf spark.yarn.shuffle.stopOnFailure=false \
>   --conf spark.shuffle.push.mergersMinThresholdRatio=0.01 \
>   --conf spark.shuffle.push.mergersMinStaticThreshold=1
>
> These failures cause the stage to be retried multiple times. Do you know
> if there is something obvious that might be wrong with this setup? Thank
> you so much for your time and consideration!
>
> Best,
> Han
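
For anyone trying to reproduce this, a quick way to sanity-check that the
client-side flags quoted above actually reached the driver is to read them
back from the running session. The sketch below is not from the original
report: it assumes a live pyspark session bound to the usual `spark`
variable and simply echoes the push-based-shuffle keys listed in Han's
command. Note it only shows the driver-side view; the
mergedShuffleFileManagerImpl setting that the YARN shuffle service itself
uses lives in the NodeManager-side configuration, which a check like this
cannot see.

    # Minimal sketch (assumption: an interactive pyspark session where the
    # SparkSession is already bound to the name `spark`). It echoes the
    # push-based shuffle settings passed via --conf so you can confirm what
    # the driver actually received; "<unset>" is only a display default.
    push_shuffle_keys = [
        "spark.shuffle.service.enabled",
        "spark.shuffle.push.enabled",
        "spark.shuffle.push.maxRetainedMergerLocations",
        "spark.shuffle.push.mergersMinThresholdRatio",
        "spark.shuffle.push.mergersMinStaticThreshold",
        "spark.shuffle.push.server.mergedShuffleFileManagerImpl",
    ]

    for key in push_shuffle_keys:
        # RuntimeConfig.get accepts a default, so unset keys do not raise.
        print(key, "=", spark.conf.get(key, "<unset>"))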