Hi Fabian, I've just created a JIRA issue for that (FLINK-3777). As you said, input splits should not be too fine-grained. However, we have a table with 11 billion rows that can't be queried with ranges greater than 100K rows, because the query has a lot of JOINs and increasing this threshold means much longer response times. This implies millions of splits and, thus, millions of calls to open() and as many connection re-creations :(
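A rough sketch of the arithmetic behind this concern. It assumes every split covers exactly 100K rows (the largest range that still performs acceptably) and borrows the parallelism of 8 from Fabian's example further down the thread. `ConnectionEstimate` and its methods are hypothetical names. Without reuse, open() creates one connection per split. With one reused connection per parallel InputFormat instance, the count is bounded by the parallelism.

```java
// Back-of-the-envelope estimate of how many JDBC connections a source opens.
// Hypothetical helper; assumes fixed-size row-range splits and a fixed parallelism.
final class ConnectionEstimate {

    // Splits needed to cover totalRows with ranges of at most rowsPerSplit rows.
    static long splitCount(long totalRows, long rowsPerSplit) {
        return (totalRows + rowsPerSplit - 1) / rowsPerSplit; // ceiling division
    }

    // Current behaviour: open() establishes a fresh connection for every split.
    static long connectionsWithoutReuse(long splits) {
        return splits;
    }

    // Reuse: each parallel InputFormat instance keeps one connection for all its splits.
    static long connectionsWithReuse(long splits, int parallelism) {
        return Math.min(splits, parallelism);
    }
}
```

With these inputs, splitCount(11_000_000_000L, 100_000L) is 110,000. That means 110,000 connections without reuse versus at most 8 with reuse. Because ranges cannot exceed 100K rows, 110,000 is a lower bound on the split count. Smaller ranges raise the first figure proportionally and leave the second unchanged.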
On Mon, Apr 18, 2016 at 12:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> I agree, a method to close an input format is missing.
> InputFormat is an API-stable interface, so it is not possible to extend it
> (until Flink 2.0). RichInputFormat is API-stable as well, but it is an
> abstract class, so it should be possible to add an empty default
> implementation of a closeInputFormat() method there.
>
> Of course it would be good to re-use connections across input splits.
> On the other hand, input splits should not be too fine-grained either,
> because input split assignment has some overhead as well.
>
> Best, Fabian
>
> 2016-04-18 9:49 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Yes, I forgot to mention that I could instantiate the connection in
>> configure(), but then I can't close it (as you confirmed) :(
>>
>> On Mon, Apr 18, 2016 at 9:46 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> There is also InputFormat.configure(), which is called before any split
>>> processing happens. But I see your point about a missing close() method
>>> that is called after all input splits have been processed.
>>>
>>> On Mon, 18 Apr 2016 at 09:44 Stefano Bortoli <s.bort...@gmail.com> wrote:
>>>
>>>> Of course there is one already. We'll look into the runtime context.
>>>>
>>>> saluti,
>>>> Stefano
>>>>
>>>> 2016-04-18 9:41 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
>>>>
>>>>> Since this is a generic JDBC input format, I would prefer to stay with
>>>>> Row, letting the developer manage the cast according to the driver's
>>>>> capabilities.
>>>>>
>>>>> As for the open() and close() issue, I agree with Flavio that we need
>>>>> better management of the InputFormat lifecycle. Perhaps a new interface
>>>>> extending it: RichInputFormat?
>>>>>
>>>>> my2c.
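A minimal sketch of the lifecycle being proposed here, assuming hooks named openInputFormat() and closeInputFormat() as suggested in this thread. `LifecycleInputFormat` and `SourceDriver` are hypothetical stand-ins, not Flink's RichInputFormat or DataSourceTask. The driver only mimics the call order described in the thread: configure() and the new hooks run once per instance, while open() and close() run once per split.

```java
import java.util.List;

// Hypothetical sketch of the proposed lifecycle; not Flink's actual classes.
abstract class LifecycleInputFormat<S> {
    // Called once before any split is processed.
    void configure() {}

    // Proposed hook: called once per instance, before the first split.
    void openInputFormat() {}

    // Existing per-split pair: called once for every split.
    abstract void open(S split);

    abstract void close();

    // Proposed hook: called once per instance, after the last split.
    // The empty default keeps existing subclasses compiling unchanged.
    void closeInputFormat() {}
}

final class SourceDriver {
    // Mimics the order in which a data source task could drive the format.
    static <S> void run(LifecycleInputFormat<S> format, List<S> splits) {
        format.configure();
        format.openInputFormat();
        try {
            for (S split : splits) {
                format.open(split);
                try {
                    // ... read the records of this split ...
                } finally {
                    format.close(); // per-split cleanup, even if reading fails
                }
            }
        } finally {
            format.closeInputFormat(); // once, after all splits
        }
    }
}
```

Because the new hooks have empty bodies, an existing subclass that only implements open() and close() compiles unchanged. This is why the hooks can be added to an abstract class without touching the InputFormat interface.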
>>>>>
>>>>> Stefano
>>>>>
>>>>> 2016-04-18 9:35 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>
>>>>>> Talking with Stefano this morning and looking at the DataSourceTask
>>>>>> code, we discovered that the open() and close() methods are both called
>>>>>> for every split and not once per InputFormat instance (maybe open and
>>>>>> close should be renamed openSplit and closeSplit to avoid confusion...).
>>>>>> I think it could be worth adding two methods to the InputFormat (e.g.
>>>>>> openInputFormat() and closeInputFormat()) to allow for the management
>>>>>> of the InputFormat lifecycle. Otherwise I'll need to instantiate a pool
>>>>>> (and thus add a dependency) to avoid creating a new connection (an
>>>>>> expensive operation) for every split, which in our use case happens
>>>>>> millions of times.
>>>>>>
>>>>>> What about the output of the InputFormat? How do you want me to
>>>>>> proceed: with POJO or Row? If POJO, which strategy do you suggest?
>>>>>>
>>>>>> Best,
>>>>>> Flavio
>>>>>>
>>>>>> On Fri, Apr 15, 2016 at 2:06 PM, Stefano Bortoli <s.bort...@gmail.com> wrote:
>>>>>>
>>>>>>> If we share the connection, then we should also be careful with the
>>>>>>> close() implementation. I did not see changes for this method in the PR.
>>>>>>>
>>>>>>> saluti,
>>>>>>> Stefano
>>>>>>>
>>>>>>> 2016-04-15 11:01 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>
>>>>>>>> Following your suggestions I've fixed the connection reuse in my PR at
>>>>>>>> https://github.com/apache/flink/pull/1885.
>>>>>>>> In establishConnection() I simply check whether dbConn != null and,
>>>>>>>> in that case, return immediately.
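The reuse described here, combined with Stefano's caution about close(), could look roughly like the following sketch. It rests on these assumptions: `DbConnection` is a hypothetical stand-in for java.sql.Connection so the sketch runs without a driver, the `Supplier` plays the role of the real connection call, and `closeInputFormat()` is the once-per-instance hook discussed in this thread, not an existing method of the PR.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for java.sql.Connection, so the sketch runs without a JDBC driver.
interface DbConnection extends AutoCloseable {
    @Override
    void close(); // narrowed to throw no checked exception
}

// Hypothetical sketch of per-instance connection reuse; names are illustrative.
final class ReusingJdbcFormat {
    private final Supplier<DbConnection> connectionFactory;
    private DbConnection dbConn; // shared by every split this instance processes

    ReusingJdbcFormat(Supplier<DbConnection> connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    private void establishConnection() {
        if (dbConn != null) {
            return; // already connected: reuse the connection for this split
        }
        dbConn = connectionFactory.get();
    }

    // Per split: make sure a connection exists; the range query itself is omitted.
    void open(long splitStart, long splitEnd) {
        establishConnection();
    }

    // Per split: release only split-scoped resources (statement, result set).
    // Closing dbConn here would force a reconnect for every split.
    void close() {
    }

    // Once per instance: the only place where the shared connection is closed.
    void closeInputFormat() {
        if (dbConn != null) {
            dbConn.close();
            dbConn = null;
        }
    }
}
```

Calling open() and close() for five splits on one instance creates a single connection, which is closed only when closeInputFormat() runs. If close() closed dbConn instead, every split would reconnect and the reuse would be lost.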
>>>>>>>>
>>>>>>>> Thus, the only remaining thing to fix is the null handling. Do you
>>>>>>>> have any suggestion about how to transform the results into a POJO?
>>>>>>>> Maybe returning a Row and then letting the user manage the conversion
>>>>>>>> to the target POJO in a subsequent map could be a more general
>>>>>>>> solution?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Flavio
>>>>>>>>
>>>>>>>> On Thu, Apr 14, 2016 at 6:52 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> There is an InputFormat object for each parallel task of a DataSource.
>>>>>>>>> So for a source with parallelism 8 you will have 8 instances of the
>>>>>>>>> InputFormat running, regardless of whether this is on one box with 8
>>>>>>>>> slots or on 8 machines with 1 slot each.
>>>>>>>>> The same is true for all other operators (Map, Reduce, Join, etc.)
>>>>>>>>> and DataSinks.
>>>>>>>>>
>>>>>>>>> Note, a single task does not fill a slot, but a "slice" of the
>>>>>>>>> program (one parallel task of each operator) fills a slot.
>>>>>>>>>
>>>>>>>>> Cheers, Fabian
>>>>>>>>>
>>>>>>>>> 2016-04-14 18:47 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>>
>>>>>>>>>> OK, thanks! Just one last question: is an InputFormat instantiated
>>>>>>>>>> for each task slot or once per TaskManager?
>>>>>>>>>>
>>>>>>>>>> On 14 Apr 2016 18:07, "Chesnay Schepler" <ches...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> no.
>>>>>>>>>>>
>>>>>>>>>>> if (connection == null) {
>>>>>>>>>>>     establishConnection();
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> done. Same connection for all splits.
>>>>>>>>>>>
>>>>>>>>>>> On 14.04.2016 17:59, Flavio Pompermaier wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I didn't understand what you mean by "it should also be possible
>>>>>>>>>>>> to reuse the same connection of an InputFormat across InputSplits,
>>>>>>>>>>>> i.e., calls of the open() method".
>>>>>>>>>>>> At the moment the open() method calls establishConnection(), so a
>>>>>>>>>>>> new connection is created for each split.
>>>>>>>>>>>> If I understood correctly, you're suggesting to create a pool in
>>>>>>>>>>>> the InputFormat and simply call pool.borrow() in open() rather
>>>>>>>>>>>> than establishConnection()?
>>>>>>>>>>>>
>>>>>>>>>>>> On 14 Apr 2016 17:28, "Chesnay Schepler" <ches...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On 14.04.2016 17:22, Fabian Hueske wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Flavio,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> those are good questions.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) Replacing null values by default values and simply
>>>>>>>>>>>>>> forwarding records is very dangerous, in my opinion.
>>>>>>>>>>>>>> I see two alternatives: A) we use a data type that tolerates
>>>>>>>>>>>>>> null values. This could be a POJO that the user has to provide,
>>>>>>>>>>>>>> or Row. The drawback of Row is that it is untyped and not easy
>>>>>>>>>>>>>> to handle. B) We use Tuple and add an additional field that
>>>>>>>>>>>>>> holds an Integer which serves as a bitset to mark null fields.
>>>>>>>>>>>>>> This would be a pretty low-level API though. I am leaning
>>>>>>>>>>>>>> towards the user-provided POJO option.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would also lean towards the POJO option.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2) The JDBCInputFormat is located in a dedicated Maven module. I
>>>>>>>>>>>>>> think we can add a dependency to that module. However, it should
>>>>>>>>>>>>>> also be possible to reuse the same connection of an InputFormat
>>>>>>>>>>>>>> across InputSplits, i.e., calls of the open() method. Wouldn't
>>>>>>>>>>>>>> that be sufficient?
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is the right approach, imo.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2016-04-14 16:59 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi guys,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm integrating Chesnay's comments into my PR, but there are a
>>>>>>>>>>>>>>> couple of things that I'd like to discuss with the core
>>>>>>>>>>>>>>> developers.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. About the JDBC type mapping (addValue() method at [1]): at
>>>>>>>>>>>>>>>    the moment, if I find a null value for a Double, JDBC's
>>>>>>>>>>>>>>>    getDouble() returns 0.0. Is that really the correct
>>>>>>>>>>>>>>>    behaviour? Wouldn't it be better to use a POJO or the Row of
>>>>>>>>>>>>>>>    the Table API, which can handle null values? Moreover, the
>>>>>>>>>>>>>>>    mapping between SQL types and Java types varies a lot between
>>>>>>>>>>>>>>>    individual JDBC implementations. Wouldn't it be better to
>>>>>>>>>>>>>>>    rely on the Java type returned by resultSet.getObject() to
>>>>>>>>>>>>>>>    get such a mapping, rather than on the ResultSetMetaData
>>>>>>>>>>>>>>>    types?
>>>>>>>>>>>>>>> 2. I'd like to handle connections very efficiently because we
>>>>>>>>>>>>>>>    have a use case with billions of records and thus millions of
>>>>>>>>>>>>>>>    splits, and establishing a new connection each time could be
>>>>>>>>>>>>>>>    expensive. Would it be a problem to add the Apache pool
>>>>>>>>>>>>>>>    dependency to the JDBC batch connector in order to reuse the
>>>>>>>>>>>>>>>    created connections?
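Fabian's option B can be sketched as follows. The sketch assumes the mask is an int with one bit per field, so it covers at most 32 fields. `NullMask` is a hypothetical helper, and the Tuple that would carry the mask as an extra field is left out.

```java
// Sketch of option B: an int used as a bitset, where bit i set means field i is NULL.
// Hypothetical helper; assumes at most 32 fields per row (one bit per field).
final class NullMask {

    static int maskOf(Object[] fields) {
        if (fields.length > Integer.SIZE) {
            throw new IllegalArgumentException("an int bitset covers at most 32 fields");
        }
        int mask = 0;
        for (int i = 0; i < fields.length; i++) {
            if (fields[i] == null) {
                mask |= 1 << i; // mark field i as NULL
            }
        }
        return mask;
    }

    // True if the mask marks the given field as NULL.
    static boolean isNull(int mask, int field) {
        return (mask & (1 << field)) != 0;
    }
}
```

For the fields {42, null, "x", null}, bits 1 and 3 are set, giving a mask of 10. Every consumer has to check the mask before trusting a field's value, which is what makes this a low-level API.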
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1] https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
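On point 1: JDBC's getDouble() is specified to return 0 when the column value is SQL NULL. ResultSet.wasNull() is the standard way to tell a real 0.0 from a NULL. getObject() returns null directly and lets the driver choose the Java type, which is the mapping suggested above.

The following is a minimal sketch. `NullableColumns`, its methods and `fakeRow` are hypothetical names. `fakeRow` is a java.lang.reflect.Proxy stand-in for a driver's ResultSet, so the example runs without a database.

```java
import java.lang.reflect.Proxy;
import java.sql.ResultSet;
import java.sql.SQLException;

// Hypothetical helpers showing null-aware reads from a JDBC ResultSet.
final class NullableColumns {

    // getDouble() returns 0.0 for SQL NULL; wasNull() tells the two cases apart.
    static Double readNullableDouble(ResultSet rs, int column) throws SQLException {
        double value = rs.getDouble(column);
        if (rs.wasNull()) {
            return null; // wasNull() refers to the column read just before it
        }
        return value;
    }

    // getObject() returns null for SQL NULL and lets the driver pick the Java type.
    static Object[] readRow(ResultSet rs, int columnCount) throws SQLException {
        Object[] row = new Object[columnCount];
        for (int i = 0; i < columnCount; i++) {
            row[i] = rs.getObject(i + 1); // JDBC column indexes are 1-based
        }
        return row;
    }

    // Stand-in ResultSet positioned on a single row, so the sketch runs without a driver.
    // Only getDouble(int), getObject(int) and wasNull() are supported.
    static ResultSet fakeRow(Object... values) {
        boolean[] lastWasNull = {false};
        return (ResultSet) Proxy.newProxyInstance(
                NullableColumns.class.getClassLoader(),
                new Class<?>[] {ResultSet.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "getDouble": {
                            Object v = values[(Integer) args[0] - 1];
                            lastWasNull[0] = v == null;
                            return v == null ? 0.0 : ((Number) v).doubleValue();
                        }
                        case "getObject": {
                            Object v = values[(Integer) args[0] - 1];
                            lastWasNull[0] = v == null;
                            return v;
                        }
                        case "wasNull":
                            return lastWasNull[0];
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }
}
```

For a fake row (null, 3.5), rs.getDouble(1) returns 0.0 while readNullableDouble(rs, 1) returns null, and readRow(rs, 2) yields {null, 3.5}.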