The problem could be that open() is not called with a proper Configuration object in streaming mode.
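
Whatever the runtime does, the open(null) call in the quoted jdbc() cannot be what initializes the format on the worker. Below is a minimal sketch of why, in plain Java with no Flink dependency. ToyInputFormat and shipToWorker are hypothetical stand-ins, not Flink classes. The sketch assumes that the state created by open() lives in transient fields (as I believe the connection and result set do in JDBCInputFormat) and that the format is serialized when the job is submitted.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;

public class OpenLifecycleSketch {

    /** Hypothetical stand-in for an input format; NOT a Flink class. */
    static class ToyInputFormat implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String query;
        // Assumption: like a JDBC ResultSet, this state cannot be serialized,
        // so it is transient and is dropped when the format is shipped.
        private transient Iterator<String> resultSet;

        ToyInputFormat(String query) {
            this.query = query;
        }

        /** Creates the per-task state; stands in for opening the connection. */
        void open() {
            resultSet = Arrays.asList("sales", "engineering").iterator();
        }

        String nextRecord() throws IOException {
            try {
                return resultSet.next();
            } catch (NullPointerException e) {
                // Mirrors the shape of the error in the quoted stack trace.
                throw new IOException("Couldn't access resultSet", e);
            }
        }
    }

    /** Simulates job submission: a serialize/deserialize round trip. */
    static ToyInputFormat shipToWorker(ToyInputFormat format)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(format);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (ToyInputFormat) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ToyInputFormat format = new ToyInputFormat("select name from department");
        format.open(); // opened on the client, before submission

        ToyInputFormat onWorker = shipToWorker(format);
        try {
            onWorker.nextRecord();
        } catch (IOException e) {
            // prints: without open() on the worker: Couldn't access resultSet
            System.out.println("without open() on the worker: " + e.getMessage());
        }

        onWorker.open(); // what the runtime is expected to do on the worker
        // prints: after open() on the worker: sales
        System.out.println("after open() on the worker: " + onWorker.nextRecord());
    }
}
```

If that assumption holds, the NullPointerException in nextRecord() points at a missing or failed open() on the worker side, which is the call worth tracing in the streaming source.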

On Sun, 5 Jun 2016 at 19:33 Stephan Ewen <se...@apache.org> wrote:

> Hi David!
>
> You are using the JDBC format that was written for the batch API in the
> streaming API.
>
> While that should actually work, it is a somewhat new and less tested
> function. Let's double check that the call to open() is properly forwarded.
>
>
> On Sun, Jun 5, 2016 at 12:47 PM, David Olsen <davidolsen4...@gmail.com>
> wrote:
>
>> Switching to use org.apache.flink.api.java.ExecutionEnvironment, my code
>> can successfully read data from database through JDBCInputFormat. But I
>> need stream mode (and now it seems that the DataSet and DataStream is not
>> interchangeable). Are there any additional functions required to be
>> executed before StreamExecutionEnvironment creates jdbc input?
>>
>> Thanks
>>
>>
>> On 5 June 2016 at 18:26, David Olsen <davidolsen4...@gmail.com> wrote:
>>
>>> I remove the open method when constructing jdbc input format, but I
>>> still obtain "couldn't access resultSet" error.
>>>
>>> Caused by: java.io.IOException: Couldn't access resultSet
>>>     at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:179)
>>>     at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:51)
>>>     at org.apache.flink.streaming.api.functions.source.FileSourceFunction.run(FileSourceFunction.java:124)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.nextRecord(JDBCInputFormat.java:164)
>>>     ... 7 more
>>>
>>> Anything I should check as well?
>>>
>>> Thanks
>>>
>>>
>>> On 5 June 2016 at 17:26, Chesnay Schepler <ches...@apache.org> wrote:
>>>
>>>> you are not supposed to call open yourselves.
>>>>
>>>>
>>>> On 05.06.2016 11:05, David Olsen wrote:
>>>>
>>>>> Following the sample on the flink website[1] to test jdbc I
>>>>> encountered an error "Couldn't access resultSet". It looks like the
>>>>> nextRecord is called before open() function. However I've called open()
>>>>> when I construct jdbc input format. Any functions I should call before job
>>>>> submission?
>>>>>
>>>>> def jdbc() = {
>>>>>   val jdbcif = JDBCInputFormat.buildJDBCInputFormat
>>>>>     .setDrivername("com.mysql.jdbc.Driver")
>>>>>     .setDBUrl("jdbc:mysql://localhost/test")
>>>>>     .setQuery("select name from department")
>>>>>     .setUsername(...)
>>>>>     .setPassword(...)
>>>>>     .finish
>>>>>   jdbcif.open(null)
>>>>>   jdbcif.asInstanceOf[JDBCInputFormat[Tuple1[String]]]
>>>>> }
>>>>>
>>>>> def main(args: Array[String]) {
>>>>>   // -> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>   val evidence$6 = new TupleTypeInfo(classOf[Tuple1[String]], STRING_TYPE_INFO)
>>>>>   val stream = env.createInput(jdbc(), evidence$6)
>>>>>   stream.map(new MapFunction[Tuple1[String], String]() {
>>>>>     override def map(tuple: Tuple1[String]): String = tuple.getField(0)
>>>>>   }).returns(classOf[String]).writeAsText("/path/to/jdbc")
>>>>>   env.execute("test-flink")
>>>>> }
>>>>>
>>>>> The version used in this test is flink 1.0.3 and scala 2.11.
>>>>>
>>>>> [1]. https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/
>>>>>
>>>>
>>>>
>>>
>>
>