Of course, there is a RichInputFormat already. We'll look into the runtime context.

Regards,
Stefano
2016-04-18 9:41 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:

> Being a generic JDBC input format, I would prefer to stay with Row,
> letting the developer manage the cast according to the driver
> functionalities.
>
> As for the open() and close() issue, I agree with Flavio that we need
> better management of the InputFormat lifecycle. Perhaps a new interface
> extending it: RichInputFormat?
>
> my2c.
>
> Stefano
>
> 2016-04-18 9:35 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Talking with Stefano this morning and looking at the DataSourceTask code,
>> we discovered that the open() and close() methods are both called for
>> every split and not once per InputFormat instance (maybe open and close
>> should be renamed openSplit and closeSplit to avoid confusion...).
>> I think it could be worth adding 2 methods to the InputFormat (e.g.
>> openInputFormat() and closeInputFormat()) to allow for the management of
>> the InputFormat lifecycle; otherwise I'll need to instantiate a pool (and
>> thus add a dependency) to avoid creating a new connection (an expensive
>> operation) for every split (which in our use case happens millions of
>> times).
>>
>> What about the output of the InputFormat? How do you want me to proceed?
>> With POJO or Row? If POJO, which strategy do you suggest?
>>
>> Best,
>> Flavio
>>
>> On Fri, Apr 15, 2016 at 2:06 PM, Stefano Bortoli <s.bort...@gmail.com> wrote:
>>
>> > If we share the connection, then we should also be careful with the
>> > close() implementation. I did not see changes for this method in the PR.
>> >
>> > Regards,
>> > Stefano
>> >
>> > 2016-04-15 11:01 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>> >
>> > > Following your suggestions I've fixed the connection reuse in my PR at
>> > > https://github.com/apache/flink/pull/1885.
>> > > I simply check in establishConnection() whether dbConn != null and, in
>> > > that case, return immediately.
>> > >
>> > > Thus, the only remaining thing to fix is the null handling. Do you have
>> > > any suggestion about how to transform the results into a POJO?
>> > > Maybe returning a Row and then letting the user manage the conversion
>> > > to the target POJO in a subsequent map could be a more general solution?
>> > >
>> > > Best,
>> > > Flavio
>> > >
>> > > On Thu, Apr 14, 2016 at 6:52 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>> > >
>> > > > There is an InputFormat object for each parallel task of a DataSource.
>> > > > So for a source with parallelism 8 you will have 8 instances of the
>> > > > InputFormat running, regardless of whether this is on one box with 8
>> > > > slots or 8 machines with 1 slot each.
>> > > > The same is true for all other operators (Map, Reduce, Join, etc.) and
>> > > > DataSinks.
>> > > >
>> > > > Note, a single task does not fill a slot, but a "slice" of the program
>> > > > (one parallel task of each operator) fills a slot.
>> > > >
>> > > > Cheers, Fabian
>> > > >
>> > > > 2016-04-14 18:47 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>> > > >
>> > > > > ok thanks! Just one last question: is an InputFormat instantiated
>> > > > > for each task slot or once per task manager?
>> > > > > On 14 Apr 2016 18:07, "Chesnay Schepler" <ches...@apache.org> wrote:
>> > > > >
>> > > > > > no.
>> > > > > >
>> > > > > > if (connection == null) {
>> > > > > >     establishConnection();
>> > > > > > }
>> > > > > >
>> > > > > > done. same connection for all splits.
>> > > > > >
>> > > > > > On 14.04.2016 17:59, Flavio Pompermaier wrote:
>> > > > > >
>> > > > > >> I didn't understand what you mean by "it should also be possible
>> > > > > >> to reuse the same connection of an InputFormat across InputSplits,
>> > > > > >> i.e., calls of the open() method".
>> > > > > >> At the moment, the open() method calls establishConnection(),
>> > > > > >> so a new connection is created for each split.
>> > > > > >> If I understood correctly, you're suggesting creating a pool in
>> > > > > >> the InputFormat and simply calling pool.borrow() in open() rather
>> > > > > >> than establishConnection()?
>> > > > > >>
>> > > > > >> On 14 Apr 2016 17:28, "Chesnay Schepler" <ches...@apache.org> wrote:
>> > > > > >>
>> > > > > >>> On 14.04.2016 17:22, Fabian Hueske wrote:
>> > > > > >>>
>> > > > > >>>> Hi Flavio,
>> > > > > >>>>
>> > > > > >>>> those are good questions.
>> > > > > >>>>
>> > > > > >>>> 1) Replacing null values by default values and simply forwarding
>> > > > > >>>> records is very dangerous, in my opinion.
>> > > > > >>>> I see two alternatives: A) we use a data type that tolerates null
>> > > > > >>>> values. This could be a POJO that the user has to provide or Row.
>> > > > > >>>> The drawback of Row is that it is untyped and not easy to handle.
>> > > > > >>>> B) We use Tuple and add an additional field that holds an Integer
>> > > > > >>>> which serves as a bitset to mark null fields. This would be a
>> > > > > >>>> pretty low level API though. I am leaning towards the
>> > > > > >>>> user-provided POJO option.
>> > > > > >>>>
>> > > > > >>> i would also lean towards the POJO option.
>> > > > > >>>
>> > > > > >>>> 2) The JDBCInputFormat is located in a dedicated Maven module. I
>> > > > > >>>> think we can add a dependency to that module. However, it should
>> > > > > >>>> also be possible to reuse the same connection of an InputFormat
>> > > > > >>>> across InputSplits, i.e., calls of the open() method. Wouldn't
>> > > > > >>>> that be sufficient?
>> > > > > >>>>
>> > > > > >>> this is the right approach imo.
>> > > > > >>>
>> > > > > >>>> Best, Fabian
>> > > > > >>>>
>> > > > > >>>> 2016-04-14 16:59 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>> > > > > >>>>
>> > > > > >>>>> Hi guys,
>> > > > > >>>>>
>> > > > > >>>>> I'm integrating Chesnay's comments into my PR, but there are a
>> > > > > >>>>> couple of things that I'd like to discuss with the core developers.
>> > > > > >>>>>
>> > > > > >>>>> 1. About the JDBC type mapping (addValue() method at [1]): at the
>> > > > > >>>>>    moment, if I find a null value for a Double, the getDouble of
>> > > > > >>>>>    JDBC returns 0.0. Is this really the correct behaviour?
>> > > > > >>>>>    Wouldn't it be better to use a POJO or the Row of datatable
>> > > > > >>>>>    that can handle null? Moreover, the mapping between SQL types
>> > > > > >>>>>    and Java types varies a lot between JDBC implementations.
>> > > > > >>>>>    Wouldn't it be better to rely on the Java type coming from
>> > > > > >>>>>    resultSet.getObject() to get such a mapping rather than using
>> > > > > >>>>>    the ResultSetMetaData types?
>> > > > > >>>>> 2. I'd like to handle connections very efficiently because we
>> > > > > >>>>>    have a use case with billions of records and thus millions of
>> > > > > >>>>>    splits, and establishing a new connection each time could be
>> > > > > >>>>>    expensive. Would it be a problem to add an Apache pool
>> > > > > >>>>>    dependency to the JDBC batch connector in order to reuse the
>> > > > > >>>>>    created connections?
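Editorial note on point 1: the getDouble() behaviour described there can be reproduced with plain JDBC interfaces. The following is only a sketch: NullHandlingSketch, readNullableDouble() and fakeRow() are hypothetical names that do not appear in the PR, and the fake ResultSet is built with java.lang.reflect.Proxy purely so the example runs without a database.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.sql.ResultSet;
import java.sql.SQLException;

/** Sketch only: class and method names are hypothetical, not part of the PR. */
class NullHandlingSketch {

    /** Reads a DOUBLE column, returning null (not 0.0) when the column is SQL NULL. */
    static Double readNullableDouble(ResultSet rs, int column) throws SQLException {
        double value = rs.getDouble(column); // JDBC returns 0.0 for SQL NULL
        if (rs.wasNull()) {                  // true if the last column read was SQL NULL
            return null;
        }
        return value;
    }

    /**
     * Stand-in for a one-column row so the sketch runs without a database.
     * Only the three methods used here are implemented.
     */
    static ResultSet fakeRow(Double stored) {
        InvocationHandler handler = (proxy, method, args) -> {
            switch (method.getName()) {
                case "getDouble": return stored == null ? 0.0 : stored.doubleValue();
                case "wasNull":   return stored == null;
                case "getObject": return stored;
                default: throw new UnsupportedOperationException(method.getName());
            }
        };
        return (ResultSet) Proxy.newProxyInstance(
                NullHandlingSketch.class.getClassLoader(),
                new Class<?>[] {ResultSet.class},
                handler);
    }

    public static void main(String[] args) throws SQLException {
        ResultSet nullRow = fakeRow(null);
        System.out.println(nullRow.getDouble(1));                 // raw getter hides the NULL
        System.out.println(nullRow.getObject(1));                 // getObject() preserves it
        System.out.println(readNullableDouble(nullRow, 1));       // wasNull() check preserves it
        System.out.println(readNullableDouble(fakeRow(2.5), 1));  // non-null values pass through
    }
}
```

Either route (getObject() or a wasNull() check) needs a target type that can hold null, which is why the replies above lean towards Row or a user-provided POJO rather than Tuple.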
>> > > > > >>>>>
>> > > > > >>>>> [1] https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
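Editorial note: the connection reuse discussed in this thread (check for an existing connection in open(), and release it only once per InputFormat instance) could look roughly like the sketch below. It is not Flink's API: ConnectionReuseSketch and its Supplier-based connection factory are hypothetical stand-ins so the example runs without a database, and closeInputFormat() simply borrows the name proposed above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Sketch of an input format whose open()/close() run once per split,
 * while the expensive connection lives as long as the instance.
 * All names are hypothetical; this is not Flink's API.
 */
class ConnectionReuseSketch {

    // Stand-in for something like DriverManager.getConnection(...).
    private final Supplier<AutoCloseable> connectionFactory;
    private AutoCloseable connection; // shared across splits
    private String currentSplit;      // per-split state

    ConnectionReuseSketch(Supplier<AutoCloseable> connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    /** Called for every split: create the connection only if none exists yet. */
    void open(String split) {
        if (connection == null) {
            connection = connectionFactory.get();
        }
        currentSplit = split;
    }

    /** Called for every split: release per-split state, keep the connection. */
    void close() {
        currentSplit = null;
    }

    /** Called once per instance: the only place the connection is closed. */
    void closeInputFormat() throws Exception {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    boolean isConnected() {
        return connection != null;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger opened = new AtomicInteger();
        AtomicInteger closed = new AtomicInteger();
        ConnectionReuseSketch format = new ConnectionReuseSketch(() -> {
            opened.incrementAndGet();
            return () -> { closed.incrementAndGet(); };
        });

        for (String split : new String[] {"split-0", "split-1", "split-2"}) {
            format.open(split); // once per split
            format.close();     // per-split cleanup only
        }
        format.closeInputFormat();

        System.out.println("opened=" + opened.get() + " closed=" + closed.get());
    }
}
```

Running main() prints opened=1 closed=1 for three splits. This also reflects the caveat about close(): once the connection is shared, the per-split close() must not tear it down.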