Hi Jark,

Thanks involving, yes, it's hard to understand to add isBounded on the
source.
I recommend adding only to sink at present, because sink has upstream. Its
upstream is either bounded or unbounded.

Hi all,

Let me summarize with your suggestions.

public interface TableSourceFactory<T> extends TableFactory {

   ......


   /**
    * Creates and configures a {@link TableSource} based on the given
{@link Context}.
    *
    * @param context context of this table source.
    * @return the configured table source.
    */
   default TableSource<T> createTableSource(Context context) {
      ObjectIdentifier tableIdentifier = context.getTableIdentifier();
      return createTableSource(
            new ObjectPath(tableIdentifier.getDatabaseName(),
tableIdentifier.getObjectName()),
            context.getTable());
   }

   /**
    * Context of table source creation. Contains table information and
environment information.
    */
   interface Context {

      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getTableIdentifier();

      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();

      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getTableConfig();
   }
}

public interface TableSinkFactory<T> extends TableFactory {

   ......

   /**
    * Creates and configures a {@link TableSink} based on the given
{@link Context}.
    *
    * @param context context of this table sink.
    * @return the configured table sink.
    */
   default TableSink<T> createTableSink(Context context) {
      ObjectIdentifier tableIdentifier = context.getTableIdentifier();
      return createTableSink(
            new ObjectPath(tableIdentifier.getDatabaseName(),
tableIdentifier.getObjectName()),
            context.getTable());
   }

   /**
    * Context of table sink creation. Contains table information and
environment information.
    */
   interface Context {

      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getTableIdentifier();

      /**
       * @return table {@link CatalogTable} instance.
       */
      CatalogTable getTable();

      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getTableConfig();

      /**
       * @return Input whether or not it is bounded.
       */
      boolean isBounded();
   }
}

If there is no objection, I will start a vote thread. (if necessary, I can
also edit a FLIP).

Best,
Jingsong Lee

On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Thanks Bowen and Timo for involving.
>
> Hi Bowen,
>
> > 1. is it better to have explicit APIs like "createBatchTableSource(...)"
> I think it is better to keep one method, since in [1], we have reached one
> in DataStream layer to maintain a single API in "env.source". I think it is
> good to not split batch and stream, And our TableSource/TableSink are the
> same class for both batch and streaming too.
>
> > 2. I'm not sure of the benefits to have a CatalogTableContext class.
> As Timo said, We may have more parameters to add in the future, take a
> look to "AbstractRichFunction.RuntimeContext", It's added little by little.
>
> Hi Timo,
>
> Your suggestion about Context looks good to me.
> "TablePath" used in Hive for updating the catalog information of this
> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
>
> > Can we postpone the change of TableValidators?
> Yes, ConfigOption validation looks good to me. It seems that you have been
> thinking about this for a long time. It's very good. Looking forward to the
> promotion of FLIP-54.
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Jingsong,
>>
>> +1 for adding a context in the source and sink factories. A context
>> class also allows for future modifications without touching the
>> TableFactory interface again.
>>
>> How about:
>>
>> interface TableSourceFactory {
>>      interface Context {
>>         // ...
>>      }
>> }
>>
>> Because I find the name `CatalogTableContext` confusing and we can bound
>> the interface to the factory class itself as an inner interface.
>>
>> Readable access to configuration sounds also right to me. Can we remove
>> the `ObjectPath getTablePath()` method? I don't see a reason why a
>> factory should know the path. And if so, it should be an
>> `ObjectIdentifier` instead to also know about the catalog we are using.
>>
>> The `isStreamingMode()` should be renamed to `isBounded()` because we
>> would like to use terminology around boundedness rather than
>> streaming/batch.
>>
>> @Bowen: We are in the process of unifying the APIs and thus explicitly
>> avoid specialized methods in the future.
>>
>> Can we postpone the change of TableValidators? I don't think that every
>> factory needs a schema validator. Ideally, the factory should just
>> return a List<ConfigOption> or ConfigOptionGroup that contains the
>> validation logic as mentioned in the validation part of FLIP-54[1]. But
>> currently our config options are not rich enough to have a unified
>> validation. Additionally, the factory should return some properties such
>> as "supports event-time" for the schema validation outside of the
>> factory itself.
>>
>> Regards,
>> Timo
>>
>> [1]
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>>
>>
>>
>> On 16.01.20 00:51, Bowen Li wrote:
>> > Hi Jingsong,
>> >
>> > The 1st and 2nd pain points you described are very valid, as I'm more
>> > familiar with them. I agree these are shortcomings of the current Flink
>> SQL
>> > design.
>> >
>> > A couple comments on your 1st proposal:
>> >
>> > 1. is it better to have explicit APIs like "createBatchTableSource(...)"
>> > and "createStreamingTableSource(...)" in TableSourceFactory (would be
>> > similar for sink factory) to let planner handle which mode (streaming vs
>> > batch) of source should be instantiated? That way we don't need to
>> always
>> > let connector developers handling an if-else on isStreamingMode.
>> > 2. I'm not sure of the benefits to have a CatalogTableContext class. The
>> > path, table, and config are fairly independent of each other. So why not
>> > pass the config in as 3rd parameter as `createXxxTableSource(path,
>> > catalogTable, tableConfig)?
>> >
>> >
>> > On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>> >
>> >> Hi dev,
>> >>
>> >> I'd like to kick off a discussion on the improvement of
>> TableSourceFactory
>> >> and TableSinkFactory.
>> >>
>> >> Motivation:
>> >> Now the main needs and problems are:
>> >> 1.Connector can't get TableConfig [1], and some behaviors really need
>> to be
>> >> controlled by the user's table configuration. In the era of catalog, we
>> >> can't put these config in connector properties, which is too
>> inconvenient.
>> >> 2.Connector can't know if this is batch or stream execution mode. But
>> the
>> >> sink implementation of batch and stream is totally different. I
>> understand
>> >> there is an update mode property now, but it splits the batch and
>> stream in
>> >> the catalog dimension. In fact, this information can be obtained
>> through
>> >> the current TableEnvironment.
>> >> 3.No interface to call validation. Now our validation is more util
>> classes.
>> >> It depends on whether or not the connector calls. Now we have some new
>> >> validations to add, such as [2], which is really confuse uses, even
>> >> developers. Another problem is that our SQL update (DDL) does not have
>> >> validation [3]. It is better to report an error when executing DDL,
>> >> otherwise it will confuse the user.
>> >>
>> >> Proposed change draft for 1 and 2:
>> >>
>> >> interface CatalogTableContext {
>> >>     ObjectPath getTablePath();
>> >>     CatalogTable getTable();
>> >>     ReadableConfig getTableConfig();
>> >>     boolean isStreamingMode();
>> >> }
>> >>
>> >> public interface TableSourceFactory<T> extends TableFactory {
>> >>
>> >>     default TableSource<T> createTableSource(CatalogTableContext
>> context) {
>> >>        return createTableSource(context.getTablePath(),
>> context.getTable());
>> >>     }
>> >>
>> >>     ......
>> >> }
>> >>
>> >> Proposed change draft for 3:
>> >>
>> >> public interface TableFactory {
>> >>
>> >>     TableValidators validators();
>> >>
>> >>     interface TableValidators {
>> >>        ConnectorDescriptorValidator connectorValidator();
>> >>        TableSchemaValidator schemaValidator();
>> >>        FormatDescriptorValidator formatValidator();
>> >>     }
>> >> }
>> >>
>> >> What do you think?
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK-15290
>> >> [2]
>> >>
>> >>
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
>> >> [3] https://issues.apache.org/jira/browse/FLINK-15509
>> >>
>> >> Best,
>> >> Jingsong Lee
>> >>
>> >
>>
>>
>
> --
> Best, Jingsong Lee
>


-- 
Best, Jingsong Lee

Reply via email to