Jorn: Vertica

Cody: I posited the LIMIT just as an example of how JdbcRDD could be used least invasively. Let's say we partitioned on a time field -- we would still need N executions of those queries. The queries we have are very intense, and concurrency is an issue even if the N partitioned queries are smaller. Some queries require evaluating the whole data set first. If our use case were a simple select * from table, then the partitions would be an easier sell, if it weren't for the concurrency problem :) Long story short -- we need only one execution of the query, and we would like to just divvy out the result set.
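To make "one execution, divvy out the result set" concrete, here's roughly the shape of what we're after -- a minimal sketch, not our real code (the connection string, query, and column types are made up), and it still materializes everything on the driver, which is exactly the limitation we're trying to get past:

  import java.sql.DriverManager
  import org.apache.spark.SparkContext
  import org.apache.spark.rdd.RDD

  // Run the query exactly once on the driver, then divvy the rows out
  // to the cluster with parallelize. (Connection string and query are
  // made up; the rows still have to fit in driver memory, like our fork.)
  def divvyQuery(sc: SparkContext, numPartitions: Int): RDD[(Long, String)] = {
    val conn = DriverManager.getConnection("jdbc:vertica://host:5433/db", "user", "pass")
    try {
      val rs = conn.createStatement().executeQuery("select id, payload from big_table")
      val rows = Iterator.continually(rs)
        .takeWhile(_.next())                          // advance the cursor once per row
        .map(r => (r.getLong(1), r.getString(2)))     // extract the current row
        .toVector
      sc.parallelize(rows, numPartitions)
    } finally {
      conn.close()
    }
  }

The parallelize step is the part we'd like Spark to do incrementally as rows stream off the cursor, rather than from an in-memory Vector.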
M

> On Mar 1, 2015, at 5:18 AM, Jörn Franke <jornfra...@gmail.com> wrote:
>
> What database are you using?
>
> On Feb 28, 2015, at 18:15, "Michal Klos" <michal.klo...@gmail.com> wrote:
>
>> Hi Spark community,
>>
>> We have a use case where we need to pull huge amounts of data from a SQL
>> query against a database into Spark. We need to execute the query against
>> our huge database and not a substitute (SparkSQL, Hive, etc.) because of a
>> couple of factors, including custom functions used in the queries that only
>> our database has.
>>
>> We started by looking at JdbcRDD, which utilizes a prepared statement with
>> two parameters that are meant to be used to partition the result set across
>> the workers, e.g.:
>>
>> select * from table limit ?,?
>>
>> turns into
>>
>> select * from table limit 1,100 on worker 1
>> select * from table limit 101,200 on worker 2
>>
>> This will not work for us because our database cannot support multiple
>> executions of these queries without being crippled. Additionally, our
>> database doesn't support the above LIMIT syntax, and we don't have a generic
>> way of partitioning the various queries.
>>
>> As a result, we started by forking JdbcRDD and made a version that executes
>> the SQL query once in getPartitions into a Vector and then hands each worker
>> node an index and an iterator. Here's a snippet of getPartitions and compute:
>>
>> override def getPartitions: Array[Partition] = {
>>   // Compute the DB query once here
>>   val results = computeQuery
>>
>>   (0 until numPartitions).map(i => {
>>     // TODO: would be better to do this partitioning while scrolling
>>     // through the result set, if still loading into memory
>>     val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
>>     new DBPartition(i, partitionItems)
>>   }).toArray
>> }
>>
>> override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
>>   val part = thePart.asInstanceOf[DBPartition[T]]
>>
>>   // The rows for this partition were already selected by the stride in
>>   // getPartitions; just iterate over them
>>   val iterator = part.items.iterator
>>
>>   override def getNext: T = {
>>     if (iterator.hasNext) {
>>       iterator.next()
>>     } else {
>>       finished = true
>>       null.asInstanceOf[T]
>>     }
>>   }
>>
>>   override def close(): Unit = ()
>> }
>>
>> This is a little better, since we can execute the query just once. However,
>> the result set needs to fit in memory.
>>
>> We've been trying to brainstorm a way to:
>>
>> A) have that result set distributed out to the worker RDD partitions as it's
>> streaming in from the cursor?
>> B) have the result set spill to disk if it exceeds memory and do something
>> clever around the iterators?
>> C) something else?
>>
>> We're not familiar enough yet with all of the workings of Spark to know how
>> to proceed on this.
>>
>> We also thought of the work-around of having the DB query dump to HDFS/S3
>> and then picking it up from there, but it adds more moving parts and latency
>> to our processing.
>>
>> Does anyone have a clever suggestion? Are we missing something?
>>
>> thanks,
>> Michal
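P.S. For anyone puzzling over the stride in the getPartitions snippet above, a tiny standalone illustration (the row values are made up):

  // Each partition i takes every numPartitions-th row, starting at offset i,
  // so the rows are dealt out round-robin across partitions.
  val results = Vector("r0", "r1", "r2", "r3", "r4", "r5", "r6")
  val numPartitions = 3

  val parts = (0 until numPartitions).map { i =>
    results.drop(i).sliding(1, numPartitions).flatten.toVector
  }
  // parts == Vector(Vector(r0, r3, r6), Vector(r1, r4), Vector(r2, r5))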