Jorn: Vertica 

Cody: I posited the LIMIT just as an example of how JdbcRDD could be used 
least invasively. Let's say we partitioned on a time field -- we would still 
need N executions of those queries. The queries we have are very intense, and 
concurrency is an issue even if the N partitioned queries are smaller. Some 
queries require evaluating the whole data set first. If our use case were a 
simple select * from table, then the partitions would be an easier sell, if it 
weren't for the concurrency problem :) Long story short -- we need only one 
execution of the query and would like to just divvy out the result set.
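
To make the N-executions point concrete, here's roughly what the 
time-partitioned JdbcRDD approach would look like (a minimal sketch -- the 
connection URL, table, column, and bounds are made up, and sc is the 
SparkContext). Each of the numPartitions tasks runs the query once:

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.rdd.JdbcRDD

// Hypothetical time-partitioned query: Spark fills the two ?'s with a
// sub-range of [lowerBound, upperBound] for each task, so the expensive
// query still executes numPartitions times against the database.
val rdd = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:vertica://host:5433/db", "user", "pass"),
  "SELECT * FROM events WHERE ? <= event_epoch AND event_epoch <= ?",
  1420070400L,  // lowerBound: 2015-01-01 as epoch seconds
  1425168000L,  // upperBound: 2015-03-01 as epoch seconds
  8,            // numPartitions -> 8 separate executions of the query
  (rs: ResultSet) => rs.getString(1))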

M



> On Mar 1, 2015, at 5:18 AM, Jörn Franke <jornfra...@gmail.com> wrote:
> 
> What database are you using?
> 
> On Feb 28, 2015, at 6:15 PM, "Michal Klos" <michal.klo...@gmail.com> wrote:
>> Hi Spark community,
>> 
>> We have a use case where we need to pull huge amounts of data from a SQL 
>> query against a database into Spark. We need to execute the query against 
>> our database and not a substitute (Spark SQL, Hive, etc.) because of a 
>> couple of factors, including custom functions used in the queries that only 
>> our database supports.
>> 
>> We started by looking at JdbcRDD, which uses a prepared statement with two 
>> parameters that are meant to partition the result set across the workers, 
>> e.g.:
>> 
>> select * from table limit ?,?
>> 
>> turns into
>> 
>> select * from table limit 1,100 on worker 1
>> select * from table limit 101,200 on worker 2
>> 
>> This will not work for us because our database cannot support multiple 
>> executions of these queries without being crippled. Additionally, our 
>> database doesn't support the above LIMIT syntax, and we don't have a generic 
>> way of partitioning the various queries.
>> 
>> As a result -- we started by forking JdbcRDD and made a version that 
>> executes the SQL query once in getPartitions, loads the results into a 
>> Vector, and then hands each worker node an index and an iterator. Here's a 
>> snippet of getPartitions and compute:
>> 
>>   override def getPartitions: Array[Partition] = {
>>     // Compute the DB query once here
>>     val results = computeQuery
>> 
>>     (0 until numPartitions).map(i => {
>>       // TODO: would be better to do this partitioning while scrolling
>>       // through the result set, if still loading into memory
>>       val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>>       new DBPartition(i, partitionItems)
>>     }).toArray
>>   }
>> 
>>   override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
>>     val part = thePart.asInstanceOf[DBPartition[T]]
>> 
>>     // The partition already holds its slice of the result vector; just iterate over it
>>     val iterator = part.items.iterator
>> 
>>     override def getNext: T = {
>>       if (iterator.hasNext) {
>>         iterator.next()
>>       } else {
>>         finished = true
>>         null.asInstanceOf[T]
>>       }
>>     }
>> 
>>     override def close(): Unit = ()
>>   }
>> 
>> This is a little better, since we execute the query just once. However, 
>> the result set needs to fit in memory.
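>> 
>> As an aside on the TODO above -- here's a minimal sketch of what dealing the 
>> rows round-robin into per-partition buffers while scrolling the cursor could 
>> look like, instead of materializing one big Vector and then slicing it. The 
>> helper name and connection URL are made up, and everything still has to fit 
>> in driver memory; it only avoids the extra drop/sliding pass:
>> 
>> import java.sql.{DriverManager, ResultSet}
>> import scala.collection.mutable.ArrayBuffer
>> 
>> // Hypothetical replacement for computeQuery plus the drop/sliding step:
>> // scroll the ResultSet once and deal each row into its partition's buffer
>> // (row j goes to partition j % numPartitions, same layout as above).
>> def partitionedResults[T](sql: String, numPartitions: Int)(mapRow: ResultSet => T): Array[Vector[T]] = {
>>   val conn = DriverManager.getConnection("jdbc:vertica://host:5433/db", "user", "pass")
>>   try {
>>     val rs = conn.createStatement().executeQuery(sql)
>>     val buffers = Array.fill(numPartitions)(new ArrayBuffer[T])
>>     var j = 0
>>     while (rs.next()) {
>>       buffers(j % numPartitions) += mapRow(rs)
>>       j += 1
>>     }
>>     buffers.map(_.toVector)
>>   } finally {
>>     conn.close()
>>   }
>> }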
>> 
>> We've been trying to brainstorm a way to:
>> 
>> A) have the result set distributed out to the worker RDD partitions as it's 
>> streaming in from the cursor?
>> B) have the result set spill to disk if it exceeds memory, and do something 
>> clever around the iterators?
>> C) something else?
>> 
>> We're not familiar enough yet with all of the workings of Spark to know how 
>> to proceed on this.
>> 
>> We also thought of the workaround of having the DB query dump to HDFS/S3 
>> and then picking it up from there, but that adds more moving parts and 
>> latency to our processing.
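>> 
>> (For what it's worth, the pickup side of that workaround would be roughly 
>> the following -- the path and delimiter are placeholders, and the export 
>> step itself is database-specific:)
>> 
>> // Assuming the query result was exported as pipe-delimited text to S3
>> // (the path below is made up):
>> val rows = sc.textFile("s3n://our-bucket/query-dumps/2015-03-01/*")
>>   .map(_.split('|'))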
>> 
>> Does anyone have a clever suggestion? Are we missing something? 
>> 
>> thanks,
>> Michal
