FYI: There is a new connected components implementation coming in
GraphFrames 0.3.

See: https://github.com/graphframes/graphframes/pull/119

Implementation is based on:
https://mmds-data.org/presentations/2014/vassilvitskii_mmds14.pdf

Nick

On Sat, Nov 12, 2016 at 3:01 PM Koert Kuipers <ko...@tresata.com> wrote:

> oh ok i see now its not the same
>
> On Sat, Nov 12, 2016 at 2:48 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
> not sure i see the faster algo in the paper you mention.
>
> i see this in section 6.1.2:
> "In what follows we give a simple labeling algorithm that computes
> connectivity  on  sparse  graphs  in O(log N) rounds."
> N here is the size of the graph, not the largest component diameter.
>
> that is the exact same algo as is implemented in graphx i think. or is it
> not?
>
> On Fri, Nov 11, 2016 at 7:58 PM, Daniel Darabos <
> daniel.dara...@lynxanalytics.com> wrote:
>
> Hi Shreya,
> GraphFrames just calls the GraphX strongly connected components code. (
> https://github.com/graphframes/graphframes/blob/release-0.2.0/src/main/scala/org/graphframes/lib/StronglyConnectedComponents.scala#L51
> )
>
> For choosing the number of iterations: If the number of iterations is less
> than the diameter of the graph, you may get an incorrect result. But
> running for more iterations than that buys you nothing. The algorithm is
> basically to broadcast your ID to all your neighbors in the first round,
> and then broadcast the smallest ID that you have seen so far in the next
> rounds. So with only 1 round you will get a wrong result unless each vertex
> is connected to the vertex with the lowest ID in that component. (Unlikely
> in a real graph.)
>
> See
> https://github.com/apache/spark/blob/v2.0.2/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
> for the actual implementation.
>
> A better algorithm exists for this problem that only requires O(log(N))
> iterations when N is the largest component diameter. (It is described in "A
> Model of Computation for MapReduce",
> http://www.sidsuri.com/Publications_files/mrc.pdf.) This outperforms
> GraphX's implementation immensely. (See the last slide of
> http://www.slideshare.net/SparkSummit/interactive-graph-analytics-daniel-darabos#33.)
> The large advantage is due to the lower number of necessary iterations.
>
> For why this is failing even with one iteration: I would first check your
> partitioning. Too many or too few partitions could equally cause the issue.
> If you are lucky, there is no overlap between the "too many" and "too few"
> domains :).
>
> On Fri, Nov 11, 2016 at 7:39 PM, Shreya Agarwal <shrey...@microsoft.com>
> wrote:
>
> Tried GraphFrames. Still faced the same – job died after a few hours . The
> errors I see (And I see tons of them) are –
>
> (I ran with 3 times the partitions as well, which was 12 times number of
> executors , but still the same.)
>
>
>
> -------------------------------------
>
> ERROR NativeAzureFileSystem: Encountered Storage Exception for write on
> Blob : hdp/spark2-events/application_1478717432179_0021.inprogress
> Exception details: null Error Code : RequestBodyTooLarge
>
>
>
> -------------------------------------
>
>
>
> 16/11/11 09:21:46 ERROR TransportResponseHandler: Still have 3 requests
> outstanding when connection from /10.0.0.95:43301 is closed
>
> 16/11/11 09:21:46 INFO RetryingBlockFetcher: Retrying fetch (1/3) for 2
> outstanding blocks after 5000 ms
>
> 16/11/11 09:21:46 INFO ShuffleBlockFetcherIterator: Getting 1500 non-empty
> blocks out of 1500 blocks
>
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.io.IOException: Connection from /10.0.0.95:43301 closed
>
>
>
> -------------------------------------
>
>
>
> 16/11/11 09:21:46 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /mnt/resource/hadoop/yarn/local/usercache/shreyagrssh/appcache/application_1478717432179_0021/blockmgr-b1dde30d-359e-4932-b7a4-a5e138a52360/37/shuffle_1346_21_0.index
> (No such file or directory)
>
>
>
> -------------------------------------
>
>
>
> org.apache.spark.SparkException: Exception thrown in awaitResult
>
>         at
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77)
>
>         at
> org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75)
>
>         at
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>
>         at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>
>         at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>
>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>
>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>
>         at
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>
>         at org.apache.spark.executor.Executor.org
> $apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>
>         at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>
>         at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>
>         at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>
>         at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>
>         at
> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:745)
>
> Caused by: java.util.ConcurrentModificationException
>
>         at java.util.ArrayList.writeObject(ArrayList.java:766)
>
>         at sun.reflect.GeneratedMethodAccessor25.invoke(Unknown Source)
>
>         at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
>         at java.lang.reflect.Method.invoke(Method.java:498)
>
>         at
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>
>         at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>
>         at
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>
>         at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>
>
>
> -------------------------------------
>
>
>
> 16/11/11 13:21:54 WARN Executor: Issue communicating with driver in
> heartbeater
>
> org.apache.spark.SparkException: Error sending message [message =
> Heartbeat(537,[Lscala.Tuple2;@2999dae4,BlockManagerId(537, 10.0.0.103,
> 36162))]
>
>         at
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:119)
>
>         at org.apache.spark.executor.Executor.org
> $apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
>
>         at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
>
>         at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>
>         at
> org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
>
>         at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1857)
>
>         at
> org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
>
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
>         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>
>         at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>
>         at java.lang.Thread.run(Thread.java:745)
>
> Caused by: org.apache.spark.rpc.RpcTimeoutException: Futures timed out
> after [10 seconds]. This timeout is controlled by
> spark.executor.heartbeatInterval
>
>         at org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
>
>         at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>
>         at
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>
>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>
>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
>
>         at
> org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
>
>         ... 13 more
>
> Caused by: java.util.concurrent.TimeoutException: Futures timed out after
> [10 seconds]
>
>         at
> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>
>         at
> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>
>         at
> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
>
>
>
> *From:* Shreya Agarwal
> *Sent:* Thursday, November 10, 2016 8:16 PM
> *To:* 'Felix Cheung' <felixcheun...@hotmail.com>; user@spark.apache.org
> *Subject:* RE: Strongly Connected Components
>
>
>
> Yesterday’s run died sometime during the night, without any errors. Today,
> I am running it using GraphFrames instead. It is still spawning new tasks,
> so there is progress.
>
>
>
> *From:* Felix Cheung [mailto:felixcheun...@hotmail.com
> <felixcheun...@hotmail.com>]
> *Sent:* Thursday, November 10, 2016 7:50 PM
> *To:* user@spark.apache.org; Shreya Agarwal <shrey...@microsoft.com>
> *Subject:* Re: Strongly Connected Components
>
>
>
> It is possible it is dead. Could you check the Spark UI to see if there is
> any progress?
>
>
>
> _____________________________
> From: Shreya Agarwal <shrey...@microsoft.com>
> Sent: Thursday, November 10, 2016 12:45 AM
> Subject: RE: Strongly Connected Components
> To: <user@spark.apache.org>
>
>
> Bump. Anyone? Its been running for 10 hours now. No results.
>
>
>
> *From:* Shreya Agarwal
> *Sent:* Tuesday, November 8, 2016 9:05 PM
> *To:* user@spark.apache.org
> *Subject:* Strongly Connected Components
>
>
>
> Hi,
>
>
>
> I am running this on a graph with >5B edges and >3B edges and have 2
> questions –
>
>
>
>    1. What is the optimal number of iterations?
>    2. I am running it for 1 iteration right now on a beefy 100 node
>    cluster, with 300 executors each having 30GB RAM and 5 cores. I have
>    persisted the graph to MEMORY_AND_DISK. And it has been running for 3 hours
>    already. Any ideas on how to speed this up?
>
>
>
> Regards,
>
> Shreya
>
>
>
>
>
>
>

Reply via email to