FYI: There is a new connected components implementation coming in GraphFrames 0.3.
See: https://github.com/graphframes/graphframes/pull/119
Implementation is based on: https://mmds-data.org/presentations/2014/vassilvitskii_mmds14.pdf

Nick

On Sat, Nov 12, 2016 at 3:01 PM Koert Kuipers <ko...@tresata.com> wrote:

> oh ok i see now its not the same
>
> On Sat, Nov 12, 2016 at 2:48 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
> not sure i see the faster algo in the paper you mention.
>
> i see this in section 6.1.2:
> "In what follows we give a simple labeling algorithm that computes
> connectivity on sparse graphs in O(log N) rounds."
> N here is the size of the graph, not the largest component diameter.
>
> that is the exact same algo as is implemented in graphx i think. or is it not?
>
> On Fri, Nov 11, 2016 at 7:58 PM, Daniel Darabos
> <daniel.dara...@lynxanalytics.com> wrote:
>
> Hi Shreya,
> GraphFrames just calls the GraphX strongly connected components code.
> (https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51)
>
> For choosing the number of iterations: if the number of iterations is less
> than the diameter of the graph, you may get an incorrect result, but running
> for more iterations than that buys you nothing. The algorithm is basically to
> broadcast your ID to all your neighbors in the first round, and then to
> broadcast the smallest ID that you have seen so far in the next rounds. So
> with only 1 round you will get a wrong result unless each vertex is connected
> to the vertex with the lowest ID in that component. (Unlikely in a real
> graph.)
>
> See
> https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
> for the actual implementation.
>
> A better algorithm exists for this problem that only requires O(log(N))
> iterations, where N is the largest component diameter. (It is described in
> "A Model of Computation for MapReduce",
> http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms GraphX's
> implementation immensely. (See the last slide of
> http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
> The large advantage is due to the lower number of necessary iterations.
>
> For why this is failing even with one iteration: I would first check your
> partitioning. Too many or too few partitions could equally cause the issue.
> If you are lucky, there is no overlap between the "too many" and "too few"
> domains :).
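For anyone following along, the labeling scheme Daniel describes maps naturally onto the GraphX Pregel API. The snippet below is only a minimal sketch of that scheme, not the actual GraphX source; the helper name and the tiny example graph are made up for illustration. Every vertex starts labeled with its own ID and repeatedly adopts the smallest label it has seen, which is why it needs roughly as many iterations as the component diameter.

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Edge, EdgeDirection, Graph, VertexId}

// Minimal sketch of min-label propagation (hypothetical helper, not GraphX's code).
def minLabelPropagation(sc: SparkContext, maxIter: Int): Graph[VertexId, Int] = {
  // Tiny made-up graph; in practice the edges would come from your data.
  val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(4L, 5L, 1)))
  val graph = Graph.fromEdges(edges, defaultValue = 0)

  // Each vertex starts out labeled with its own ID.
  val labeled = graph.mapVertices((vid, attr) => vid)

  labeled.pregel(Long.MaxValue, maxIter, EdgeDirection.Either)(
    // Vertex program: keep the smallest label seen so far.
    (vid, label, msg) => math.min(label, msg),
    // Send the smaller endpoint's label to the other side of each edge.
    t =>
      if (t.srcAttr < t.dstAttr) Iterator((t.dstId, t.srcAttr))
      else if (t.dstAttr < t.srcAttr) Iterator((t.srcId, t.dstAttr))
      else Iterator.empty,
    // Combine incoming messages by taking the minimum.
    (a, b) => math.min(a, b))
}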
> On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal <shrey...@microsoft.com> wrote:
>
> Tried GraphFrames. Still faced the same – job died after a few hours. The
> errors I see (and I see tons of them) are –
> (I ran with 3 times the partitions as well, which was 12 times the number of
> executors, but still the same.)
>
> -------------------------------------
>
> ERROR NativeAzureFileSystem: Encountered Storage Exception for write on
> Blob : hdp/spark2-events/application_1478717432179_0021.inprogress
> Exception details: null Error Code : RequestBodyTooLarge
>
> -------------------------------------
>
> 16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests
> outstanding when connection from /10.0.0.95:43301 is closed
> 16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2
> outstanding blocks after 5000 ms
> 16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty
> blocks out of 1500 blocks
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
> java.io.IOException: Connection from /10.0.0.95:43301 closed
>
> -------------------------------------
>
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-a5e138a52360/37/shuffle_1346_21_0.index
> (No such file or directory)
>
> -------------------------------------
>
> org.apache.spark.SparkException: Exception thrown in awaitResult
>   at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>   at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>   at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>   at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>   at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>   at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>   at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>   at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.ConcurrentModificationException
>   at java.util.ArrayList.writeObject(ArrayList.java:766)
>   at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>   at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>   at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>   at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
> -------------------------------------
>
> 16/11/11 13:21:54 WARN Executor: Issue communicating with driver in heartbeater
> org.apache.spark.SparkException: Error sending message [message =
> Heartbeat(537,[Lscala.Tuple2;@2999dae4,BlockManagerId(537, 10.0.0.103, 36162))]
>   at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:119)
>   at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>   at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>   at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>   at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out after
> [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
>   at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>   at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>   at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>   at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>   at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>   at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>   ... 13 more
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>
> *From:* Shreya Agarwal
> *Sent:* Thursday, November 10, 2016 8:16 PM
> *To:* 'Felix Cheung' <felixcheun...@hotmail.com>; user@spark.apache.org
> *Subject:* RE: Strongly Connected Components
>
> Yesterday’s run died sometime during the night, without any errors. Today,
> I am running it using GraphFrames instead. It is still spawning new tasks,
> so there is progress.
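Side note on the heartbeat timeouts quoted above: the error message itself points at spark.executor.heartbeatInterval, and the shuffle-fetch failures are the kind of symptom often addressed by raising spark.network.timeout. Below is only a minimal sketch of setting both when building the session; the property names are standard Spark settings, but the values are illustrative assumptions rather than tuned recommendations, and raising timeouts does not fix whatever is making the executors slow in the first place.

import org.apache.spark.sql.SparkSession

// Sketch only: raise the timeouts that appear in the traces above.
// Values are illustrative; heartbeatInterval should stay well below network.timeout.
val spark = SparkSession.builder()
  .appName("strongly-connected-components")
  .config("spark.executor.heartbeatInterval", "60s") // heartbeat RPC timed out at the 10s default
  .config("spark.network.timeout", "600s")           // default for most network/RPC waits, including shuffle connections
  .getOrCreate()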
> *From:* Felix Cheung [mailto:felixcheun...@hotmail.com <felixcheun...@hotmail.com>]
> *Sent:* Thursday, November 10, 2016 7:50 PM
> *To:* user@spark.apache.org; Shreya Agarwal <shrey...@microsoft.com>
> *Subject:* Re: Strongly Connected Components
>
> It is possible it is dead. Could you check the Spark UI to see if there is
> any progress?
>
> _____________________________
> From: Shreya Agarwal <shrey...@microsoft.com>
> Sent: Thursday, November 10, 2016 12:45 AM
> Subject: RE: Strongly Connected Components
> To: <user@spark.apache.org>
>
> Bump. Anyone? It's been running for 10 hours now. No results.
>
> *From:* Shreya Agarwal
> *Sent:* Tuesday, November 8, 2016 9:05 PM
> *To:* user@spark.apache.org
> *Subject:* Strongly Connected Components
>
> Hi,
>
> I am running this on a graph with >5B edges and >3B vertices and have 2
> questions –
>
> 1. What is the optimal number of iterations?
> 2. I am running it for 1 iteration right now on a beefy 100-node cluster,
> with 300 executors each having 30GB RAM and 5 cores. I have persisted the
> graph to MEMORY_AND_DISK. And it has been running for 3 hours already. Any
> ideas on how to speed this up?
>
> Regards,
>
> Shreya
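For reference on the original question: invoking strongly connected components through GraphFrames looks roughly like the sketch below (Scala API as of GraphFrames 0.2, which delegates to GraphX as Daniel notes above). The tiny vertex and edge DataFrames here are made up for illustration; per the discussion above, maxIter should be at least the graph's diameter or the result may be incorrect, while anything beyond that only adds cost.

import org.apache.spark.sql.SparkSession
import org.graphframes.GraphFrame

// Sketch only: the real vertex/edge DataFrames would come from your data.
val spark = SparkSession.builder().appName("scc-example").getOrCreate()
import spark.implicits._

val vertices = Seq((1L, "a"), (2L, "b"), (3L, "c")).toDF("id", "name")
val edges = Seq((1L, 2L), (2L, 1L), (2L, 3L)).toDF("src", "dst")

val g = GraphFrame(vertices, edges)

// maxIter needs to cover the graph diameter for a correct answer;
// larger values only add wasted iterations.
val components = g.stronglyConnectedComponents.maxIter(10).run()
components.select("id", "component").show()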