I agree, a method to close an input format is missing. InputFormat is an API stable interface, so it is not possible to extend it (until Flink 2.0). RichInputFormat is API stable as well, but an abstract class. So it should be possible to add an empty default implementation of a closeInputFormat() method there.
Of course it would be good to re-use connections across input splits. On the other hand, input splits should not be too fine-grained as well, because input split assignment has some overhead as well. Best, Fabian 2016-04-18 9:49 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: > Yes, I forgot to mention that I could instantiate the connection in the > configure() but then I can't close it (as you confirmed) :( > > On Mon, Apr 18, 2016 at 9:46 AM, Aljoscha Krettek <aljos...@apache.org> > wrote: > > > There is also InputFormat.configure() which is called before any split > > processing happens. But I see your point about a missing close() method > > that is called after all input splits have been processed. > > > > On Mon, 18 Apr 2016 at 09:44 Stefano Bortoli <s.bort...@gmail.com> > wrote: > > > > > Of course there is one already. We'll look into the runtime context. > > > > > > saluti, > > > Stefano > > > > > > 2016-04-18 9:41 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>: > > > > > > > Being a generic JDBC input format, I would prefer to stay with Row, > > > > letting the developer manage the cast according to the driver > > > > functionalities. > > > > > > > > As for the open() and close() issue, I agree with Flavio that we'd > > need a > > > > better management of the inputformat lifecycle. Perhaps a new > interface > > > > extending it: RichInputFormat? > > > > > > > > my2c. > > > > > > > > Stefano > > > > > > > > 2016-04-18 9:35 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>: > > > > > > > >> Talking with Stefano this morning and looking at the DataSourceTask > > code > > > >> we > > > >> discovered that the open() and close() methods are both called for > > every > > > >> split and not once per inputFormat instance (maybe open and close > > should > > > >> be > > > >> renamed as openSplit and closeSplit to avoid confusion...). > > > >> I think that it could worth to add 2 methods to the InputFormat > (e.g. > > > >> openInputFormat() and closeInputFormat() ) to allow for the > managment > > of > > > >> the InputFormat lifecycle, otherwise I'll need to instantiate a pool > > > (and > > > >> thus adding a dependency) to avoid the creation of a new connection > > > >> (expensive operation) for every split (that in our use case happens > > > >> millions of times). > > > >> > > > >> What about the output of the inputFormat? how do you want me to > > proceed? > > > >> With POJO or Row? If POJO, which strategy do you suggest? > > > >> > > > >> Best, > > > >> Flavio > > > >> > > > >> On Fri, Apr 15, 2016 at 2:06 PM, Stefano Bortoli < > s.bort...@gmail.com > > > > > > >> wrote: > > > >> > > > >> > If we share the connection, then we should also be careful with > the > > > >> close() > > > >> > implementation. I did not see changes for this method in the PR. > > > >> > > > > >> > saluti, > > > >> > Stefano > > > >> > > > > >> > 2016-04-15 11:01 GMT+02:00 Flavio Pompermaier < > pomperma...@okkam.it > > >: > > > >> > > > > >> > > Following your suggestions I've fixed the connection reuse in my > > PR > > > at > > > >> > > https://github.com/apache/flink/pull/1885. > > > >> > > I simply check in the establishConnection() if dbConn!=null and, > > in > > > >> that > > > >> > > case, I simply return immediately. > > > >> > > > > > >> > > Thus, the only remaining thin to fix is the null handling. Do > you > > > have > > > >> > any > > > >> > > suggestion about how to transform the results in a POJO? > > > >> > > Maybe returning a Row and then let the user manage the > conversion > > to > > > >> the > > > >> > > target POJO in a successive map could be a more general > soloution? > > > >> > > > > > >> > > Best, > > > >> > > Flavio > > > >> > > > > > >> > > On Thu, Apr 14, 2016 at 6:52 PM, Fabian Hueske < > fhue...@gmail.com > > > > > > >> > wrote: > > > >> > > > > > >> > > > There is an InputFormat object for each parallel task of a > > > >> DataSource. > > > >> > > > So for a source with parallelism 8 you will have 8 instances > of > > > the > > > >> > > > InputFormat running, regardless whether this is on one box > with > > 8 > > > >> slots > > > >> > > or > > > >> > > > 8 machines with 1 slots each. > > > >> > > > The same is true for all other operators (Map, Reduce, Join, > > etc.) > > > >> and > > > >> > > > DataSinks. > > > >> > > > > > > >> > > > Note, a single task does not fill a slot, but a "slice" of the > > > >> program > > > >> > > (one > > > >> > > > parallel task of each operator) fills a slot. > > > >> > > > > > > >> > > > Cheers, Fabian > > > >> > > > > > > >> > > > 2016-04-14 18:47 GMT+02:00 Flavio Pompermaier < > > > pomperma...@okkam.it > > > >> >: > > > >> > > > > > > >> > > > > ok thanks!just one last question: an inputformat is > > instantiated > > > >> for > > > >> > > each > > > >> > > > > task slot or once for task manger? > > > >> > > > > On 14 Apr 2016 18:07, "Chesnay Schepler" < > ches...@apache.org> > > > >> wrote: > > > >> > > > > > > > >> > > > > > no. > > > >> > > > > > > > > >> > > > > > if (connection==null) { > > > >> > > > > > establishCOnnection(); > > > >> > > > > > } > > > >> > > > > > > > > >> > > > > > done. same connection for all splits. > > > >> > > > > > > > > >> > > > > > On 14.04.2016 17:59, Flavio Pompermaier wrote: > > > >> > > > > > > > > >> > > > > >> I didn't understand what you mean for "it should also be > > > >> possible > > > >> > to > > > >> > > > > reuse > > > >> > > > > >> the same connection of an InputFormat across InputSplits, > > > i.e., > > > >> > > calls > > > >> > > > of > > > >> > > > > >> the open() method". > > > >> > > > > >> At the moment in the open method there's a call to > > > >> > > > establishConnection, > > > >> > > > > >> thus, a new connection is created for each split. > > > >> > > > > >> If I understood correctly, you're suggesting to create a > > pool > > > >> in > > > >> > the > > > >> > > > > >> inputFormat and simply call poo.borrow() in the open() > > rather > > > >> than > > > >> > > > > >> establishConnection? > > > >> > > > > >> > > > >> > > > > >> On 14 Apr 2016 17:28, "Chesnay Schepler" < > > ches...@apache.org > > > > > > > >> > > wrote: > > > >> > > > > >> > > > >> > > > > >> On 14.04.2016 17:22, Fabian Hueske wrote: > > > >> > > > > >>> > > > >> > > > > >>> Hi Flavio, > > > >> > > > > >>>> > > > >> > > > > >>>> that are good questions. > > > >> > > > > >>>> > > > >> > > > > >>>> 1) Replacing null values by default values and simply > > > >> forwarding > > > >> > > > > records > > > >> > > > > >>>> is > > > >> > > > > >>>> very dangerous, in my opinion. > > > >> > > > > >>>> I see two alternatives: A) we use a data type that > > > tolerates > > > >> > null > > > >> > > > > >>>> values. > > > >> > > > > >>>> This could be a POJO that the user has to provide or > Row. > > > The > > > >> > > > drawback > > > >> > > > > >>>> of > > > >> > > > > >>>> Row is that it is untyped and not easy to handle. B) We > > use > > > >> > Tuple > > > >> > > > and > > > >> > > > > >>>> add > > > >> > > > > >>>> an additional field that holds an Integer which serves > > as a > > > >> > bitset > > > >> > > > to > > > >> > > > > >>>> mark > > > >> > > > > >>>> null fields. This would be a pretty low level API > > though. I > > > >> am > > > >> > > > leaning > > > >> > > > > >>>> towards the user-provided POJO option. > > > >> > > > > >>>> > > > >> > > > > >>>> i would also lean towards the POJO option. > > > >> > > > > >>> > > > >> > > > > >>> 2) The JDBCInputFormat is located in a dedicated Maven > > > >> module. I > > > >> > > > think > > > >> > > > > we > > > >> > > > > >>>> can add a dependency to that module. However, it should > > > also > > > >> be > > > >> > > > > possible > > > >> > > > > >>>> to > > > >> > > > > >>>> reuse the same connection of an InputFormat across > > > >> InputSplits, > > > >> > > > i.e., > > > >> > > > > >>>> calls > > > >> > > > > >>>> of the open() method. Wouldn't that be sufficient? > > > >> > > > > >>>> > > > >> > > > > >>>> this is the right approach imo. > > > >> > > > > >>> > > > >> > > > > >>> Best, Fabian > > > >> > > > > >>>> > > > >> > > > > >>>> 2016-04-14 16:59 GMT+02:00 Flavio Pompermaier < > > > >> > > pomperma...@okkam.it > > > >> > > > >: > > > >> > > > > >>>> > > > >> > > > > >>>> Hi guys, > > > >> > > > > >>>> > > > >> > > > > >>>>> I'm integrating the comments of Chesnay to my PR but > > > >> there's a > > > >> > > > couple > > > >> > > > > >>>>> of > > > >> > > > > >>>>> thing that I'd like to discuss with the core > developers. > > > >> > > > > >>>>> > > > >> > > > > >>>>> > > > >> > > > > >>>>> 1. about the JDBC type mapping (addValue() method > > at > > > >> [1]: > > > >> > At > > > >> > > > the > > > >> > > > > >>>>> moment > > > >> > > > > >>>>> if I find a null value for a Double, the > getDouble > > > of > > > >> > jdbc > > > >> > > > > return > > > >> > > > > >>>>> 0.0. > > > >> > > > > >>>>> Is > > > >> > > > > >>>>> it really the correct behaviour? Wouldn't be > better > > > to > > > >> > use a > > > >> > > > > POJO > > > >> > > > > >>>>> or > > > >> > > > > >>>>> the > > > >> > > > > >>>>> Row of datatable that can handle void? Moreover, > > the > > > >> > mapping > > > >> > > > > >>>>> between > > > >> > > > > >>>>> SQL > > > >> > > > > >>>>> type and Java types varies much from the single > > JDBC > > > >> > > > > >>>>> implementation. > > > >> > > > > >>>>> Wouldn't be better to rely on the Java type > coming > > > from > > > >> > > using > > > >> > > > > >>>>> resultSet.getObject() to get such a mapping > rather > > > than > > > >> > > using > > > >> > > > > the > > > >> > > > > >>>>> ResultSetMetadata types? > > > >> > > > > >>>>> 2. I'd like to handle connections very > efficiently > > > >> because > > > >> > > we > > > >> > > > > >>>>> have a > > > >> > > > > >>>>> use > > > >> > > > > >>>>> case with billions of records and thus millions > of > > > >> splits > > > >> > > and > > > >> > > > > >>>>> establish > > > >> > > > > >>>>> a > > > >> > > > > >>>>> new connection each time could be expensive. > Would > > it > > > >> be a > > > >> > > > > >>>>> problem to > > > >> > > > > >>>>> add > > > >> > > > > >>>>> apache pool dependency to the jdbc batch > connector > > in > > > >> > order > > > >> > > to > > > >> > > > > >>>>> reuase > > > >> > > > > >>>>> the > > > >> > > > > >>>>> created connections? > > > >> > > > > >>>>> > > > >> > > > > >>>>> > > > >> > > > > >>>>> [1] > > > >> > > > > >>>>> > > > >> > > > > >>>>> > > > >> > > > > >>>>> > > > >> > > > > >>>>> > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java > > > >> > > > > >>>>> > > > >> > > > > >>>>> > > > >> > > > > >>>>> > > > >> > > > > > > > > >> > > > > > > > >> > > > > > > >> > > > > > >> > > > > >> > > > > > > > > > > > > > >