Disclaimer: This is more of a design question. I am very new to Spark and HBase. This will be my first project using these two technologies, and for the last two months or so I have mostly been going over different resources to get a grasp on Spark and HBase. My question mainly concerns the performance implications of using one design approach over the other. With my current knowledge I am not able to judge which of the two approaches outlined below is better from a computation/load-balancing point of view. I am seeking experts' comments here; I would appreciate it if you could throw some light on this based on your previous experience with Spark and HBase.
Background: I have some data in HBase. The data is clustered (an application-level clustering concept), i.e. grouped by a cluster id based on application logic. Each cluster has multiple objects, and all objects belonging to one cluster have the same cluster id. From the HBase storage point of view, the row key itself is prefixed with the cluster_id, so I can fetch all objects for a given cluster id via a single scan.

For a given cluster there is a concept of neighbor clusters, which is a directional, i.e. "non-symmetric", relation. Further, any cluster is a neighbor to itself. Here is an example taking 4 clusters C1, C2, C3 and C4:

C1's neighbors are, say, {C1, C2, C3, C4}
C2's neighbors are, say, {C2, C3, C4}
C3's neighbors are, say, {C3}
C4's neighbors are, say, {C4}

I need to do some cross-cluster computation across neighboring clusters. Taking the above example, I need to process these 9 neighbor cluster pairs: (C1, C1), (C1, C2), (C1, C3), (C1, C4), (C2, C2), (C2, C3), (C2, C4), (C3, C3), (C4, C4).

In pseudo code, my processing logic (as a standalone Java program) roughly looks like the following:

void processNeighborClusters() {
    List<String> allClusterIDs = getAllClusterIDs();
    for (String currentClusterId : allClusterIDs) {
        List<Object> currentClusterObjects = loadClusterFromHBase(currentClusterId);
        List<String> neighborClusterIDs = getAllNeighborClusterIDs(currentClusterId);
        for (String neighborClusterID : neighborClusterIDs) {
            List<Object> neighborClusterObjects = loadClusterFromHBase(neighborClusterID);
            processNeighborObjects(currentClusterObjects, neighborClusterObjects);
        }
    }
}

void processNeighborObjects(List<Object> currentClusterObjects, List<Object> neighborClusterObjects) {
    for (Object obj1 : currentClusterObjects) {
        for (Object obj2 : neighborClusterObjects) {
            // do some processing involving obj1 & obj2
        }
    }
}

<Thanks for reading so far, I know the background was quite lengthy!>

My Question: I can think of doing this processing in Spark in 2 ways.

Option_1:
- From the driver program I create an in-memory list of ordered pairs of cluster ids to process. E.g. for the above example I'd have a list of 9 pairs of the form (cluster_id1, cluster_id2), i.e. Pair<String, String>.
- I then parallelize this list to form an RDD. I did some rough calculations and the size of the list would be a few hundred thousand pairs.
- I call foreachPartition on that RDD, passing it the HBase connection information and a method.
- In that method I iterate through all cluster id pairs belonging to that RDD partition. For a given pair (cluster_id1, cluster_id2), I load the corresponding cluster data from HBase (using the HBase API or Phoenix) and do the "processNeighborObjects" computation as depicted in the earlier pseudo code.
- Note: the clusters are by design small (say < 100 MB each), so 2 clusters can be loaded in memory at once. In short, I can appropriately choose the number of executors, cores per executor, memory per executor etc. and avoid a Java OOM exception when loading a cluster within a task thread in an executor.
- I am also thinking that if I use a custom partitioner when parallelizing, so that many pairs of the form (C1, A), (C1, B), (C1, C) etc. go to a single partition, then I could use a Java/Scala caching library to minimize the need to go to HBase multiple times for C1, from the point of view of a single Spark executor.
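To make Option_1 a bit more concrete, here is a rough, untested sketch of what I have in mind (Java; the table name "my_table", the partition count, and the helpers buildNeighborClusterIdPairs/processNeighborObjects are placeholders of mine, not working code):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class Option1Sketch {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("NeighborClusterPairs"));

        // Driver side: the full list of (currentClusterId, neighborClusterId) pairs (a few hundred thousand).
        List<Tuple2<String, String>> clusterIdPairs = buildNeighborClusterIdPairs();
        JavaRDD<Tuple2<String, String>> pairsRdd = sc.parallelize(clusterIdPairs, 200); // placeholder partition count

        pairsRdd.foreachPartition(pairs -> {
            // One HBase connection per partition, reused for every pair in that partition.
            Configuration hbaseConf = HBaseConfiguration.create();
            try (Connection connection = ConnectionFactory.createConnection(hbaseConf);
                 Table table = connection.getTable(TableName.valueOf("my_table"))) { // placeholder table name
                while (pairs.hasNext()) {
                    Tuple2<String, String> pair = pairs.next();
                    List<Result> currentClusterRows = loadCluster(table, pair._1());
                    List<Result> neighborClusterRows = loadCluster(table, pair._2());
                    processNeighborObjects(currentClusterRows, neighborClusterRows);
                }
            }
        });

        sc.stop();
    }

    // Fetch all rows whose row key starts with the given cluster id (single prefix scan).
    private static List<Result> loadCluster(Table table, String clusterId) throws IOException {
        Scan scan = new Scan();
        scan.setRowPrefixFilter(Bytes.toBytes(clusterId));
        List<Result> rows = new ArrayList<>();
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result result : scanner) {
                rows.add(result);
            }
        }
        return rows;
    }

    private static List<Tuple2<String, String>> buildNeighborClusterIdPairs() {
        return new ArrayList<>(); // placeholder: built from application-level neighbor metadata
    }

    private static void processNeighborObjects(List<Result> current, List<Result> neighbor) {
        // placeholder: the nested-loop computation from the pseudo code above
    }
}

For the custom-partitioner idea I would key the pairs by cluster_id1 and call partitionBy on a JavaPairRDD before the foreachPartition, but I have left that out of the sketch.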
Option_2:
- As in Option_1, in the driver program I create an in-memory list of ordered pairs of neighbor cluster ids to process. E.g. for the above example I'd have a list of 9 pairs of the form (cluster_id1, cluster_id2).
- I decide on a driver-side concurrency factor that I'd like to have, say N = 10. This translates to a thread pool of N threads in my driver program.
- I partition the list of cluster id pairs into N partitions, i.e. break the uber list from step 1 into N sub lists where each sub list has an almost equal number of pairs. E.g. taking the previous example of 9 pairs and N = 3, I'd have 3 different lists of cluster pairs: {(C1, C1), (C1, C2), (C1, C3)}, {(C1, C4), (C2, C2), (C2, C3)} and {(C2, C4), (C3, C3), (C4, C4)}.
- I assign these N sub lists to be processed by the N threads of the thread pool.
- Here is how a single thread would process a sub list: iterate through all the pairs in the sub list; for each pair (cluster_id1, cluster_id2), create 2 RDDs using newAPIHadoopRDD corresponding to cluster_id1 and cluster_id2; then create a new RDD which is the Cartesian product of the 2 cluster RDDs; on that Cartesian product RDD, call foreachPartition and do the "processNeighborObjects" computation. (A rough sketch of this per-thread logic is appended at the end of this message.)
- Here I can directly leverage Spark's RDD caching/persistence support, since within a sub list I know whether the same RDD, say the one for C1, is going to be used again in the near future.

Some comparisons:
- Option_1 is a single job from Spark's point of view, whereas Option_2 runs N concurrent jobs, where N is the concurrency factor configured in the driver as a thread pool.
- Option_1 fetches data from HBase directly (using the HBase API/Phoenix), whereas Option_2 creates RDDs off HBase.
- In Option_1 I'd have to use a third-party caching mechanism and a custom partitioner to minimize HBase reads, whereas in Option_2 I can leverage Spark's native caching/persistence support.

If you have read this far, I take a bow! In an ideal world I'd have liked to do 2 separate POCs implementing both solutions and then capture some performance metrics. However, in the "real" world there is that so-called "deadline" thing, and given my lack of previous experience working with Spark and HBase it would take me a while to try out both approaches. Just going by the explanations above and your prior experience, if you could give a pointer as to which of these might be the better approach, I'd be indebted!

Thanks much,
Manas
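P.S. To make Option_2 a bit more concrete, here is a rough, untested sketch of the driver-side thread pool and the per-thread logic I have in mind (Java; the table name "my_table", the MyObject type, the deserialization, and the crude clusterId + "~" stop-row trick are all placeholders of mine, not working code):

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

import scala.Tuple2;

public class Option2Sketch {

    // Placeholder for the application-level object stored in an HBase row.
    public static class MyObject implements Serializable {
    }

    // Driver side: fan the N sub lists out to a fixed-size thread pool (Spark allows
    // submitting jobs from multiple driver threads against the same context).
    static void runAll(JavaSparkContext sc, List<List<Tuple2<String, String>>> subLists, int n) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        for (List<Tuple2<String, String>> subList : subLists) {
            pool.submit(() -> processSubList(sc, subList));
        }
        pool.shutdown();
    }

    // One driver thread processes its sub list of (cluster_id1, cluster_id2) pairs.
    static void processSubList(JavaSparkContext sc, List<Tuple2<String, String>> subList) {
        // Keep already-built cluster RDDs around so a cluster reused by a later pair in this
        // sub list is served from Spark's cache instead of being re-scanned from HBase.
        // (In a real job I would also unpersist RDDs once they are no longer needed.)
        Map<String, JavaRDD<MyObject>> clusterRdds = new HashMap<>();
        for (Tuple2<String, String> pair : subList) {
            JavaRDD<MyObject> current = clusterRdds.computeIfAbsent(pair._1(), id -> clusterRdd(sc, id));
            JavaRDD<MyObject> neighbor = clusterRdds.computeIfAbsent(pair._2(), id -> clusterRdd(sc, id));

            current.cartesian(neighbor).foreachPartition(objectPairs -> {
                while (objectPairs.hasNext()) {
                    Tuple2<MyObject, MyObject> objectPair = objectPairs.next();
                    // placeholder: the "processNeighborObjects" style computation on
                    // objectPair._1() and objectPair._2()
                }
            });
        }
    }

    // Build an RDD over the rows of one cluster by restricting the TableInputFormat scan range.
    static JavaRDD<MyObject> clusterRdd(JavaSparkContext sc, String clusterId) {
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableInputFormat.INPUT_TABLE, "my_table");        // placeholder table name
        conf.set(TableInputFormat.SCAN_ROW_START, clusterId);
        conf.set(TableInputFormat.SCAN_ROW_STOP, clusterId + "~"); // crude prefix upper bound; placeholder
        JavaPairRDD<ImmutableBytesWritable, Result> rows =
                sc.newAPIHadoopRDD(conf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        // Map the raw HBase Results to serializable application objects before any shuffle (cartesian).
        return rows.map(row -> toMyObject(row._2())).persist(StorageLevel.MEMORY_AND_DISK());
    }

    static MyObject toMyObject(Result result) {
        return new MyObject(); // placeholder deserialization
    }
}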