So the interfaces will be:

public interface TableSourceFactory<T> extends TableFactory {

   ......

   /**
    * Creates and configures a {@link TableSource} based on the given {@link Context}.
    *
    * @param context context of this table source.
    * @return the configured table source.
    */
   default TableSource<T> createTableSource(Context context) {
      ObjectIdentifier tableIdentifier = context.getTableIdentifier();
      return createTableSource(
            new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
            context.getTable());
   }

   /**
    * Context of table source creation. Contains table information and environment information.
    */
   interface Context {

      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getTableIdentifier();

      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();

      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getTableConfig();
   }
}
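
For illustration, here is a minimal sketch of how a connector could implement the new method. MyTableSource and the option name are made up; they only show that the environment configuration is now reachable through the Context:

public class MyTableSourceFactory implements TableSourceFactory<Row> {

   // Hypothetical option, only to demonstrate config access.
   private static final ConfigOption<Boolean> VERBOSE =
         ConfigOptions.key("my-connector.verbose")
               .booleanType()
               .defaultValue(false);

   @Override
   public TableSource<Row> createTableSource(Context context) {
      ObjectIdentifier identifier = context.getTableIdentifier();
      CatalogTable table = context.getTable();
      // Pain point 1: the environment configuration is now reachable here.
      boolean verbose = context.getTableConfig().get(VERBOSE);
      return new MyTableSource(identifier, table.getSchema(), verbose);
   }

   ......
}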

public interface TableSinkFactory<T> extends TableFactory {

   ......

   /**
    * Creates and configures a {@link TableSink} based on the given {@link Context}.
    *
    * @param context context of this table sink.
    * @return the configured table sink.
    */
   default TableSink<T> createTableSink(Context context) {
      ObjectIdentifier tableIdentifier = context.getTableIdentifier();
      return createTableSink(
            new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
            context.getTable());
   }

   /**
    * Context of table sink creation. Contains table information and environment information.
    */
   interface Context {

      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getTableIdentifier();

      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();

      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getTableConfig();
   }
}
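
Because Context only has getters, the planner (or a test) can hand one to a factory as a simple anonymous class. A rough sketch, assuming a resolved catalogTable and the environment's config are in scope:

TableSinkFactory<Row> factory = ...; // looked up via TableFactoryService

TableSink<Row> sink = factory.createTableSink(new TableSinkFactory.Context() {

   @Override
   public ObjectIdentifier getTableIdentifier() {
      return ObjectIdentifier.of("my_catalog", "my_db", "my_table");
   }

   @Override
   public CatalogTable getTable() {
      return catalogTable; // the table resolved from the catalog
   }

   @Override
   public ReadableConfig getTableConfig() {
      return config; // e.g. TableConfig#getConfiguration
   }
});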


Best,
Jingsong Lee

On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi all,
>
> After rethinking and a discussion with Kurt, I'd like to remove "isBounded".
> We can defer this boundedness information to the TableSink itself.
> With the TableSink refactor, we need to consider "consumeDataStream"
> and "consumeBoundedStream", as sketched below.
>
> Best,
> Jingsong Lee
>
> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> Hi Jark,
>>
>> Thanks for getting involved. Yes, it's hard to understand adding isBounded
>> on the source.
>> I recommend adding it only to the sink at present, because a sink has an
>> upstream, and that upstream is either bounded or unbounded.
>>
>> Hi all,
>>
>> Let me summarize with your suggestions.
>>
>> public interface TableSourceFactory<T> extends TableFactory {
>>
>>    ......
>>
>>
>>    /**
>>     * Creates and configures a {@link TableSource} based on the given {@link Context}.
>>     *
>>     * @param context context of this table source.
>>     * @return the configured table source.
>>     */
>>    default TableSource<T> createTableSource(Context context) {
>>       ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>>       return createTableSource(
>>             new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
>>             context.getTable());
>>    }
>>
>>    /**
>>     * Context of table source creation. Contains table information and environment information.
>>     */
>>    interface Context {
>>
>>       /**
>>        * @return full identifier of the given {@link CatalogTable}.
>>        */
>>       ObjectIdentifier getTableIdentifier();
>>
>>       /**
>>        * @return table {@link CatalogTable} instance.
>>        */
>>       CatalogTable getTable();
>>
>>       /**
>>        * @return readable config of this table environment.
>>        */
>>       ReadableConfig getTableConfig();
>>    }
>> }
>>
>> public interface TableSinkFactory<T> extends TableFactory {
>>
>>    ......
>>
>>    /**
>>     * Creates and configures a {@link TableSink} based on the given {@link Context}.
>>     *
>>     * @param context context of this table sink.
>>     * @return the configured table sink.
>>     */
>>    default TableSink<T> createTableSink(Context context) {
>>       ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>>       return createTableSink(
>>             new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
>>             context.getTable());
>>    }
>>
>>    /**
>>     * Context of table sink creation. Contains table information and environment information.
>>     */
>>    interface Context {
>>
>>       /**
>>        * @return full identifier of the given {@link CatalogTable}.
>>        */
>>       ObjectIdentifier getTableIdentifier();
>>
>>       /**
>>        * @return table {@link CatalogTable} instance.
>>        */
>>       CatalogTable getTable();
>>
>>       /**
>>        * @return readable config of this table environment.
>>        */
>>       ReadableConfig getTableConfig();
>>
>>       /**
>>        * @return whether the input of this sink is bounded.
>>        */
>>       boolean isBounded();
>>    }
>> }
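>>
>> With this Context, a sink factory could branch on boundedness, e.g. inside
>> a TableSinkFactory<Row> (MyBatchSink and MyStreamSink are made up):
>>
>>    @Override
>>    public TableSink<Row> createTableSink(Context context) {
>>       return context.isBounded()
>>             ? new MyBatchSink(context.getTable())
>>             : new MyStreamSink(context.getTable());
>>    }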
>>
>> If there is no objection, I will start a vote thread. (If necessary, I
>> can also write up a FLIP.)
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>>
>>> Thanks Bowen and Timo for getting involved.
>>>
>>> Hi Bowen,
>>>
>>> > 1. is it better to have explicit APIs like "createBatchTableSource(...)"
>>> I think it is better to keep one method, since in [1] we reached consensus
>>> in the DataStream layer to maintain a single API in "env.source". I think
>>> it is good not to split batch and stream, and our TableSource/TableSink are
>>> the same classes for both batch and streaming too.
>>>
>>> > 2. I'm not sure of the benefits to have a CatalogTableContext class.
>>> As Timo said, we may have more parameters to add in the future; take a
>>> look at "AbstractRichFunction.RuntimeContext", which grew little by little.
>>>
>>> Hi Timo,
>>>
>>> Your suggestion about Context looks good to me.
>>> "TablePath" used in Hive for updating the catalog information of this
>>> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
>>>
>>> > Can we postpone the change of TableValidators?
>>> Yes, ConfigOption validation looks good to me. It seems that you have
>>> been thinking about this for a long time, which is very good. Looking
>>> forward to the progress of FLIP-54.
>>>
>>> [1]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Jingsong,
>>>>
>>>> +1 for adding a context in the source and sink factories. A context
>>>> class also allows for future modifications without touching the
>>>> TableFactory interface again.
>>>>
>>>> How about:
>>>>
>>>> interface TableSourceFactory {
>>>>      interface Context {
>>>>         // ...
>>>>      }
>>>> }
>>>>
>>>> Because I find the name `CatalogTableContext` confusing and we can
>>>> bound
>>>> the interface to the factory class itself as an inner interface.
>>>>
>>>> Readable access to the configuration also sounds right to me. Can we remove
>>>> the `ObjectPath getTablePath()` method? I don't see a reason why a
>>>> factory should know the path. And if so, it should be an
>>>> `ObjectIdentifier` instead to also know about the catalog we are using.
>>>>
>>>> The `isStreamingMode()` should be renamed to `isBounded()` because we
>>>> would like to use terminology around boundedness rather than
>>>> streaming/batch.
>>>>
>>>> @Bowen: We are in the process of unifying the APIs and thus explicitly
>>>> avoid specialized methods in the future.
>>>>
>>>> Can we postpone the change of TableValidators? I don't think that every
>>>> factory needs a schema validator. Ideally, the factory should just
>>>> return a List<ConfigOption> or ConfigOptionGroup that contains the
>>>> validation logic as mentioned in the validation part of FLIP-54[1]. But
>>>> currently our config options are not rich enough to have a unified
>>>> validation. Additionally, the factory should return some properties
>>>> such
>>>> as "supports event-time" for the schema validation outside of the
>>>> factory itself.
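>>>>
>>>> For illustration only, such a declaration might look roughly like this
>>>> in the FLIP-54 style (a sketch, since those option builders are still
>>>> evolving):
>>>>
>>>>    List<ConfigOption<?>> options = Arrays.<ConfigOption<?>>asList(
>>>>          ConfigOptions.key("connector.path").stringType().noDefaultValue(),
>>>>          ConfigOptions.key("connector.parallelism").intType().noDefaultValue());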
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>>>>
>>>>
>>>>
>>>> On 16.01.20 00:51, Bowen Li wrote:
>>>> > Hi Jingsong,
>>>> >
>>>> > The 1st and 2nd pain points you described are very valid, as I'm more
>>>> > familiar with them. I agree these are shortcomings of the current
>>>> Flink SQL
>>>> > design.
>>>> >
>>>> > A couple comments on your 1st proposal:
>>>> >
>>>> > 1. is it better to have explicit APIs like "createBatchTableSource(...)"
>>>> > and "createStreamingTableSource(...)" in TableSourceFactory (would be
>>>> > similar for sink factory) to let the planner handle which mode (streaming
>>>> > vs batch) of source should be instantiated? That way we don't always
>>>> > need to make connector developers handle an if-else on isStreamingMode.
>>>> > 2. I'm not sure of the benefits of having a CatalogTableContext class. The
>>>> > path, table, and config are fairly independent of each other. So why not
>>>> > pass the config in as a 3rd parameter, as `createXxxTableSource(path,
>>>> > catalogTable, tableConfig)`?
>>>> >
>>>> >
>>>> > On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com>
>>>> wrote:
>>>> >
>>>> >> Hi dev,
>>>> >>
>>>> >> I'd like to kick off a discussion on the improvement of
>>>> TableSourceFactory
>>>> >> and TableSinkFactory.
>>>> >>
>>>> >> Motivation:
>>>> >> Now the main needs and problems are:
>>>> >> 1. Connectors can't get the TableConfig [1], and some behaviors really
>>>> >> need to be controlled by the user's table configuration. In the era of
>>>> >> catalogs, we can't put these configs in connector properties, which is
>>>> >> too inconvenient.
>>>> >> 2. Connectors can't know whether this is batch or stream execution mode,
>>>> >> but the sink implementations for batch and stream are totally different.
>>>> >> I understand there is an update-mode property now, but it splits batch
>>>> >> and stream in the catalog dimension. In fact, this information can be
>>>> >> obtained from the current TableEnvironment.
>>>> >> 3. There is no interface to call for validation. Now our validation lives
>>>> >> in util classes, and it depends on whether or not the connector calls
>>>> >> them. We have some new validations to add, such as [2], which really
>>>> >> confuses users, even developers. Another problem is that our SQL update
>>>> >> (DDL) does not have validation [3]. It is better to report an error when
>>>> >> executing DDL; otherwise it will confuse the user.
>>>> >>
>>>> >> Proposed change draft for 1 and 2:
>>>> >>
>>>> >> interface CatalogTableContext {
>>>> >>     ObjectPath getTablePath();
>>>> >>     CatalogTable getTable();
>>>> >>     ReadableConfig getTableConfig();
>>>> >>     boolean isStreamingMode();
>>>> >> }
>>>> >>
>>>> >> public interface TableSourceFactory<T> extends TableFactory {
>>>> >>
>>>> >>     default TableSource<T> createTableSource(CatalogTableContext context) {
>>>> >>        return createTableSource(context.getTablePath(), context.getTable());
>>>> >>     }
>>>> >>
>>>> >>     ......
>>>> >> }
>>>> >>
>>>> >> Proposed change draft for 3:
>>>> >>
>>>> >> public interface TableFactory {
>>>> >>
>>>> >>     TableValidators validators();
>>>> >>
>>>> >>     interface TableValidators {
>>>> >>        ConnectorDescriptorValidator connectorValidator();
>>>> >>        TableSchemaValidator schemaValidator();
>>>> >>        FormatDescriptorValidator formatValidator();
>>>> >>     }
>>>> >> }
>>>> >>
>>>> >> What do you think?
>>>> >>
>>>> >> [1] https://issues.apache.org/jira/browse/FLINK-15290
>>>> >> [2]
>>>> >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
>>>> >> [3] https://issues.apache.org/jira/browse/FLINK-15509
>>>> >>
>>>> >> Best,
>>>> >> Jingsong Lee
>>>> >>
>>>> >
>>>>
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee
