Hi Timo,

G ood catch!

I really love the idea 2, a full Flink config looks very good to me.

Try to understand your first one, actually we don't have `TableIdentifier`
class now. But TableFactory already indicate table. So I am OK.

New Context should be:

   /**
    * Context of table source creation. Contains table information and
environment information.
    */
   interface Context {
      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getObjectIdentifier();
      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();
      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getConfiguration();
   }


Best,
Jingsong Lee

On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <twal...@apache.org> wrote:

> Hi Jingsong,
>
> some last minute changes from my side:
>
> 1. rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
> obvious. Otherwise people expect a `TableIdentifier` class being
> returned here.
>
> 2. rename `getTableConfig` to `getConfiguration()` in the future this
> will not only be a "table" config but might give access to the full
> Flink config
>
> Thanks,
> Timo
>
>
> On 04.02.20 06:27, Jingsong Li wrote:
> > So the interface will be:
> >
> > public interface TableSourceFactory<T> extends TableFactory {
> >     ......
> >
> >     /**
> >      * Creates and configures a {@link TableSource} based on the given
> > {@link Context}.
> >      *
> >      * @param context context of this table source.
> >      * @return the configured table source.
> >      */
> >     default TableSource<T> createTableSource(Context context) {
> >        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >        return createTableSource(
> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> > tableIdentifier.getObjectName()),
> >              context.getTable());
> >     }
> >     /**
> >      * Context of table source creation. Contains table information and
> > environment information.
> >      */
> >     interface Context {
> >        /**
> >         * @return full identifier of the given {@link CatalogTable}.
> >         */
> >        ObjectIdentifier getTableIdentifier();
> >        /**
> >         * @return table {@link CatalogTable} instance.
> >         */
> >        CatalogTable getTable();
> >        /**
> >         * @return readable config of this table environment.
> >         */
> >        ReadableConfig getTableConfig();
> >     }
> > }
> >
> > public interface TableSinkFactory<T> extends TableFactory {
> >     ......
> >     /**
> >      * Creates and configures a {@link TableSink} based on the given
> > {@link Context}.
> >      *
> >      * @param context context of this table sink.
> >      * @return the configured table sink.
> >      */
> >     default TableSink<T> createTableSink(Context context) {
> >        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >        return createTableSink(
> >              new ObjectPath(tableIdentifier.getDatabaseName(),
> > tableIdentifier.getObjectName()),
> >              context.getTable());
> >     }
> >     /**
> >      * Context of table sink creation. Contains table information and
> > environment information.
> >      */
> >     interface Context {
> >        /**
> >         * @return full identifier of the given {@link CatalogTable}.
> >         */
> >        ObjectIdentifier getTableIdentifier();
> >        /**
> >         * @return table {@link CatalogTable} instance.
> >         */
> >        CatalogTable getTable();
> >        /**
> >         * @return readable config of this table environment.
> >         */
> >        ReadableConfig getTableConfig();
> >     }
> > }
> >
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <jingsongl...@gmail.com>
> wrote:
> >
> >> Hi all,
> >>
> >> After rethinking and discussion with Kurt, I'd like to remove
> "isBounded".
> >> We can delay this is bounded message to TableSink.
> >> With TableSink refactor, we need consider "consumeDataStream"
> >> and "consumeBoundedStream".
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <jingsongl...@gmail.com>
> wrote:
> >>
> >>> Hi Jark,
> >>>
> >>> Thanks involving, yes, it's hard to understand to add isBounded on the
> >>> source.
> >>> I recommend adding only to sink at present, because sink has upstream.
> >>> Its upstream is either bounded or unbounded.
> >>>
> >>> Hi all,
> >>>
> >>> Let me summarize with your suggestions.
> >>>
> >>> public interface TableSourceFactory<T> extends TableFactory {
> >>>
> >>>     ......
> >>>
> >>>
> >>>     /**
> >>>      * Creates and configures a {@link TableSource} based on the given
> {@link Context}.
> >>>      *
> >>>      * @param context context of this table source.
> >>>      * @return the configured table source.
> >>>      */
> >>>     default TableSource<T> createTableSource(Context context) {
> >>>        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >>>        return createTableSource(
> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> tableIdentifier.getObjectName()),
> >>>              context.getTable());
> >>>     }
> >>>
> >>>     /**
> >>>      * Context of table source creation. Contains table information
> and environment information.
> >>>      */
> >>>     interface Context {
> >>>
> >>>        /**
> >>>         * @return full identifier of the given {@link CatalogTable}.
> >>>         */
> >>>        ObjectIdentifier getTableIdentifier();
> >>>
> >>>        /**
> >>>         * @return table {@link CatalogTable} instance.
> >>>         */
> >>>        CatalogTable getTable();
> >>>
> >>>        /**
> >>>         * @return readable config of this table environment.
> >>>         */
> >>>        ReadableConfig getTableConfig();
> >>>     }
> >>> }
> >>>
> >>> public interface TableSinkFactory<T> extends TableFactory {
> >>>
> >>>     ......
> >>>
> >>>     /**
> >>>      * Creates and configures a {@link TableSink} based on the given
> {@link Context}.
> >>>      *
> >>>      * @param context context of this table sink.
> >>>      * @return the configured table sink.
> >>>      */
> >>>     default TableSink<T> createTableSink(Context context) {
> >>>        ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >>>        return createTableSink(
> >>>              new ObjectPath(tableIdentifier.getDatabaseName(),
> tableIdentifier.getObjectName()),
> >>>              context.getTable());
> >>>     }
> >>>
> >>>     /**
> >>>      * Context of table sink creation. Contains table information and
> environment information.
> >>>      */
> >>>     interface Context {
> >>>
> >>>        /**
> >>>         * @return full identifier of the given {@link CatalogTable}.
> >>>         */
> >>>        ObjectIdentifier getTableIdentifier();
> >>>
> >>>        /**
> >>>         * @return table {@link CatalogTable} instance.
> >>>         */
> >>>        CatalogTable getTable();
> >>>
> >>>        /**
> >>>         * @return readable config of this table environment.
> >>>         */
> >>>        ReadableConfig getTableConfig();
> >>>
> >>>        /**
> >>>         * @return Input whether or not it is bounded.
> >>>         */
> >>>        boolean isBounded();
> >>>     }
> >>> }
> >>>
> >>> If there is no objection, I will start a vote thread. (if necessary, I
> >>> can also edit a FLIP).
> >>>
> >>> Best,
> >>> Jingsong Lee
> >>>
> >>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com>
> >>> wrote:
> >>>
> >>>> Thanks Bowen and Timo for involving.
> >>>>
> >>>> Hi Bowen,
> >>>>
> >>>>> 1. is it better to have explicit APIs like
> >>>> "createBatchTableSource(...)"
> >>>> I think it is better to keep one method, since in [1], we have reached
> >>>> one in DataStream layer to maintain a single API in "env.source". I
> think
> >>>> it is good to not split batch and stream, And our
> TableSource/TableSink are
> >>>> the same class for both batch and streaming too.
> >>>>
> >>>>> 2. I'm not sure of the benefits to have a CatalogTableContext class.
> >>>> As Timo said, We may have more parameters to add in the future, take a
> >>>> look to "AbstractRichFunction.RuntimeContext", It's added little by
> little.
> >>>>
> >>>> Hi Timo,
> >>>>
> >>>> Your suggestion about Context looks good to me.
> >>>> "TablePath" used in Hive for updating the catalog information of this
> >>>> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
> >>>>
> >>>>> Can we postpone the change of TableValidators?
> >>>> Yes, ConfigOption validation looks good to me. It seems that you have
> >>>> been thinking about this for a long time. It's very good. Looking
> forward
> >>>> to the promotion of FLIP-54.
> >>>>
> >>>> [1]
> >>>>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
> >>>>
> >>>> Best,
> >>>> Jingsong Lee
> >>>>
> >>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org>
> wrote:
> >>>>
> >>>>> Hi Jingsong,
> >>>>>
> >>>>> +1 for adding a context in the source and sink factories. A context
> >>>>> class also allows for future modifications without touching the
> >>>>> TableFactory interface again.
> >>>>>
> >>>>> How about:
> >>>>>
> >>>>> interface TableSourceFactory {
> >>>>>       interface Context {
> >>>>>          // ...
> >>>>>       }
> >>>>> }
> >>>>>
> >>>>> Because I find the name `CatalogTableContext` confusing and we can
> >>>>> bound
> >>>>> the interface to the factory class itself as an inner interface.
> >>>>>
> >>>>> Readable access to configuration sounds also right to me. Can we
> remove
> >>>>> the `ObjectPath getTablePath()` method? I don't see a reason why a
> >>>>> factory should know the path. And if so, it should be an
> >>>>> `ObjectIdentifier` instead to also know about the catalog we are
> using.
> >>>>>
> >>>>> The `isStreamingMode()` should be renamed to `isBounded()` because we
> >>>>> would like to use terminology around boundedness rather than
> >>>>> streaming/batch.
> >>>>>
> >>>>> @Bowen: We are in the process of unifying the APIs and thus
> explicitly
> >>>>> avoid specialized methods in the future.
> >>>>>
> >>>>> Can we postpone the change of TableValidators? I don't think that
> every
> >>>>> factory needs a schema validator. Ideally, the factory should just
> >>>>> return a List<ConfigOption> or ConfigOptionGroup that contains the
> >>>>> validation logic as mentioned in the validation part of FLIP-54[1].
> But
> >>>>> currently our config options are not rich enough to have a unified
> >>>>> validation. Additionally, the factory should return some properties
> >>>>> such
> >>>>> as "supports event-time" for the schema validation outside of the
> >>>>> factory itself.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>> [1]
> >>>>>
> >>>>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 16.01.20 00:51, Bowen Li wrote:
> >>>>>> Hi Jingsong,
> >>>>>>
> >>>>>> The 1st and 2nd pain points you described are very valid, as I'm
> more
> >>>>>> familiar with them. I agree these are shortcomings of the current
> >>>>> Flink SQL
> >>>>>> design.
> >>>>>>
> >>>>>> A couple comments on your 1st proposal:
> >>>>>>
> >>>>>> 1. is it better to have explicit APIs like
> >>>>> "createBatchTableSource(...)"
> >>>>>> and "createStreamingTableSource(...)" in TableSourceFactory (would
> be
> >>>>>> similar for sink factory) to let planner handle which mode
> (streaming
> >>>>> vs
> >>>>>> batch) of source should be instantiated? That way we don't need to
> >>>>> always
> >>>>>> let connector developers handling an if-else on isStreamingMode.
> >>>>>> 2. I'm not sure of the benefits to have a CatalogTableContext class.
> >>>>> The
> >>>>>> path, table, and config are fairly independent of each other. So why
> >>>>> not
> >>>>>> pass the config in as 3rd parameter as `createXxxTableSource(path,
> >>>>>> catalogTable, tableConfig)?
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com
> >
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hi dev,
> >>>>>>>
> >>>>>>> I'd like to kick off a discussion on the improvement of
> >>>>> TableSourceFactory
> >>>>>>> and TableSinkFactory.
> >>>>>>>
> >>>>>>> Motivation:
> >>>>>>> Now the main needs and problems are:
> >>>>>>> 1.Connector can't get TableConfig [1], and some behaviors really
> >>>>> need to be
> >>>>>>> controlled by the user's table configuration. In the era of
> catalog,
> >>>>> we
> >>>>>>> can't put these config in connector properties, which is too
> >>>>> inconvenient.
> >>>>>>> 2.Connector can't know if this is batch or stream execution mode.
> >>>>> But the
> >>>>>>> sink implementation of batch and stream is totally different. I
> >>>>> understand
> >>>>>>> there is an update mode property now, but it splits the batch and
> >>>>> stream in
> >>>>>>> the catalog dimension. In fact, this information can be obtained
> >>>>> through
> >>>>>>> the current TableEnvironment.
> >>>>>>> 3.No interface to call validation. Now our validation is more util
> >>>>> classes.
> >>>>>>> It depends on whether or not the connector calls. Now we have some
> >>>>> new
> >>>>>>> validations to add, such as [2], which is really confuse uses, even
> >>>>>>> developers. Another problem is that our SQL update (DDL) does not
> >>>>> have
> >>>>>>> validation [3]. It is better to report an error when executing DDL,
> >>>>>>> otherwise it will confuse the user.
> >>>>>>>
> >>>>>>> Proposed change draft for 1 and 2:
> >>>>>>>
> >>>>>>> interface CatalogTableContext {
> >>>>>>>      ObjectPath getTablePath();
> >>>>>>>      CatalogTable getTable();
> >>>>>>>      ReadableConfig getTableConfig();
> >>>>>>>      boolean isStreamingMode();
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface TableSourceFactory<T> extends TableFactory {
> >>>>>>>
> >>>>>>>      default TableSource<T> createTableSource(CatalogTableContext
> >>>>> context) {
> >>>>>>>         return createTableSource(context.getTablePath(),
> >>>>> context.getTable());
> >>>>>>>      }
> >>>>>>>
> >>>>>>>      ......
> >>>>>>> }
> >>>>>>>
> >>>>>>> Proposed change draft for 3:
> >>>>>>>
> >>>>>>> public interface TableFactory {
> >>>>>>>
> >>>>>>>      TableValidators validators();
> >>>>>>>
> >>>>>>>      interface TableValidators {
> >>>>>>>         ConnectorDescriptorValidator connectorValidator();
> >>>>>>>         TableSchemaValidator schemaValidator();
> >>>>>>>         FormatDescriptorValidator formatValidator();
> >>>>>>>      }
> >>>>>>> }
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
> >>>>>>> [2]
> >>>>>>>
> >>>>>>>
> >>>>>
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jingsong Lee
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>> --
> >>>> Best, Jingsong Lee
> >>>>
> >>>
> >>>
> >>> --
> >>> Best, Jingsong Lee
> >>>
> >>
> >>
> >> --
> >> Best, Jingsong Lee
> >>
> >
> >
>
>

-- 
Best, Jingsong Lee

Reply via email to