What you're saying is that, due to the intensity of the query, you need to run a single query and partition the results, versus running one query for each partition.

I assume it's not viable to throw the query results into another table in your database and then query that using the normal approach?
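
Something like the following is what I have in mind -- a rough sketch only; the staging table, row-number column, and connection details are all made up, and you'd want to check what Vertica actually supports:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

def stageAndLoad(sc: SparkContext, url: String, numPartitions: Int): JdbcRDD[Array[Object]] = {
  // Run the expensive query exactly once, materializing it into a staging
  // table along with a dense row number we can range-partition on.
  val conn = DriverManager.getConnection(url)
  try {
    conn.createStatement().execute(
      """CREATE TABLE staging_results AS
        |SELECT ROW_NUMBER() OVER () AS rn, q.*
        |FROM (SELECT 1 AS val /* stand-in for the expensive query */) q""".stripMargin)
  } finally {
    conn.close()
  }

  // Workers then pull disjoint slices of the staged rows in parallel --
  // the "normal" JdbcRDD approach, just against the staging table.
  new JdbcRDD(
    sc,
    () => DriverManager.getConnection(url),
    "SELECT * FROM staging_results WHERE rn >= ? AND rn <= ?",
    1L,          // lowerBound
    10000000L,   // upperBound: the staged result's total row count
    numPartitions,
    (rs: ResultSet) => JdbcRDD.resultSetToObjectArray(rs))
}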

--eric

On 3/1/15 4:28 AM, michal.klo...@gmail.com wrote:
Jorn: Vertica

Cody: I posited the LIMIT just as an example of how JdbcRDD could be used least invasively. Let's say we partitioned on a time field -- we would still need N executions of those queries. The queries we have are very intense, and concurrency is an issue even if the N partitioned queries are smaller. Some queries require evaluating the whole data set first. If our use case were a simple select * from table, then the partitions would be an easier sell if it weren't for the concurrency problem :) Long story short -- we need only one execution of the query and would like to just divvy out the result set.

M



On Mar 1, 2015, at 5:18 AM, Jörn Franke <jornfra...@gmail.com> wrote:

What database are you using?

On Feb 28, 2015, at 18:15, "Michal Klos" <michal.klo...@gmail.com> wrote:

    Hi Spark community,

    We have a use case where we need to pull huge amounts of data
    from a SQL query against a database into Spark. We need to
    execute the query against our huge database and not a substitute
    (SparkSQL, Hive, etc) because of a couple of factors including
    custom functions used in the queries that only our database has.

    We started by looking at JDBC RDD, which utilizes a prepared
    statement with two parameters that are meant to be used to
    partition the result set to the workers... e.g.:

    select * from table limit ?,?

    turns into

    select * from table limit 1,100 on worker 1
    select * from table limit 101,200 on worker 2
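
    (For concreteness, a minimal sketch of the stock JdbcRDD usage we looked at -- sc is our SparkContext, and the connection URL and row mapper are just illustrative:)

    import java.sql.{DriverManager, ResultSet}
    import org.apache.spark.rdd.JdbcRDD

    // Stock JdbcRDD: the [lowerBound, upperBound] range is split into
    // numPartitions sub-ranges, and each task binds its sub-range into the
    // two '?' placeholders and runs its own copy of the query -- i.e. N
    // separate executions against the database.
    val rdd = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:vertica://host:5433/db"), // illustrative
      "select * from table limit ?,?",
      1L,   // lowerBound
      200L, // upperBound
      2,    // numPartitions => 2 concurrent query executions
      (rs: ResultSet) => rs.getString(1)) // illustrative row mapper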

    This will not work for us because our database cannot support
    multiple executions of these queries without being crippled.
    Additionally, our database doesn't support the above LIMIT syntax,
    and we don't have a generic way of partitioning the various queries.

    As a result -- we started by forking JDBCRDD and made a version
    that executes the SQL query once in getPartitions into a Vector
    and then hands each worker node an index and an iterator. Here's a
    snippet of getPartitions and compute:

    override def getPartitions: Array[Partition] = {
      // Compute the DB query once here
      val results = computeQuery
      (0 until numPartitions).map(i => {
        // TODO: would be better to do this partitioning while scrolling
        // through the result set, if still loading into memory
        val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
        new DBPartition(i, partitionItems)
      }).toArray
    }

    override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
      val part = thePart.asInstanceOf[DBPartition[T]]
      // Shift the result vector to our index number and then do a
      // sliding iterator over it
      val iterator = part.items.iterator
      override def getNext(): T = {
        if (iterator.hasNext) {
          iterator.next()
        } else {
          finished = true
          null.asInstanceOf[T]
        }
      }
      override def close(): Unit = ()
    }
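
    (DBPartition isn't shown above -- it's just a simple Partition holding the slice of the materialized results assigned to its split, roughly:)

    // Roughly the shape of the DBPartition used above: a plain Partition
    // carrying this split's rows.
    class DBPartition[T](idx: Int, val items: Vector[T]) extends Partition {
      override def index: Int = idx
    }
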
    This is a little better since we can just execute the query once.
    However, the result set still needs to fit in memory.

    We've been trying to brainstorm a way to:
    A) have that result set distributed out to the worker RDD
    partitions as it streams in from the cursor?
    B) have the result set spill to disk if it exceeds memory and do
    something clever around the iterators?
    C) something else?

    We're not familiar enough yet with all of the workings of Spark
    to know how to proceed on this.

    We also thought of the work-around of having the DB query dump
    to HDFS/S3 and then picking it up from there, but it adds more
    moving parts and latency to our processing.

    Does anyone have a clever suggestion? Are we missing something?

    thanks,
    Michal

