Hi all,

I would like to add a method `getClassloader()` to the Context,
because a TableFactory may need this class loader to find other
TableFactories; e.g. KafkaTableSourceSinkFactory needs to find a format
factory. See FLINK-15992.

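A minimal sketch of the discovery Jark describes, assuming the factory uses the standard `java.util.ServiceLoader` to look up other factories through the class loader exposed by the context. `FormatFactory`, the one-method `Context` and `FormatDiscovery` are hypothetical stand-ins, not Flink's actual classes or its real factory discovery code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical SPI for format factories (not Flink's real interface).
interface FormatFactory {
    String formatName();
}

// Hypothetical, trimmed-down Context holding only the getter proposed above.
interface Context {
    ClassLoader getClassloader();
}

final class FormatDiscovery {
    private FormatDiscovery() {}

    // Collects every FormatFactory registered under META-INF/services that is
    // visible to the class loader the planner hands over via the context.
    static List<FormatFactory> discoverFormats(Context context) {
        ServiceLoader<FormatFactory> loader =
                ServiceLoader.load(FormatFactory.class, context.getClassloader());
        List<FormatFactory> found = new ArrayList<>();
        for (FormatFactory factory : loader) {
            found.add(factory);
        }
        return found;
    }
}
```

Passing the loader explicitly matters when user jars are loaded by a different class loader than the one that loaded the connector itself.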
I don't think we need a new VOTE for this; I just want to make this
discussion public.
What do you think?

Best,
Jark

On Wed, 5 Feb 2020 at 16:05, Rui Li <lirui.fu...@gmail.com> wrote:

> +1, thanks for the efforts.
>
> On Wed, Feb 5, 2020 at 4:00 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>
> > Hi all,
> >
> > As Jark suggested in the VOTE thread, I created a JIRA issue:
> > https://issues.apache.org/jira/browse/FLINK-15912
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, Feb 5, 2020 at 10:57 AM Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> > > Hi Timo,
> > >
> > > Good catch!
> > >
> > > I really like idea 2; access to the full Flink config looks very good to me.
> > >
> > > Regarding your first point: we actually don't have a `TableIdentifier`
> > > class now, but the name TableFactory already indicates a table, so I am
> > > OK with the rename.
> > >
> > > New Context should be:
> > >
> > >    /**
> > >     * Context of table source creation. Contains table information and
> > >     * environment information.
> > >     */
> > >    interface Context {
> > >       /**
> > >        * @return full identifier of the given {@link CatalogTable}.
> > >        */
> > >       ObjectIdentifier getObjectIdentifier();
> > >       /**
> > >        * @return table {@link CatalogTable} instance.
> > >        */
> > >       CatalogTable getTable();
> > >       /**
> > >        * @return readable config of this table environment.
> > >        */
> > >       ReadableConfig getConfiguration();
> > >    }
> > >
> > >
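To make the final shape concrete, here is a hedged sketch, assuming simplified stand-ins for `ObjectIdentifier`, `CatalogTable` and `ReadableConfig` (Flink's real `ReadableConfig` is keyed by `ConfigOption`, not by plain strings). `DefaultContext`, `ExampleConnector` and the key `example.sink.parallelism` are hypothetical: the first shows how a planner might hand the three values to a factory, the second how a connector might read a session-level setting with a fallback.

```java
import java.util.Map;
import java.util.Optional;

// Simplified stand-ins; the real Flink classes carry much more.
final class ObjectIdentifier {
    final String catalog, database, object;

    ObjectIdentifier(String catalog, String database, String object) {
        this.catalog = catalog;
        this.database = database;
        this.object = object;
    }
}

interface CatalogTable {
    Map<String, String> getProperties();
}

interface ReadableConfig {
    Optional<String> getOptional(String key); // assumption: string keys for brevity
}

interface Context {
    ObjectIdentifier getObjectIdentifier();
    CatalogTable getTable();
    ReadableConfig getConfiguration();
}

// Planner-side sketch: an immutable holder created once per table.
final class DefaultContext implements Context {
    private final ObjectIdentifier identifier;
    private final CatalogTable table;
    private final ReadableConfig configuration;

    DefaultContext(ObjectIdentifier identifier, CatalogTable table, ReadableConfig configuration) {
        this.identifier = identifier;
        this.table = table;
        this.configuration = configuration;
    }

    @Override public ObjectIdentifier getObjectIdentifier() { return identifier; }
    @Override public CatalogTable getTable() { return table; }
    @Override public ReadableConfig getConfiguration() { return configuration; }
}

// Connector-side sketch: reads a hypothetical session-level option, defaulting to 1.
final class ExampleConnector {
    static final String PARALLELISM_KEY = "example.sink.parallelism"; // hypothetical key

    static int sinkParallelism(Context context) {
        return context.getConfiguration()
                .getOptional(PARALLELISM_KEY)
                .map(Integer::parseInt)
                .orElse(1);
    }
}
```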
> > > Best,
> > > Jingsong Lee
> > >
> > > On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <twal...@apache.org> wrote:
> > >
> > >> Hi Jingsong,
> > >>
> > >> some last-minute changes from my side:
> > >>
> > >> 1. Rename `getTableIdentifier` to `getObjectIdentifier` to keep the
> > >> API obvious. Otherwise people will expect a `TableIdentifier` class
> > >> to be returned here.
> > >>
> > >> 2. Rename `getTableConfig` to `getConfiguration()`: in the future this
> > >> will not only be a "table" config but might give access to the full
> > >> Flink config.
> > >>
> > >> Thanks,
> > >> Timo
> > >>
> > >>
> > >> On 04.02.20 06:27, Jingsong Li wrote:
> > >> > So the interface will be:
> > >> >
> > >> > public interface TableSourceFactory<T> extends TableFactory {
> > >> >     ......
> > >> >
> > >> >     /**
> > >> >      * Creates and configures a {@link TableSource} based on the given
> > >> >      * {@link Context}.
> > >> >      *
> > >> >      * @param context context of this table source.
> > >> >      * @return the configured table source.
> > >> >      */
> > >> >     default TableSource<T> createTableSource(Context context) {
> > >> >        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> > >> >        return createTableSource(
> > >> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> > >> >                    tableIdentifier.getObjectName()),
> > >> >              context.getTable());
> > >> >     }
> > >> >     /**
> > >> >      * Context of table source creation. Contains table information and
> > >> >      * environment information.
> > >> >      */
> > >> >     interface Context {
> > >> >        /**
> > >> >         * @return full identifier of the given {@link CatalogTable}.
> > >> >         */
> > >> >        ObjectIdentifier getTableIdentifier();
> > >> >        /**
> > >> >         * @return table {@link CatalogTable} instance.
> > >> >         */
> > >> >        CatalogTable getTable();
> > >> >        /**
> > >> >         * @return readable config of this table environment.
> > >> >         */
> > >> >        ReadableConfig getTableConfig();
> > >> >     }
> > >> > }
> > >> >
> > >> > public interface TableSinkFactory<T> extends TableFactory {
> > >> >     ......
> > >> >     /**
> > >> >      * Creates and configures a {@link TableSink} based on the given
> > >> >      * {@link Context}.
> > >> >      *
> > >> >      * @param context context of this table sink.
> > >> >      * @return the configured table sink.
> > >> >      */
> > >> >     default TableSink<T> createTableSink(Context context) {
> > >> >        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> > >> >        return createTableSink(
> > >> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> > >> >                    tableIdentifier.getObjectName()),
> > >> >              context.getTable());
> > >> >     }
> > >> >     /**
> > >> >      * Context of table sink creation. Contains table information and
> > >> >      * environment information.
> > >> >      */
> > >> >     interface Context {
> > >> >        /**
> > >> >         * @return full identifier of the given {@link CatalogTable}.
> > >> >         */
> > >> >        ObjectIdentifier getTableIdentifier();
> > >> >        /**
> > >> >         * @return table {@link CatalogTable} instance.
> > >> >         */
> > >> >        CatalogTable getTable();
> > >> >        /**
> > >> >         * @return readable config of this table environment.
> > >> >         */
> > >> >        ReadableConfig getTableConfig();
> > >> >     }
> > >> > }
> > >> >
> > >> >
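The default `createTableSource(Context)` method quoted above is what keeps existing factories source-compatible. Below is a simplified, self-contained sketch of that bridge. Every type is a hypothetical stand-in, `TableSource` is reduced to a one-method placeholder, the legacy two-argument method is abstract here for simplicity, and the getter uses the renamed `getObjectIdentifier()` rather than `getTableIdentifier()`.

```java
import java.util.Map;

// Simplified stand-ins; the real Flink classes carry much more.
final class ObjectIdentifier {
    private final String catalogName;
    private final String databaseName;
    private final String objectName;

    ObjectIdentifier(String catalogName, String databaseName, String objectName) {
        this.catalogName = catalogName;
        this.databaseName = databaseName;
        this.objectName = objectName;
    }

    String getCatalogName() { return catalogName; }
    String getDatabaseName() { return databaseName; }
    String getObjectName() { return objectName; }
}

final class ObjectPath {
    private final String databaseName;
    private final String objectName;

    ObjectPath(String databaseName, String objectName) {
        this.databaseName = databaseName;
        this.objectName = objectName;
    }

    String getFullName() { return databaseName + "." + objectName; }
}

interface CatalogTable { Map<String, String> getProperties(); }

interface TableSource<T> { String describe(); } // placeholder only

interface TableSourceFactory<T> {
    // Legacy entry point that existing factories already implement.
    TableSource<T> createTableSource(ObjectPath tablePath, CatalogTable table);

    // New entry point; by default it drops the catalog and delegates,
    // so factories written against the old method keep working.
    default TableSource<T> createTableSource(Context context) {
        ObjectIdentifier id = context.getObjectIdentifier();
        return createTableSource(
                new ObjectPath(id.getDatabaseName(), id.getObjectName()),
                context.getTable());
    }

    interface Context {
        ObjectIdentifier getObjectIdentifier();
        CatalogTable getTable();
    }
}

// A factory that only knows the old API; it never sees the Context.
final class LegacyFactory implements TableSourceFactory<String> {
    @Override
    public TableSource<String> createTableSource(ObjectPath tablePath, CatalogTable table) {
        return () -> "source for " + tablePath.getFullName();
    }
}
```

The planner can then always call the Context-based method, and only factories that need the extra information override it.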
> > >> > Best,
> > >> > Jingsong Lee
> > >> >
> > >> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> > >> >
> > >> >> Hi all,
> > >> >>
> > >> >> After rethinking and discussing with Kurt, I'd like to remove
> > >> >> "isBounded". We can defer the boundedness information to TableSink
> > >> >> itself: with the TableSink refactoring, we need to consider
> > >> >> "consumeDataStream" and "consumeBoundedStream".
> > >> >>
> > >> >> Best,
> > >> >> Jingsong Lee
> > >> >>
> > >> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> > >> >>
> > >> >>> Hi Jark,
> > >> >>>
> > >> >>> Thanks for getting involved. Yes, adding isBounded to the source is
> > >> >>> hard to understand. I recommend adding it only to the sink for now,
> > >> >>> because a sink has an upstream, and that upstream is either bounded
> > >> >>> or unbounded.
> > >> >>>
> > >> >>> Hi all,
> > >> >>>
> > >> >>> Let me summarize, incorporating your suggestions.
> > >> >>>
> > >> >>> public interface TableSourceFactory<T> extends TableFactory {
> > >> >>>
> > >> >>>     ......
> > >> >>>
> > >> >>>
> > >> >>>     /**
> > >> >>>      * Creates and configures a {@link TableSource} based on the
> > >> >>>      * given {@link Context}.
> > >> >>>      *
> > >> >>>      * @param context context of this table source.
> > >> >>>      * @return the configured table source.
> > >> >>>      */
> > >> >>>     default TableSource<T> createTableSource(Context context) {
> > >> >>>        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> > >> >>>        return createTableSource(
> > >> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> > >> >>>                    tableIdentifier.getObjectName()),
> > >> >>>              context.getTable());
> > >> >>>     }
> > >> >>>
> > >> >>>     /**
> > >> >>>      * Context of table source creation. Contains table information
> > >> >>>      * and environment information.
> > >> >>>      */
> > >> >>>     interface Context {
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return full identifier of the given {@link CatalogTable}.
> > >> >>>         */
> > >> >>>        ObjectIdentifier getTableIdentifier();
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return table {@link CatalogTable} instance.
> > >> >>>         */
> > >> >>>        CatalogTable getTable();
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return readable config of this table environment.
> > >> >>>         */
> > >> >>>        ReadableConfig getTableConfig();
> > >> >>>     }
> > >> >>> }
> > >> >>>
> > >> >>> public interface TableSinkFactory<T> extends TableFactory {
> > >> >>>
> > >> >>>     ......
> > >> >>>
> > >> >>>     /**
> > >> >>>      * Creates and configures a {@link TableSink} based on the
> > >> >>>      * given {@link Context}.
> > >> >>>      *
> > >> >>>      * @param context context of this table sink.
> > >> >>>      * @return the configured table sink.
> > >> >>>      */
> > >> >>>     default TableSink<T> createTableSink(Context context) {
> > >> >>>        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> > >> >>>        return createTableSink(
> > >> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> > >> >>>                    tableIdentifier.getObjectName()),
> > >> >>>              context.getTable());
> > >> >>>     }
> > >> >>>
> > >> >>>     /**
> > >> >>>      * Context of table sink creation. Contains table information
> > >> >>>      * and environment information.
> > >> >>>      */
> > >> >>>     interface Context {
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return full identifier of the given {@link CatalogTable}.
> > >> >>>         */
> > >> >>>        ObjectIdentifier getTableIdentifier();
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return table {@link CatalogTable} instance.
> > >> >>>         */
> > >> >>>        CatalogTable getTable();
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return readable config of this table environment.
> > >> >>>         */
> > >> >>>        ReadableConfig getTableConfig();
> > >> >>>
> > >> >>>        /**
> > >> >>>         * @return whether or not the input is bounded.
> > >> >>>         */
> > >> >>>        boolean isBounded();
> > >> >>>     }
> > >> >>> }
> > >> >>>
> > >> >>> If there are no objections, I will start a vote thread (if
> > >> >>> necessary, I can also write a FLIP).
> > >> >>>
> > >> >>> Best,
> > >> >>> Jingsong Lee
> > >> >>>
> > >> >>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> > >> >>>
> > >> >>>> Thanks Bowen and Timo for getting involved.
> > >> >>>>
> > >> >>>> Hi Bowen,
> > >> >>>>
> > >> >>>>> 1. is it better to have explicit APIs like "createBatchTableSource(...)"
> > >> >>>> I think it is better to keep one method, since in [1] we reached
> > >> >>>> agreement in the DataStream layer to maintain a single API,
> > >> >>>> "env.source". It is good not to split batch and stream; our
> > >> >>>> TableSource/TableSink are also the same classes for both batch and
> > >> >>>> streaming.
> > >> >>>>
> > >> >>>>> 2. I'm not sure of the benefits to have a CatalogTableContext class.
> > >> >>>> As Timo said, we may have more parameters to add in the future. Take
> > >> >>>> a look at "AbstractRichFunction.RuntimeContext": it was extended
> > >> >>>> little by little.
> > >> >>>>
> > >> >>>> Hi Timo,
> > >> >>>>
> > >> >>>> Your suggestion about Context looks good to me.
> > >> >>>> "TablePath" is used in Hive to update the catalog information of
> > >> >>>> this table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
> > >> >>>>
> > >> >>>>> Can we postpone the change of TableValidators?
> > >> >>>> Yes, ConfigOption validation looks good to me. It seems that you
> > >> >>>> have been thinking about this for a long time, which is great. I'm
> > >> >>>> looking forward to progress on FLIP-54.
> > >> >>>>
> > >> >>>> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
> > >> >>>>
> > >> >>>> Best,
> > >> >>>> Jingsong Lee
> > >> >>>>
> > >> >>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org> wrote:
> > >> >>>>
> > >> >>>>> Hi Jingsong,
> > >> >>>>>
> > >> >>>>> +1 for adding a context in the source and sink factories. A context
> > >> >>>>> class also allows for future modifications without touching the
> > >> >>>>> TableFactory interface again.
> > >> >>>>>
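The evolvability argument can be illustrated with a hedged sketch: adding a getter with a default body to a context interface keeps older implementations compiling, while the factory method that takes the context keeps its signature. All names here, including `getExtraOptions()`, are hypothetical and not part of any proposal in this thread.

```java
import java.util.Collections;
import java.util.Map;

interface Context {
    String getTableName();

    // Hypothetical getter added in a later release. Because it has a default
    // body, implementations written before it existed still compile and run.
    default Map<String, String> getExtraOptions() {
        return Collections.emptyMap();
    }
}

interface SourceFactory {
    // This signature never changes, no matter how many getters Context gains.
    String createSource(Context context);
}

// An "old" planner-side implementation that only knows about getTableName().
final class OldContext implements Context {
    private final String tableName;

    OldContext(String tableName) { this.tableName = tableName; }

    @Override public String getTableName() { return tableName; }
}

// A "new" factory that uses the added getter; it still works with OldContext.
final class NewFactory implements SourceFactory {
    @Override
    public String createSource(Context context) {
        String mode = context.getExtraOptions().getOrDefault("mode", "default");
        return context.getTableName() + ":" + mode;
    }
}
```

With positional parameters instead, every new piece of information would require a new `createTableSource(...)` overload.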
> > >> >>>>> How about:
> > >> >>>>>
> > >> >>>>> interface TableSourceFactory {
> > >> >>>>>       interface Context {
> > >> >>>>>          // ...
> > >> >>>>>       }
> > >> >>>>> }
> > >> >>>>>
> > >> >>>>> I find the name `CatalogTableContext` confusing, and this way we can
> > >> >>>>> bind the interface to the factory class itself as an inner interface.
> > >> >>>>>
> > >> >>>>> Read-only access to the configuration also sounds right to me. Can
> > >> >>>>> we remove the `ObjectPath getTablePath()` method? I don't see a
> > >> >>>>> reason why a factory should know the path. And if it does, it should
> > >> >>>>> be an `ObjectIdentifier` instead, so that it also knows about the
> > >> >>>>> catalog we are using.
> > >> >>>>>
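A simplified sketch of why `ObjectIdentifier` carries more information than `ObjectPath`: the former includes the catalog name, so converting an identifier to a path is lossy, and two tables in different catalogs can map to the same path. Both classes below are hypothetical stand-ins for Flink's, reduced to plain fields.

```java
import java.util.Objects;

// Stand-in: database + object name only.
final class ObjectPath {
    final String databaseName;
    final String objectName;

    ObjectPath(String databaseName, String objectName) {
        this.databaseName = databaseName;
        this.objectName = objectName;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ObjectPath)) return false;
        ObjectPath other = (ObjectPath) o;
        return databaseName.equals(other.databaseName) && objectName.equals(other.objectName);
    }

    @Override
    public int hashCode() { return Objects.hash(databaseName, objectName); }
}

// Stand-in: additionally knows which catalog the table lives in.
final class ObjectIdentifier {
    final String catalogName;
    final String databaseName;
    final String objectName;

    ObjectIdentifier(String catalogName, String databaseName, String objectName) {
        this.catalogName = catalogName;
        this.databaseName = databaseName;
        this.objectName = objectName;
    }

    // Dropping the catalog: this direction cannot be reversed.
    ObjectPath toObjectPath() { return new ObjectPath(databaseName, objectName); }
}
```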
> > >> >>>>> `isStreamingMode()` should be renamed to `isBounded()` because we
> > >> >>>>> would like to use terminology around boundedness rather than
> > >> >>>>> streaming/batch.
> > >> >>>>>
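For comparison with Bowen's suggestion of separate batch and streaming methods, here is a hedged sketch of the single-method style, where the connector branches on a boundedness flag itself. `SinkContext`, `Sink` and `ExampleSinkFactory` are hypothetical and are not the interfaces proposed in this thread.

```java
// Hypothetical context exposing only the boundedness flag.
interface SinkContext {
    boolean isBounded();
}

// Hypothetical placeholder for a created sink.
interface Sink {
    String kind();
}

final class ExampleSinkFactory {
    // One entry point; the connector, not the planner, picks the implementation.
    Sink createTableSink(SinkContext context) {
        if (context.isBounded()) {
            return () -> "bounded";   // stands in for a batch-style implementation
        }
        return () -> "unbounded";     // stands in for a streaming-style implementation
    }
}
```

The trade-off being discussed is visible here: the if-else lives in every connector, but the factory interface stays unified.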
> > >> >>>>> @Bowen: We are in the process of unifying the APIs and thus want to
> > >> >>>>> explicitly avoid specialized methods in the future.
> > >> >>>>>
> > >> >>>>> Can we postpone the change of TableValidators? I don't think that
> > >> >>>>> every factory needs a schema validator. Ideally, the factory should
> > >> >>>>> just return a List<ConfigOption> or ConfigOptionGroup that contains
> > >> >>>>> the validation logic, as mentioned in the validation part of
> > >> >>>>> FLIP-54 [1]. But currently our config options are not rich enough to
> > >> >>>>> support unified validation. Additionally, the factory should return
> > >> >>>>> some properties such as "supports event-time" for schema validation
> > >> >>>>> outside of the factory itself.
> > >> >>>>>
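The idea of a factory declaring its options so that validation can run outside of it (for example when DDL is executed) could look roughly like the sketch below. `OptionSpec`, `ValidatingFactory` and `DdlValidator` are hypothetical; as noted above, the real `ConfigOption` is not yet rich enough for this, so attaching a validation predicate to each option is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical option descriptor carrying its own validation logic.
final class OptionSpec {
    final String key;
    final boolean required;
    final Predicate<String> valid;

    OptionSpec(String key, boolean required, Predicate<String> valid) {
        this.key = key;
        this.required = required;
        this.valid = valid;
    }
}

// Hypothetical factory contract: declare options, let the framework validate.
interface ValidatingFactory {
    List<OptionSpec> options();
}

final class DdlValidator {
    private DdlValidator() {}

    // Returns error messages; an empty list means the table properties are valid.
    static List<String> validate(ValidatingFactory factory, Map<String, String> properties) {
        List<String> errors = new ArrayList<>();
        for (OptionSpec spec : factory.options()) {
            String value = properties.get(spec.key);
            if (value == null) {
                if (spec.required) {
                    errors.add("missing required option '" + spec.key + "'");
                }
            } else if (!spec.valid.test(value)) {
                errors.add("invalid value '" + value + "' for option '" + spec.key + "'");
            }
        }
        return errors;
    }
}
```

Because the framework owns the loop, validation happens regardless of whether a given connector remembers to call a utility class.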
> > >> >>>>> Regards,
> > >> >>>>> Timo
> > >> >>>>>
> > >> >>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> > >> >>>>>
> > >> >>>>>
> > >> >>>>>
> > >> >>>>> On 16.01.20 00:51, Bowen Li wrote:
> > >> >>>>>> Hi Jingsong,
> > >> >>>>>>
> > >> >>>>>> The 1st and 2nd pain points you described are very valid; I'm more
> > >> >>>>>> familiar with those two. I agree these are shortcomings of the
> > >> >>>>>> current Flink SQL design.
> > >> >>>>>>
> > >> >>>>>> A couple of comments on your 1st proposal:
> > >> >>>>>>
> > >> >>>>>> 1. Is it better to have explicit APIs like
> > >> >>>>>> "createBatchTableSource(...)" and "createStreamingTableSource(...)"
> > >> >>>>>> in TableSourceFactory (similarly for the sink factory) to let the
> > >> >>>>>> planner decide which mode (streaming vs. batch) of source should be
> > >> >>>>>> instantiated? That way connector developers don't always need to
> > >> >>>>>> handle an if-else on isStreamingMode.
> > >> >>>>>> 2. I'm not sure of the benefits of having a CatalogTableContext
> > >> >>>>>> class. The path, table, and config are fairly independent of each
> > >> >>>>>> other. So why not pass the config in as a 3rd parameter, as in
> > >> >>>>>> `createXxxTableSource(path, catalogTable, tableConfig)`?
> > >> >>>>>>
> > >> >>>>>>
> > >> >>>>>> On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> > >> >>>>>>
> > >> >>>>>>> Hi dev,
> > >> >>>>>>>
> > >> >>>>>>> I'd like to kick off a discussion on improving TableSourceFactory
> > >> >>>>>>> and TableSinkFactory.
> > >> >>>>>>>
> > >> >>>>>>> Motivation:
> > >> >>>>>>> Currently the main needs and problems are:
> > >> >>>>>>> 1. Connectors can't get the TableConfig [1], but some behaviors
> > >> >>>>>>> really need to be controlled by the user's table configuration.
> > >> >>>>>>> In the era of catalogs, we can't put these configs in connector
> > >> >>>>>>> properties; that is too inconvenient.
> > >> >>>>>>> 2. Connectors can't know whether the execution mode is batch or
> > >> >>>>>>> streaming, but the sink implementations for batch and streaming
> > >> >>>>>>> are totally different. I understand there is an update mode
> > >> >>>>>>> property now, but it splits batch and stream along the catalog
> > >> >>>>>>> dimension. In fact, this information can be obtained from the
> > >> >>>>>>> current TableEnvironment.
> > >> >>>>>>> 3. There is no interface for invoking validation. Currently our
> > >> >>>>>>> validation consists mostly of utility classes, and whether it runs
> > >> >>>>>>> depends on whether the connector calls them. Now we have some new
> > >> >>>>>>> validations to add, such as [2], which really confuse users and
> > >> >>>>>>> even developers. Another problem is that our SQL update (DDL)
> > >> >>>>>>> does not have validation [3]. It is better to report an error
> > >> >>>>>>> when executing DDL; otherwise it will confuse the user.
> > >> >>>>>>>
> > >> >>>>>>> Proposed change draft for 1 and 2:
> > >> >>>>>>>
> > >> >>>>>>> interface CatalogTableContext {
> > >> >>>>>>>      ObjectPath getTablePath();
> > >> >>>>>>>      CatalogTable getTable();
> > >> >>>>>>>      ReadableConfig getTableConfig();
> > >> >>>>>>>      boolean isStreamingMode();
> > >> >>>>>>> }
> > >> >>>>>>>
> > >> >>>>>>> public interface TableSourceFactory<T> extends TableFactory {
> > >> >>>>>>>
> > >> >>>>>>>      default TableSource<T> createTableSource(CatalogTableContext context) {
> > >> >>>>>>>         return createTableSource(context.getTablePath(), context.getTable());
> > >> >>>>>>>      }
> > >> >>>>>>>
> > >> >>>>>>>      ......
> > >> >>>>>>> }
> > >> >>>>>>>
> > >> >>>>>>> Proposed change draft for 3:
> > >> >>>>>>>
> > >> >>>>>>> public interface TableFactory {
> > >> >>>>>>>
> > >> >>>>>>>      TableValidators validators();
> > >> >>>>>>>
> > >> >>>>>>>      interface TableValidators {
> > >> >>>>>>>         ConnectorDescriptorValidator connectorValidator();
> > >> >>>>>>>         TableSchemaValidator schemaValidator();
> > >> >>>>>>>         FormatDescriptorValidator formatValidator();
> > >> >>>>>>>      }
> > >> >>>>>>> }
> > >> >>>>>>>
> > >> >>>>>>> What do you think?
> > >> >>>>>>>
> > >> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
> > >> >>>>>>> [2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
> > >> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
> > >> >>>>>>>
> > >> >>>>>>> Best,
> > >> >>>>>>> Jingsong Lee
> > >> >>>>>>>
> > >> >>>>>>
> > >> >>>>>
> > >> >>>>>
> > >> >>>>
> > >> >>>> --
> > >> >>>> Best, Jingsong Lee
> > >> >>>>
> > >> >>>
> > >> >>>
> > >> >>>
> > >> >>
> > >> >>
> > >> >>
> > >> >
> > >> >
> > >>
> > >>
> > >
> > >
> >
> >
> >
>
>
> --
> Best regards!
> Rui Li
>
