Disclaimer: This is more of a design question. I am very new to Spark and
HBase. This is going to be my first project using these two technologies, and
for the last two months or so I have just been going over different resources
to get a grasp on Spark and HBase. My question is mainly about the performance
implications of using one design approach over the other. With my current
knowledge I am not able to judge which of the two approaches outlined below is
better from a computation/load-balancing point of view. I am seeking the
experts' comments here; I would appreciate it if you could shed some light
based on your previous experience with Spark and HBase.

Background:

I have some data in HBase. The data is clustered (an application-level
clustering concept), i.e. grouped by a cluster id based on application logic.
Each cluster has multiple objects, and all objects belonging to one cluster
have the same cluster id. From the HBase storage point of view, the row key
itself is prefixed with the cluster_id, so I can fetch all objects for a given
cluster id via a single scan.
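
For what it's worth, here is a minimal sketch of what that single-scan load
looks like in my head, using the HBase 1.x client API. The table name
("clusters"), the column layout and the assumption that the row key starts
with the plain cluster id are placeholders for illustration only:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

List<Result> loadClusterFromHBase(Connection connection, String clusterId)
        throws IOException {
    List<Result> objects = new ArrayList<>();
    // "clusters" is a placeholder table name.
    try (Table table = connection.getTable(TableName.valueOf("clusters"))) {
        // Row keys are prefixed with the cluster id, so one prefix scan
        // returns every object belonging to that cluster.
        Scan scan = new Scan();
        scan.setRowPrefixFilter(Bytes.toBytes(clusterId));
        try (ResultScanner scanner = table.getScanner(scan)) {
            for (Result row : scanner) {
                objects.add(row);
            }
        }
    }
    return objects;
}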

For a given cluster there is a concept of neighbor clusters, which is a
directional, i.e. "non-symmetric", relation. Further, any cluster is a
neighbor of itself. Here is an example with 4 clusters C1, C2, C3 and C4:
C1's neighbors are say {C1, C2, C3, C4}
C2's neighbors are say {C2, C3, C4}
C3's neighbors are say {C3}
C4's neighbors are say {C4}

I need to do some cross-cluster computation across neighboring clusters.
Taking the above example, I need to process these 9 neighbor cluster pairs:
(C1, C1), (C1, C2), (C1, C3), (C1, C4), 
(C2, C2), (C2, C3), (C2, C4),
(C3, C3)
(C4, C4)

In pseudo-code, my processing logic (as a standalone Java program) roughly
looks like the following:

void processNeighborClusters() {
    List<String> allClusterIDs = getAllClusterIDs();
    for (String currentClusterId : allClusterIDs) {
        // Load every object belonging to the current cluster (one HBase scan).
        List<Object> currentClusterObjects = loadClusterFromHBase(currentClusterId);

        List<String> neighborClusterIDs = getAllNeighborClusterIDs(currentClusterId);
        for (String neighborClusterID : neighborClusterIDs) {
            // Load every object belonging to the neighbor cluster.
            List<Object> neighborClusterObjects = loadClusterFromHBase(neighborClusterID);

            processNeighborObjects(currentClusterObjects, neighborClusterObjects);
        }
    }
}

void processNeighborObjects(List<Object> currentClusterObjects,
                            List<Object> neighborClusterObjects) {
    // Pairwise computation across the two clusters.
    for (Object obj1 : currentClusterObjects) {
        for (Object obj2 : neighborClusterObjects) {
            // do some processing involving obj1 & obj2
        }
    }
}

<Thanks for reading so far, I know the background seemed quite lengthy!>

My Question:

I can think of two ways of doing this processing in Spark.

Option_1:
- From the driver program I create an in-memory list of ordered pairs of
cluster ids to process. For the above example I'd have a list of 9 pairs of
the form (cluster_id1, cluster_id2), i.e. Pair<String, String>.
- I then parallelize this list to form an RDD. From some rough calculations,
the size of the list would be a few hundred thousand pairs.
- I call foreachPartition on that RDD, passing in the HBase connection
information and a method (a rough sketch follows this list).
- In that method I iterate through all cluster id pairs belonging to that RDD
partition. For a given pair (cluster_id1, cluster_id2) of cluster ids, I load
the corresponding cluster data from HBase (using the HBase API or Phoenix) and
do the "processNeighborObjects" computation as depicted in the earlier
pseudo-code.
- Note: The clusters are by design small (say < 100 MB each), so two clusters
can be loaded in memory at once. In short, I can choose the number of
executors, cores per executor, memory per executor, etc. appropriately and
avoid a Java OOM exception when loading a cluster within a task thread in an
executor.
- I am also thinking that if I use a custom partitioner when parallelizing, so
that many pairs of the form (C1, A), (C1, B), (C1, C), etc. go to a single
partition, then I could use some Java/Scala caching library to minimize the
need to go to HBase multiple times for C1 from a single Spark executor's point
of view.
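
To make Option_1 a bit more concrete, here is a rough sketch using Spark's
Java API. The pair list, the ZooKeeper quorum string and the
loadClusterFromHBase(...) prefix-scan helper (sketched in the Background
section) are assumptions for illustration, not a final implementation:

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

void runOption1(JavaSparkContext sc, List<Tuple2<String, String>> clusterIdPairs) {
    // Driver side: parallelize the ordered (cluster_id1, cluster_id2) pairs.
    // A custom partitioner could be plugged in here to co-locate pairs that
    // share cluster_id1, as mentioned in the last bullet above.
    JavaRDD<Tuple2<String, String>> pairsRdd = sc.parallelize(clusterIdPairs);

    pairsRdd.foreachPartition(pairs -> {
        // Executor side: open one HBase connection per partition and reuse it.
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "zk-host:2181"); // placeholder
        try (Connection connection = ConnectionFactory.createConnection(conf)) {
            while (pairs.hasNext()) {
                Tuple2<String, String> pair = pairs.next();
                // loadClusterFromHBase is assumed to be the static prefix-scan
                // helper sketched earlier, so the closure stays serializable.
                List<Result> currentObjects = loadClusterFromHBase(connection, pair._1());
                List<Result> neighborObjects = loadClusterFromHBase(connection, pair._2());
                // "processNeighborObjects"-style pairwise computation:
                for (Result obj1 : currentObjects) {
                    for (Result obj2 : neighborObjects) {
                        // do some processing involving obj1 & obj2
                    }
                }
            }
        }
    });
}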

Option_2:
- As in Option_1, in the driver program I create an in-memory list of ordered
pairs of neighbor cluster ids to process. For the above example I'd have a
list of 9 pairs of the form (cluster_id1, cluster_id2).
- I decide on a driver-side concurrency factor, say N = 10. This translates to
a thread pool of N threads in my driver program.
- I partition the list of cluster id pairs into N partitions, i.e. break the
uber list from step 1 into N sub-lists, each with an almost equal number of
pairs. E.g. taking the previous example of 9 pairs and N = 3, I'd have 3
different lists of cluster pairs: {(C1, C1), (C1, C2), (C1, C3)}, {(C1, C4),
(C2, C2), (C2, C3)} and {(C2, C4), (C3, C3), (C4, C4)}.
- I assign these N sub-lists to be processed by the N threads of the thread
pool.
- Here is how a single thread would process a sub-list. Iterate through all
the pairs in the sub-list. For each pair (cluster_id1, cluster_id2), create
two RDDs using newAPIHadoopRDD, corresponding to cluster_id1 and cluster_id2.
Then create a new RDD which is the Cartesian product of the two cluster RDDs.
On that Cartesian product RDD I call foreachPartition and do the
"processNeighborObjects" computation (a rough sketch follows this list).
- Here I can directly leverage Spark's native RDD caching/persistence support,
since within a sub-list I'd know whether the same RDD, say for C1, is going to
be used again in the near future.
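
And to make Option_2 a bit more concrete, here is a rough sketch of how a
single (cluster_id1, cluster_id2) pair might be handled: two RDDs built with
newAPIHadoopRDD over TableInputFormat, followed by a Cartesian product. The
table name, the crude prefix-to-stop-row trick, and the helper names
(clusterRdd, processPair) are assumptions made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

JavaPairRDD<ImmutableBytesWritable, Result> clusterRdd(JavaSparkContext sc, String clusterId) {
    Configuration conf = HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE, "clusters");        // placeholder table name
    conf.set(TableInputFormat.SCAN_ROW_START, clusterId);      // prefix range start
    conf.set(TableInputFormat.SCAN_ROW_STOP, clusterId + "~"); // crude prefix range stop
    return sc.newAPIHadoopRDD(conf, TableInputFormat.class,
            ImmutableBytesWritable.class, Result.class);
}

void processPair(JavaSparkContext sc, String clusterId1, String clusterId2) {
    // Persist the first cluster's RDD since it may be reused across several
    // neighbor pairs within the same sub-list.
    JavaPairRDD<ImmutableBytesWritable, Result> rdd1 =
            clusterRdd(sc, clusterId1).persist(StorageLevel.MEMORY_ONLY());
    JavaPairRDD<ImmutableBytesWritable, Result> rdd2 = clusterRdd(sc, clusterId2);

    // Cartesian product pairs every object of cluster 1 with every object of
    // cluster 2. Note: Result/ImmutableBytesWritable are not Serializable, so
    // in practice each RDD would probably need to be mapped to a serializable
    // domain object before the Cartesian product.
    rdd1.cartesian(rdd2).foreachPartition(pairs -> {
        while (pairs.hasNext()) {
            Tuple2<Tuple2<ImmutableBytesWritable, Result>,
                   Tuple2<ImmutableBytesWritable, Result>> p = pairs.next();
            // do some processing involving p._1() (obj1) and p._2() (obj2)
        }
    });
}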

Some comparisons:
- Option_1 is a single job from the Spark point of view, whereas Option_2 runs
N concurrent jobs, where N is the concurrency factor configured in the driver
as a thread pool.
- Option_1 fetches data directly from HBase (using the HBase API/Phoenix),
whereas Option_2 creates RDDs off HBase.
- In Option_1 I'd have to use a third-party caching mechanism and a custom
partitioner to minimize HBase reads, whereas in Option_2 I can leverage
Spark's native caching/persistence support.

If you have read this far, I take a bow! In an ideal world I'd do two separate
POCs implementing both solutions and then capture some performance metrics.
However, in the "real" world there is that so-called "deadline" thing, and
having no previous experience with Spark and HBase it would take me a while to
try out both approaches. If, just by going through the explanations and based
on your prior experience, you could give a pointer as to which of these might
be the better approach, I'd be indebted!

Thanks much,
Manas




