What database are you using?

On Feb 28, 2015, 18:15, "Michal Klos" <michal.klo...@gmail.com> wrote:
> Hi Spark community,
>
> We have a use case where we need to pull huge amounts of data from a SQL
> query against a database into Spark. We need to execute the query against
> our huge database and not a substitute (Spark SQL, Hive, etc.) because of a
> couple of factors, including custom functions used in the queries that only
> our database has.
>
> We started by looking at JdbcRDD, which uses a prepared statement with two
> parameters that are meant to partition the result set across the workers,
> e.g.:
>
>     select * from table limit ?,?
>
> turns into
>
>     select * from table limit 1,100      on worker 1
>     select * from table limit 101,200    on worker 2
>
> This will not work for us because our database cannot support multiple
> executions of these queries without being crippled. Additionally, our
> database doesn't support the above LIMIT syntax, and we don't have a
> generic way of partitioning the various queries.
>
> As a result, we started by forking JdbcRDD and made a version that executes
> the SQL query once in getPartitions, loads the result into a Vector, and
> then hands each partition an index and an iterator. Here's a snippet of
> getPartitions and compute:
>
>     override def getPartitions: Array[Partition] = {
>       // Compute the DB query once here
>       val results = computeQuery
>
>       (0 until numPartitions).map(i => {
>         // TODO: would be better to do this partitioning while scrolling
>         // through the result set, if we keep loading it into memory
>         val partitionItems =
>           results.drop(i).sliding(1, numPartitions).flatten.toVector
>         new DBPartition(i, partitionItems)
>       }).toArray
>     }
>
>     override def compute(thePart: Partition, context: TaskContext) =
>       new NextIterator[T] {
>         val part = thePart.asInstanceOf[DBPartition[T]]
>
>         // The result vector was already strided by partition index in
>         // getPartitions, so here we just iterate over this partition's slice
>         val iterator = part.items.iterator
>
>         override def getNext(): T = {
>           if (iterator.hasNext) {
>             iterator.next()
>           } else {
>             finished = true
>             null.asInstanceOf[T]
>           }
>         }
>
>         override def close(): Unit = ()
>       }
>
> This is a little better since we execute the query only once. However, the
> entire result set needs to fit in memory, since it is materialized in
> getPartitions on the driver.
>
> We've been trying to brainstorm a way to:
>
> A) have the result set distributed out to the worker RDD partitions as it
>    streams in from the cursor, or
> B) have the result set spill to disk if it exceeds memory and do something
>    clever with the iterators, or
> C) something else?
>
> We're not familiar enough yet with all of the workings of Spark to know how
> to proceed on this.
>
> We also thought of the work-around of having the DB query dump to HDFS/S3
> and then picking it up from there, but that adds more moving parts and
> latency to our processing.
>
> Does anyone have a clever suggestion? Are we missing something?
>
> thanks,
> Michal
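
For reference, here is a minimal sketch of how the stock JdbcRDD described
above is normally driven. The connection URL, credentials, table, and id
column are hypothetical; the point is the bound-splitting, which is what
produces the one-query-execution-per-partition behaviour that is a problem
for your database:

import java.sql.DriverManager

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD

object JdbcRddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("jdbc-rdd-sketch"))

    // The SQL must contain exactly two '?' placeholders. JdbcRDD splits the
    // [lowerBound, upperBound] range into numPartitions sub-ranges and binds
    // one sub-range into the placeholders per partition, so the query runs
    // once per partition -- the behaviour described in the message above.
    val rows = new JdbcRDD(
      sc,
      () => DriverManager.getConnection(
        "jdbc:mysql://dbhost/db", "user", "pass"),   // hypothetical connection details
      "SELECT id, payload FROM some_table WHERE id >= ? AND id <= ?", // hypothetical table/columns
      1L,        // lowerBound
      1000000L,  // upperBound
      10,        // numPartitions => 10 separate executions of the query
      rs => (rs.getLong("id"), rs.getString("payload"))  // mapRow
    )

    println(rows.count())
    sc.stop()
  }
}

That per-partition re-execution is exactly what doesn't fit a single
expensive query with custom functions, which is why the fork above
materializes the result once in getPartitions instead.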