Hi Fabian, I've just created a JIRA for that (FLINK-3777).
As you said input split should be not too fine-grained but we have a table
with 11 billions of rows that can't be queried with ranges greated than
100K of rows because it has a lot of JOIN and increasing thhis threashold
implies incredibly longer response time). This implies millions of splits
and, thus, millions of calls to open and thus connectiosn re-creation.. :(


On Mon, Apr 18, 2016 at 12:01 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> I agree, a method to close an input format is missing.
> InputFormat is an API stable interface, so it is not possible to extend it
> (until Flink 2.0). RichInputFormat is API stable as well, but an abstract
> class. So it should be possible to add an empty default implementation of a
> closeInputFormat() method there.
>
> Of course it would be good to re-use connections across input splits.
> On the other hand, input splits should not be too fine-grained as well,
> because input split assignment has some overhead as well.
>
> Best, Fabian
>
> 2016-04-18 9:49 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it>:
>
> > Yes, I forgot to mention that I could instantiate the connection in the
> > configure() but then I can't close it (as you confirmed) :(
> >
> > On Mon, Apr 18, 2016 at 9:46 AM, Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > There is also InputFormat.configure() which is called before any split
> > > processing happens. But I see your point about a missing close() method
> > > that is called after all input splits have been processed.
> > >
> > > On Mon, 18 Apr 2016 at 09:44 Stefano Bortoli <s.bort...@gmail.com>
> > wrote:
> > >
> > > > Of course there is one already. We'll look into the runtime context.
> > > >
> > > > saluti,
> > > > Stefano
> > > >
> > > > 2016-04-18 9:41 GMT+02:00 Stefano Bortoli <s.bort...@gmail.com>:
> > > >
> > > > > Being a generic JDBC input format, I would prefer to stay with Row,
> > > > > letting the developer manage the cast according to the driver
> > > > > functionalities.
> > > > >
> > > > > As for the open() and close() issue, I agree with Flavio that we'd
> > > need a
> > > > > better management of the inputformat lifecycle. Perhaps a new
> > interface
> > > > > extending it: RichInputFormat?
> > > > >
> > > > > my2c.
> > > > >
> > > > > Stefano
> > > > >
> > > > > 2016-04-18 9:35 GMT+02:00 Flavio Pompermaier <pomperma...@okkam.it
> >:
> > > > >
> > > > >> Talking with Stefano this morning and looking at the
> DataSourceTask
> > > code
> > > > >> we
> > > > >> discovered that the open() and close() methods are both called for
> > > every
> > > > >> split and not once per inputFormat instance (maybe open and close
> > > should
> > > > >> be
> > > > >> renamed as openSplit and closeSplit to avoid confusion...).
> > > > >> I think that it could worth to add 2 methods to the InputFormat
> > (e.g.
> > > > >> openInputFormat() and closeInputFormat() ) to allow for the
> > managment
> > > of
> > > > >> the InputFormat lifecycle, otherwise I'll need to instantiate a
> pool
> > > > (and
> > > > >> thus adding a dependency) to avoid the creation of a new
> connection
> > > > >> (expensive operation) for every split (that in our use case
> happens
> > > > >> millions of times).
> > > > >>
> > > > >> What about the output of the inputFormat? how do you want me to
> > > proceed?
> > > > >> With POJO or Row? If POJO, which strategy do you suggest?
> > > > >>
> > > > >> Best,
> > > > >> Flavio
> > > > >>
> > > > >> On Fri, Apr 15, 2016 at 2:06 PM, Stefano Bortoli <
> > s.bort...@gmail.com
> > > >
> > > > >> wrote:
> > > > >>
> > > > >> > If we share the connection, then we should also be careful with
> > the
> > > > >> close()
> > > > >> > implementation. I did not see changes for this method in the PR.
> > > > >> >
> > > > >> > saluti,
> > > > >> > Stefano
> > > > >> >
> > > > >> > 2016-04-15 11:01 GMT+02:00 Flavio Pompermaier <
> > pomperma...@okkam.it
> > > >:
> > > > >> >
> > > > >> > > Following your suggestions I've fixed the connection reuse in
> my
> > > PR
> > > > at
> > > > >> > > https://github.com/apache/flink/pull/1885.
> > > > >> > > I simply check in the establishConnection() if dbConn!=null
> and,
> > > in
> > > > >> that
> > > > >> > > case, I simply return immediately.
> > > > >> > >
> > > > >> > > Thus, the only remaining thin to fix is the null handling. Do
> > you
> > > > have
> > > > >> > any
> > > > >> > > suggestion about how to transform the results in a POJO?
> > > > >> > > Maybe returning a Row and then let the user manage the
> > conversion
> > > to
> > > > >> the
> > > > >> > > target POJO in a successive map could be a more general
> > soloution?
> > > > >> > >
> > > > >> > > Best,
> > > > >> > > Flavio
> > > > >> > >
> > > > >> > > On Thu, Apr 14, 2016 at 6:52 PM, Fabian Hueske <
> > fhue...@gmail.com
> > > >
> > > > >> > wrote:
> > > > >> > >
> > > > >> > > > There is an InputFormat object for each parallel task of a
> > > > >> DataSource.
> > > > >> > > > So for a source with parallelism 8 you will have 8 instances
> > of
> > > > the
> > > > >> > > > InputFormat running, regardless whether this is on one box
> > with
> > > 8
> > > > >> slots
> > > > >> > > or
> > > > >> > > > 8 machines with 1 slots each.
> > > > >> > > > The same is true for all other operators (Map, Reduce, Join,
> > > etc.)
> > > > >> and
> > > > >> > > > DataSinks.
> > > > >> > > >
> > > > >> > > > Note, a single task does not fill a slot, but a "slice" of
> the
> > > > >> program
> > > > >> > > (one
> > > > >> > > > parallel task of each operator) fills a slot.
> > > > >> > > >
> > > > >> > > > Cheers, Fabian
> > > > >> > > >
> > > > >> > > > 2016-04-14 18:47 GMT+02:00 Flavio Pompermaier <
> > > > pomperma...@okkam.it
> > > > >> >:
> > > > >> > > >
> > > > >> > > > > ok thanks!just one last question: an inputformat is
> > > instantiated
> > > > >> for
> > > > >> > > each
> > > > >> > > > > task slot or once for task manger?
> > > > >> > > > > On 14 Apr 2016 18:07, "Chesnay Schepler" <
> > ches...@apache.org>
> > > > >> wrote:
> > > > >> > > > >
> > > > >> > > > > > no.
> > > > >> > > > > >
> > > > >> > > > > > if (connection==null) {
> > > > >> > > > > >  establishCOnnection();
> > > > >> > > > > > }
> > > > >> > > > > >
> > > > >> > > > > > done. same connection for all splits.
> > > > >> > > > > >
> > > > >> > > > > > On 14.04.2016 17:59, Flavio Pompermaier wrote:
> > > > >> > > > > >
> > > > >> > > > > >> I didn't understand what you mean for "it should also
> be
> > > > >> possible
> > > > >> > to
> > > > >> > > > > reuse
> > > > >> > > > > >> the same connection of an InputFormat across
> InputSplits,
> > > > i.e.,
> > > > >> > > calls
> > > > >> > > > of
> > > > >> > > > > >> the open() method".
> > > > >> > > > > >> At the moment in the open method there's a call to
> > > > >> > > > establishConnection,
> > > > >> > > > > >> thus, a new connection is created for each split.
> > > > >> > > > > >> If I understood correctly, you're suggesting to create
> a
> > > pool
> > > > >> in
> > > > >> > the
> > > > >> > > > > >> inputFormat and simply call poo.borrow() in the open()
> > > rather
> > > > >> than
> > > > >> > > > > >> establishConnection?
> > > > >> > > > > >>
> > > > >> > > > > >> On 14 Apr 2016 17:28, "Chesnay Schepler" <
> > > ches...@apache.org
> > > > >
> > > > >> > > wrote:
> > > > >> > > > > >>
> > > > >> > > > > >> On 14.04.2016 17:22, Fabian Hueske wrote:
> > > > >> > > > > >>>
> > > > >> > > > > >>> Hi Flavio,
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> that are good questions.
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> 1) Replacing null values by default values and simply
> > > > >> forwarding
> > > > >> > > > > records
> > > > >> > > > > >>>> is
> > > > >> > > > > >>>> very dangerous, in my opinion.
> > > > >> > > > > >>>> I see two alternatives: A) we use a data type that
> > > > tolerates
> > > > >> > null
> > > > >> > > > > >>>> values.
> > > > >> > > > > >>>> This could be a POJO that the user has to provide or
> > Row.
> > > > The
> > > > >> > > > drawback
> > > > >> > > > > >>>> of
> > > > >> > > > > >>>> Row is that it is untyped and not easy to handle. B)
> We
> > > use
> > > > >> > Tuple
> > > > >> > > > and
> > > > >> > > > > >>>> add
> > > > >> > > > > >>>> an additional field that holds an Integer which
> serves
> > > as a
> > > > >> > bitset
> > > > >> > > > to
> > > > >> > > > > >>>> mark
> > > > >> > > > > >>>> null fields. This would be a pretty low level API
> > > though. I
> > > > >> am
> > > > >> > > > leaning
> > > > >> > > > > >>>> towards the user-provided POJO option.
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> i would also lean towards the POJO option.
> > > > >> > > > > >>>
> > > > >> > > > > >>> 2) The JDBCInputFormat is located in a dedicated Maven
> > > > >> module. I
> > > > >> > > > think
> > > > >> > > > > we
> > > > >> > > > > >>>> can add a dependency to that module. However, it
> should
> > > > also
> > > > >> be
> > > > >> > > > > possible
> > > > >> > > > > >>>> to
> > > > >> > > > > >>>> reuse the same connection of an InputFormat across
> > > > >> InputSplits,
> > > > >> > > > i.e.,
> > > > >> > > > > >>>> calls
> > > > >> > > > > >>>> of the open() method. Wouldn't that be sufficient?
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> this is the right approach imo.
> > > > >> > > > > >>>
> > > > >> > > > > >>> Best, Fabian
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> 2016-04-14 16:59 GMT+02:00 Flavio Pompermaier <
> > > > >> > > pomperma...@okkam.it
> > > > >> > > > >:
> > > > >> > > > > >>>>
> > > > >> > > > > >>>> Hi guys,
> > > > >> > > > > >>>>
> > > > >> > > > > >>>>> I'm integrating the comments of Chesnay to my PR but
> > > > >> there's a
> > > > >> > > > couple
> > > > >> > > > > >>>>> of
> > > > >> > > > > >>>>> thing that I'd like to discuss with the core
> > developers.
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>>      1. about the JDBC type mapping (addValue()
> method
> > > at
> > > > >> [1]:
> > > > >> > At
> > > > >> > > > the
> > > > >> > > > > >>>>> moment
> > > > >> > > > > >>>>>      if I find a null value for a  Double, the
> > getDouble
> > > > of
> > > > >> > jdbc
> > > > >> > > > > return
> > > > >> > > > > >>>>> 0.0.
> > > > >> > > > > >>>>> Is
> > > > >> > > > > >>>>>      it really the correct behaviour? Wouldn't be
> > better
> > > > to
> > > > >> > use a
> > > > >> > > > > POJO
> > > > >> > > > > >>>>> or
> > > > >> > > > > >>>>> the
> > > > >> > > > > >>>>>      Row of datatable that can handle void?
> Moreover,
> > > the
> > > > >> > mapping
> > > > >> > > > > >>>>> between
> > > > >> > > > > >>>>> SQL
> > > > >> > > > > >>>>>      type and Java types varies much from the single
> > > JDBC
> > > > >> > > > > >>>>> implementation.
> > > > >> > > > > >>>>>      Wouldn't be better to rely on the Java type
> > coming
> > > > from
> > > > >> > > using
> > > > >> > > > > >>>>>      resultSet.getObject() to get such a mapping
> > rather
> > > > than
> > > > >> > > using
> > > > >> > > > > the
> > > > >> > > > > >>>>>      ResultSetMetadata types?
> > > > >> > > > > >>>>>      2. I'd like to handle connections very
> > efficiently
> > > > >> because
> > > > >> > > we
> > > > >> > > > > >>>>> have a
> > > > >> > > > > >>>>> use
> > > > >> > > > > >>>>>      case with billions of records and thus millions
> > of
> > > > >> splits
> > > > >> > > and
> > > > >> > > > > >>>>> establish
> > > > >> > > > > >>>>> a
> > > > >> > > > > >>>>>      new connection each time could be expensive.
> > Would
> > > it
> > > > >> be a
> > > > >> > > > > >>>>> problem to
> > > > >> > > > > >>>>> add
> > > > >> > > > > >>>>>      apache pool dependency to the jdbc batch
> > connector
> > > in
> > > > >> > order
> > > > >> > > to
> > > > >> > > > > >>>>> reuase
> > > > >> > > > > >>>>> the
> > > > >> > > > > >>>>>      created connections?
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>> [1]
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>>
> > > > >> > > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://github.com/fpompermaier/flink/blob/FLINK-3750/flink-batch-connectors/flink-jdbc/src/main/java/org/apache/flink/api/java/io/jdbc/JDBCInputFormat.java
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>>
> > > > >> > > > > >>>>>
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Reply via email to