Following your suggestions I've fixed the connection reuse in my PR at https://github.com/apache/flink/pull/1885. In establishConnection() I now check whether dbConn != null and, if so, return immediately.
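For reference, the guard can be sketched like this. This is a standalone illustration, not the actual JDBCInputFormat code: FakeConnection and connectCount are stand-ins I added so the example runs without a database, and open() mimics the per-split lifecycle call.

```java
// Minimal sketch of the connection-reuse guard described above.
// FakeConnection stands in for java.sql.Connection so the example
// runs without a real database; connectCount just proves the point.
class ReusingInputFormat {
    static class FakeConnection {}

    private FakeConnection dbConn;
    int connectCount = 0;

    // Called from open() for every InputSplit; the null check makes
    // the expensive connect happen only once per parallel task.
    void establishConnection() {
        if (dbConn != null) {
            return; // already connected: reuse it for this split too
        }
        dbConn = new FakeConnection();
        connectCount++;
    }

    // open() is invoked once per split in the InputFormat lifecycle
    void open() {
        establishConnection();
    }

    public static void main(String[] args) {
        ReusingInputFormat format = new ReusingInputFormat();
        for (int i = 0; i < 5; i++) {
            format.open(); // five splits, one physical connection
        }
        System.out.println("connections created: " + format.connectCount);
    }
}
```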
Thus, the only remaining thing to fix is the null handling. Do you have any suggestion about how to transform the results into a POJO? Maybe returning a Row and then letting the user manage the conversion to the target POJO in a subsequent map would be a more general solution?

Best,
Flavio

On Thu, Apr 14, 2016 at 6:52 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> There is an InputFormat object for each parallel task of a DataSource.
> So for a source with parallelism 8 you will have 8 instances of the
> InputFormat running, regardless of whether this is on one box with 8
> slots or 8 machines with 1 slot each.
> The same is true for all other operators (Map, Reduce, Join, etc.) and
> DataSinks.
>
> Note, a single task does not fill a slot, but a "slice" of the program
> (one parallel task of each operator) fills a slot.
>
> Cheers, Fabian
>
> 2016-04-14 18:47 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
> > ok, thanks! Just one last question: is an InputFormat instantiated
> > once per task slot or once per task manager?
> > On 14 Apr 2016 18:07, "Chesnay Schepler" <ches...@apache.org> wrote:
> >
> > > no.
> > >
> > > if (connection == null) {
> > >     establishConnection();
> > > }
> > >
> > > done. same connection for all splits.
> > >
> > > On 14.04.2016 17:59, Flavio Pompermaier wrote:
> > >
> > >> I didn't understand what you mean by "it should also be possible
> > >> to reuse the same connection of an InputFormat across InputSplits,
> > >> i.e., calls of the open() method".
> > >> At the moment there is a call to establishConnection in the open()
> > >> method, thus a new connection is created for each split.
> > >> If I understood correctly, you're suggesting to create a pool in
> > >> the InputFormat and simply call pool.borrow() in open() rather
> > >> than establishConnection?
> > >>
> > >> On 14 Apr 2016 17:28, "Chesnay Schepler" <ches...@apache.org> wrote:
> > >>
> > >> On 14.04.2016 17:22, Fabian Hueske wrote:
> > >>>
> > >>>> Hi Flavio,
> > >>>>
> > >>>> those are good questions.
> > >>>>
> > >>>> 1) Replacing null values with default values and simply
> > >>>> forwarding records is very dangerous, in my opinion.
> > >>>> I see two alternatives: A) we use a data type that tolerates
> > >>>> null values. This could be a POJO that the user has to provide,
> > >>>> or Row. The drawback of Row is that it is untyped and not easy
> > >>>> to handle. B) We use Tuple and add an additional field that
> > >>>> holds an Integer which serves as a bitset to mark null fields.
> > >>>> This would be a pretty low-level API, though. I am leaning
> > >>>> towards the user-provided POJO option.
> > >>>
> > >>> I would also lean towards the POJO option.
> > >>>
> > >>>> 2) The JDBCInputFormat is located in a dedicated Maven module.
> > >>>> I think we can add a dependency to that module. However, it
> > >>>> should also be possible to reuse the same connection of an
> > >>>> InputFormat across InputSplits, i.e., calls of the open()
> > >>>> method. Wouldn't that be sufficient?
> > >>>
> > >>> this is the right approach imo.
> > >>>
> > >>>> Best, Fabian
> > >>>>
> > >>>> 2016-04-14 16:59 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
> > >>>>
> > >>>>> Hi guys,
> > >>>>>
> > >>>>> I'm integrating the comments of Chesnay into my PR, but there
> > >>>>> are a couple of things that I'd like to discuss with the core
> > >>>>> developers.
> > >>>>>
> > >>>>> 1. About the JDBC type mapping (the addValue() method at [1]):
> > >>>>> at the moment, if I find a null value for a Double, the JDBC
> > >>>>> getDouble() returns 0.0. Is that really the correct behaviour?
> > >>>>> Wouldn't it be better to use a POJO, or a Row type that can
> > >>>>> handle null? Moreover, the mapping between SQL types and Java
> > >>>>> types varies a lot across JDBC implementations. Wouldn't it be
> > >>>>> better to rely on the Java type returned by
> > >>>>> resultSet.getObject() for such a mapping rather than on the
> > >>>>> ResultSetMetaData types?
> > >>>>> 2. I'd like to handle connections very efficiently because we
> > >>>>> have a use case with billions of records and thus millions of
> > >>>>> splits, and establishing a new connection each time could be
> > >>>>> expensive. Would it be a problem to add the Apache commons-pool
> > >>>>> dependency to the JDBC batch connector in order to reuse the
> > >>>>> created connections?
> > >>>>>
> > >>>>> [1]
> > >>>>> https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
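To make the "return a Row, let the user map it to a POJO in a subsequent map" idea concrete, here is a minimal sketch. Note this is plain Java, not the actual Flink Row or Table API: the tiny Row class, the Person POJO, and the field layout are all hypothetical stand-ins, chosen so the example runs on its own and shows how a SQL NULL can survive as a Java null instead of a misleading default like 0.0.

```java
import java.util.function.Function;

class RowToPojoSketch {
    // Minimal null-tolerant row: fields are plain Objects, so a SQL NULL
    // can be kept as a Java null instead of a default value.
    static class Row {
        final Object[] fields;
        Row(Object... fields) { this.fields = fields; }
        Object getField(int i) { return fields[i]; }
    }

    // Hypothetical user POJO that the subsequent map() would produce.
    static class Person {
        String name;
        Double salary; // boxed Double so null survives the conversion
    }

    // The user-supplied conversion, as it could run in a map() after the source.
    static final Function<Row, Person> toPerson = row -> {
        Person p = new Person();
        p.name = (String) row.getField(0);
        p.salary = (Double) row.getField(1); // stays null if the column was NULL
        return p;
    };

    public static void main(String[] args) {
        Person p = toPerson.apply(new Row("Alice", null));
        System.out.println(p.name + " / " + p.salary); // prints "Alice / null"
    }
}
```

The key design point is that the generic Row keeps the source untyped but lossless, while the typing decision (and any null policy) is deferred to a user-owned conversion step.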