Cody, you were right, I had a copy and paste snag where I ended up with a
vanilla SparkContext rather than a Java one.  I also had to *not* use my
function subclasses, rather just use anonymous inner classes for the
Function stuff and that got things working. I'm fully following
the JdbcRDD.create approach from JavaJdbcRDDSuite.java basically verbatim.

Is there a clean way to refactor out the custom Function classes such as
the one for getting a db connection or mapping ResultSet data to your own
POJO's rather than doing it all inline?


On Wed, Feb 18, 2015 at 1:52 PM, Cody Koeninger <[email protected]> wrote:

> Is sc there a SparkContext or a JavaSparkContext?  The compilation error
> seems to indicate the former, but JdbcRDD.create expects the latter
>
> On Wed, Feb 18, 2015 at 12:30 PM, Dmitry Goldenberg <
> [email protected]> wrote:
>
>> I have tried that as well, I get a compile error --
>>
>> [ERROR] ...SparkProto.java:[105,39] error: no suitable method found for
>> create(SparkContext,<anonymous
>> ConnectionFactory>,String,int,int,int,<anonymous
>> Function<ResultSet,Integer>>)
>>
>> The code is a copy and paste:
>>
>>     JavaRDD<Integer> jdbcRDD = JdbcRDD.create(
>>           sc,
>>           new JdbcRDD.ConnectionFactory() {
>>             public Connection getConnection() throws SQLException {
>>               return
>> DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
>>             }
>>           },
>>           "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
>>           1, 100, 1,
>>           new Function<ResultSet, Integer>() {
>>             public Integer call(ResultSet r) throws Exception {
>>               return r.getInt(1);
>>             }
>>           }
>>         );
>>
>> The other thing I've tried was to define a static class locally for
>> GetConnection and use the JdbcCreate constructor. This got around the
>> compile issues but blew up at runtime with "NoClassDefFoundError:
>> scala/runtime/AbstractFunction0" !
>>
>> JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
>> sc,
>> (AbstractFunction0<Connection>) new DbConn(), // had to cast or a compile
>> error
>> SQL_QUERY,
>> 0L,
>> 1000L,
>> 10,
>> new MapRow(),
>> ROW_CLASS_TAG);
>> // DbConn is defined as public static class DbConn extends
>> AbstractFunction0<Connection> implements Serializable
>>
>> On Wed, Feb 18, 2015 at 1:20 PM, Cody Koeninger <[email protected]>
>> wrote:
>>
>>> That test I linked
>>>
>>>
>>> https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java#L90
>>>
>>> is calling a static method JdbcRDD.create, not new JdbcRDD.  Is that
>>> what you tried doing?
>>>
>>> On Wed, Feb 18, 2015 at 12:00 PM, Dmitry Goldenberg <
>>> [email protected]> wrote:
>>>
>>>> Thanks, Cody. Yes, I originally started off by looking at that but I
>>>> get a compile error if I try and use that approach: constructor JdbcRDD in
>>>> class JdbcRDD<T> cannot be applied to given types.  Not to mention that
>>>> JavaJdbcRDDSuite somehow manages to not pass in the class tag (the last
>>>> argument).
>>>>
>>>> Wonder if it's a JDK version issue, I'm using 1.7.
>>>>
>>>> So I've got this, which doesn't compile
>>>>
>>>> JdbcRDD<Row> jdbcRDD = new JdbcRDD<Row>(
>>>> new SparkContext(conf),
>>>> new JdbcRDD.ConnectionFactory() {
>>>> public Connection getConnection() throws SQLException {
>>>> Connection conn = null;
>>>> try {
>>>> Class.forName(JDBC_DRIVER);
>>>> conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
>>>> } catch (ClassNotFoundException ex) {
>>>> throw new RuntimeException("Error while loading JDBC driver.", ex);
>>>> }
>>>> return conn;
>>>> }
>>>> },
>>>> "SELECT * FROM EMPLOYEES",
>>>> 0L,
>>>> 1000L,
>>>> 10,
>>>> new Function<ResultSet, Row>() {
>>>> public Row call(ResultSet r) throws Exception {
>>>> return null; // have some actual logic here...
>>>> }
>>>> },
>>>> scala.reflect.ClassManifestFactory$.MODULE$.fromClass(Row.class));
>>>>
>>>> The other approach was mimicing the DbConnection class from this post:
>>>> http://www.sparkexpert.com/2015/01/02/load-database-data-into-spark-using-jdbcrdd-in-java/.
>>>> It got around any of the compilation issues but then I got the runtime
>>>> error where Spark wouldn't recognize the db connection class as a
>>>> scala.Function0.
>>>>
>>>>
>>>>
>>>> On Wed, Feb 18, 2015 at 12:37 PM, Cody Koeninger <[email protected]>
>>>> wrote:
>>>>
>>>>> Take a look at
>>>>>
>>>>>
>>>>> https://github.com/apache/spark/blob/v1.2.1/core/src/test/java/org/apache/spark/JavaJdbcRDDSuite.java
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Feb 18, 2015 at 11:14 AM, dgoldenberg <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> I'm reading data from a database using JdbcRDD, in Java, and I have an
>>>>>> implementation of Function0<Connection> whose instance I supply as the
>>>>>> 'getConnection' parameter into the JdbcRDD constructor. Compiles fine.
>>>>>>
>>>>>> The definition of the class/function is as follows:
>>>>>>
>>>>>>   public class GetDbConnection extends AbstractFunction0<Connection>
>>>>>> implements Serializable
>>>>>>
>>>>>> where scala.runtime.AbstractFunction0 extends scala.Function0.
>>>>>>
>>>>>> At runtime, I get an exception as below. Does anyone have an idea as
>>>>>> to how
>>>>>> to resolve this/work around it? Thanks.
>>>>>>
>>>>>> I'm running Spark 1.2.1 built for Hadoop 2.4.
>>>>>>
>>>>>>
>>>>>> Exception in thread "main" org.apache.spark.SparkException: Job
>>>>>> aborted due
>>>>>> to stage failure: Task 3 in stage 0.0 failed 1 times, most recent
>>>>>> failure:
>>>>>> Lost task 3.0 in stage 0.0 (TID 3, localhost):
>>>>>> java.lang.ClassCastException:
>>>>>> cannot assign instance of
>>>>>> com.kona.motivis.spark.proto.GetDbConnection to
>>>>>> field
>>>>>> org.apache.spark.rdd.JdbcRDD.org$apache$spark$rdd$JdbcRDD$$getConnection
>>>>>> of
>>>>>> type scala.Function0 in instance of org.apache.spark.rdd.JdbcRDD
>>>>>>         at
>>>>>>
>>>>>> java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2083)
>>>>>>         at
>>>>>>
>>>>>> java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1261)
>>>>>>         at
>>>>>>
>>>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1996)
>>>>>>         at
>>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>         at
>>>>>>
>>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>         at
>>>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>         at
>>>>>>
>>>>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>>>>>>         at
>>>>>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>>>>>>         at
>>>>>>
>>>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>>         at
>>>>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>>         at
>>>>>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>>         at
>>>>>>
>>>>>> org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>>>>>>         at
>>>>>>
>>>>>> org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
>>>>>>         at
>>>>>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:57)
>>>>>>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>>>         at
>>>>>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>>>>         at
>>>>>>
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>         at
>>>>>>
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>         at java.lang.Thread.run(Thread.java:744)
>>>>>>
>>>>>> Driver stacktrace:
>>>>>>         at
>>>>>> org.apache.spark.scheduler.DAGScheduler.org
>>>>>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>>>>>         at
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>>>>>         at
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>>>>>         at
>>>>>>
>>>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>         at
>>>>>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>         at
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>>>>>>         at
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>>         at
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>>>>         at scala.Option.foreach(Option.scala:236)
>>>>>>         at
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>>>>>>         at
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>>>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>>>         at
>>>>>>
>>>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>>>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>>>>>         at
>>>>>>
>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>>>         at
>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>         at
>>>>>>
>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>         at
>>>>>>
>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>         at
>>>>>>
>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/JdbcRDD-ClassCastException-with-scala-Function0-tp21707.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: [email protected]
>>>>>> For additional commands, e-mail: [email protected]
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to