Of course there is one already. We'll look into the runtime context.

Regards,
Stefano
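
A minimal sketch of the lifecycle discussed in the quoted messages: one expensive connection per InputFormat instance, with open()/close() still called for every split. It is plain Java, not Flink code. FakeConnection and LifecycleSketch are hypothetical names used only for illustration; openInputFormat() and closeInputFormat() are the method names proposed in the thread, not an existing API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an expensive resource such as a JDBC connection. */
class FakeConnection implements AutoCloseable {
    static int created = 0; // counts how many connections were ever created
    private boolean closed = false;

    FakeConnection() {
        created++;
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

/** Hypothetical format with separate per-instance and per-split hooks. */
class LifecycleSketch {
    private FakeConnection connection;
    final List<String> log = new ArrayList<>();

    // Called once per parallel instance, before the first split.
    void openInputFormat() {
        if (connection == null) {
            connection = new FakeConnection();
        }
        log.add("openInputFormat");
    }

    // Called for every split: only per-split state belongs here.
    void open(int splitId) {
        if (connection == null || connection.isClosed()) {
            throw new IllegalStateException("openInputFormat() was not called");
        }
        log.add("open split " + splitId);
    }

    // Called for every split: must NOT close the shared connection.
    void close() {
        log.add("close split");
    }

    // Called once per parallel instance, after the last split.
    void closeInputFormat() {
        if (connection != null) {
            connection.close();
            connection = null;
        }
        log.add("closeInputFormat");
    }

    boolean hasOpenConnection() {
        return connection != null && !connection.isClosed();
    }

    public static void main(String[] args) {
        LifecycleSketch format = new LifecycleSketch();
        format.openInputFormat();
        for (int split = 0; split < 3; split++) {
            format.open(split);
            format.close();
        }
        format.closeInputFormat();
        System.out.println(FakeConnection.created + " connection(s) for 3 splits");
    }
}
```

The per-split close() deliberately leaves the connection alone, which is the point raised about close() once the connection is shared.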

2016-04-18 9:41 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:

> Being a generic JDBC input format, I would prefer to stay with Row,
> letting the developer manage the cast according to the driver
> functionalities.
>
> As for the open() and close() issue, I agree with Flavio that we'd need a
> better management of the inputformat lifecycle. Perhaps a new interface
> extending it: RichInputFormat?
>
> my2c.
>
> Stefano
>
> 2016-04-18 9:35 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
>> Talking with Stefano this morning and looking at the DataSourceTask code,
>> we discovered that the open() and close() methods are both called for
>> every split and not once per InputFormat instance (maybe open and close
>> should be renamed openSplit and closeSplit to avoid confusion...).
>> I think it could be worth adding two methods to the InputFormat (e.g.
>> openInputFormat() and closeInputFormat()) to allow management of the
>> InputFormat lifecycle; otherwise I'll need to instantiate a pool (thus
>> adding a dependency) to avoid creating a new connection (an expensive
>> operation) for every split (which in our use case happens millions of
>> times).
>>
>> What about the output of the InputFormat? How do you want me to proceed?
>> With POJO or Row? If POJO, which strategy do you suggest?
>>
>> Best,
>> Flavio
>>
>> On Fri, Apr 15, 2016 at 2:06 PM, Stefano Bortoli <s.bort...@gmail.com>
>> wrote:
>>
>> > If we share the connection, then we should also be careful with the
>> > close() implementation. I did not see changes for this method in the PR.
>> >
>> > Regards,
>> > Stefano
>> >
>> > 2016-04-15 11:01 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>> >
>> > > Following your suggestions I've fixed the connection reuse in my PR at
>> > > https://github.com/apache/flink/pull/1885.
>> > > In establishConnection() I simply check whether dbConn != null and, in
>> > > that case, return immediately.
>> > >
>> > > Thus, the only remaining thing to fix is the null handling. Do you have
>> > > any suggestion about how to transform the results into a POJO?
>> > > Maybe returning a Row and then letting the user manage the conversion
>> > > to the target POJO in a subsequent map could be a more general solution?
>> > >
>> > > Best,
>> > > Flavio
>> > >
>> > > On Thu, Apr 14, 2016 at 6:52 PM, Fabian Hueske <fhue...@gmail.com>
>> > wrote:
>> > >
>> > > > There is an InputFormat object for each parallel task of a DataSource.
>> > > > So for a source with parallelism 8 you will have 8 instances of the
>> > > > InputFormat running, regardless of whether this is on one box with 8
>> > > > slots or 8 machines with 1 slot each.
>> > > > The same is true for all other operators (Map, Reduce, Join, etc.)
>> > > > and DataSinks.
>> > > >
>> > > > Note, a single task does not fill a slot, but a "slice" of the
>> > > > program (one parallel task of each operator) fills a slot.
>> > > >
>> > > > Cheers, Fabian
>> > > >
>> > > > 2016-04-14 18:47 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>> > > >
>> > > > > OK, thanks! Just one last question: is an InputFormat instantiated
>> > > > > for each task slot or once per task manager?
>> > > > > On 14 Apr 2016 18:07, "Chesnay Schepler" <ches...@apache.org>
>> wrote:
>> > > > >
>> > > > > > no.
>> > > > > >
>> > > > > > if (connection==null) {
>> > > > > >  establishConnection();
>> > > > > > }
>> > > > > >
>> > > > > > done. same connection for all splits.
>> > > > > >
>> > > > > > On 14.04.2016 17:59, Flavio Pompermaier wrote:
>> > > > > >
>> > > > > >> I didn't understand what you mean by "it should also be possible
>> > > > > >> to reuse the same connection of an InputFormat across
>> > > > > >> InputSplits, i.e., calls of the open() method".
>> > > > > >> At the moment the open() method calls establishConnection(), so
>> > > > > >> a new connection is created for each split.
>> > > > > >> If I understood correctly, you're suggesting to create a pool in
>> > > > > >> the InputFormat and simply call pool.borrow() in open() rather
>> > > > > >> than establishConnection()?
>> > > > > >>
>> > > > > >> On 14 Apr 2016 17:28, "Chesnay Schepler" <ches...@apache.org>
>> > > wrote:
>> > > > > >>
>> > > > > >> On 14.04.2016 17:22, Fabian Hueske wrote:
>> > > > > >>>
>> > > > > >>> Hi Flavio,
>> > > > > >>>>
>> > > > > >>>> those are good questions.
>> > > > > >>>>
>> > > > > >>>> 1) Replacing null values by default values and simply forwarding
>> > > > > >>>> records is very dangerous, in my opinion.
>> > > > > >>>> I see two alternatives: A) We use a data type that tolerates
>> > > > > >>>> null values. This could be a POJO that the user has to provide,
>> > > > > >>>> or Row. The drawback of Row is that it is untyped and not easy
>> > > > > >>>> to handle. B) We use Tuple and add an additional field that
>> > > > > >>>> holds an Integer which serves as a bitset to mark null fields.
>> > > > > >>>> This would be a pretty low-level API though. I am leaning
>> > > > > >>>> towards the user-provided POJO option.
>> > > > > >>>>
>> > > > > >>>> I would also lean towards the POJO option.
>> > > > > >>>
>> > > > > >>> 2) The JDBCInputFormat is located in a dedicated Maven module. I
>> > > > > >>>> think we can add a dependency to that module. However, it should
>> > > > > >>>> also be possible to reuse the same connection of an InputFormat
>> > > > > >>>> across InputSplits, i.e., calls of the open() method. Wouldn't
>> > > > > >>>> that be sufficient?
>> > > > > >>>>
>> > > > > >>>> this is the right approach imo.
>> > > > > >>>
>> > > > > >>> Best, Fabian
>> > > > > >>>>
>> > > > > >>>> 2016-04-14 16:59 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>> > > > > >>>>
>> > > > > >>>> Hi guys,
>> > > > > >>>>
>> > > > > >>>>> I'm integrating Chesnay's comments into my PR, but there are a
>> > > > > >>>>> couple of things that I'd like to discuss with the core
>> > > > > >>>>> developers.
>> > > > > >>>>>
>> > > > > >>>>>
>> > > > > >>>>>      1. About the JDBC type mapping (addValue() method at [1]):
>> > > > > >>>>>      at the moment, if I find a null value for a Double, the
>> > > > > >>>>>      getDouble() of JDBC returns 0.0. Is that really the correct
>> > > > > >>>>>      behaviour? Wouldn't it be better to use a POJO or the Row of
>> > > > > >>>>>      datatable, which can handle null? Moreover, the mapping
>> > > > > >>>>>      between SQL types and Java types varies a lot from one JDBC
>> > > > > >>>>>      implementation to another. Wouldn't it be better to rely on
>> > > > > >>>>>      the Java type coming from resultSet.getObject() to get such
>> > > > > >>>>>      a mapping rather than using the ResultSetMetaData types?
>> > > > > >>>>>      2. I'd like to handle connections very efficiently because we
>> > > > > >>>>>      have a use case with billions of records and thus millions
>> > > > > >>>>>      of splits, and establishing a new connection each time could
>> > > > > >>>>>      be expensive. Would it be a problem to add an Apache pool
>> > > > > >>>>>      dependency to the JDBC batch connector in order to reuse the
>> > > > > >>>>>      created connections?
>> > > > > >>>>>
>> > > > > >>>>>
>> > > > > >>>>> [1]
>> > > > > >>>>> https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
>> > > > > >>>>>
>> > > > > >>>>>
>> > > > > >>>>>
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
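
On the null-handling question raised at the start of the thread: in the JDBC API, ResultSet.getDouble() returns 0 for an SQL NULL, and ResultSet.wasNull() reports whether the last column read was NULL. A minimal sketch of both options mentioned above (checking wasNull(), or relying on getObject()) could look like the following. NullableColumns, readDouble() and readRow() are hypothetical helper names, not part of the PR.

```java
import java.sql.ResultSet;
import java.sql.SQLException;

/** Hypothetical helpers showing null-preserving reads from a JDBC ResultSet. */
class NullableColumns {

    /** Reads a DOUBLE column and returns null (instead of 0.0) for SQL NULL. */
    static Double readDouble(ResultSet rs, int columnIndex) throws SQLException {
        double value = rs.getDouble(columnIndex); // yields 0 when the column is SQL NULL
        return rs.wasNull() ? null : value;       // wasNull() refers to the last column read
    }

    /** Reads every column via getObject(), leaving the Java type to the driver. */
    static Object[] readRow(ResultSet rs) throws SQLException {
        int arity = rs.getMetaData().getColumnCount();
        Object[] row = new Object[arity];
        for (int i = 0; i < arity; i++) {
            row[i] = rs.getObject(i + 1); // JDBC columns are 1-based; SQL NULL becomes null
        }
        return row;
    }
}
```

With readRow() the field types depend on the driver, so the cast stays with the caller, which matches the preference for Row stated in the thread.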
