Hi Timo,

Good catch!
I really like idea 2; giving access to the full Flink config looks very good to me.

On your first point: we actually don't have a `TableIdentifier` class at the moment, and `TableFactory` already indicates that it deals with a table, so I am OK with `getObjectIdentifier`. The new Context should be:

/**
 * Context of table source creation. Contains table information and
 * environment information.
 */
interface Context {

    /**
     * @return full identifier of the given {@link CatalogTable}.
     */
    ObjectIdentifier getObjectIdentifier();

    /**
     * @return table {@link CatalogTable} instance.
     */
    CatalogTable getTable();

    /**
     * @return readable config of this table environment.
     */
    ReadableConfig getConfiguration();
}

Best,
Jingsong Lee

On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <twal...@apache.org> wrote:

> Hi Jingsong,
>
> some last-minute changes from my side:
>
> 1. Rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
> obvious. Otherwise people expect a `TableIdentifier` class being
> returned here.
>
> 2. Rename `getTableConfig` to `getConfiguration()`. In the future this
> will not only be a "table" config but might give access to the full
> Flink config.
>
> Thanks,
> Timo
>
>
> On 04.02.20 06:27, Jingsong Li wrote:
> > So the interface will be:
> >
> > public interface TableSourceFactory<T> extends TableFactory {
> >
> >     ......
> >
> >     /**
> >      * Creates and configures a {@link TableSource} based on the given
> >      * {@link Context}.
> >      *
> >      * @param context context of this table source.
> >      * @return the configured table source.
> >      */
> >     default TableSource<T> createTableSource(Context context) {
> >         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >         return createTableSource(
> >                 new ObjectPath(tableIdentifier.getDatabaseName(),
> >                         tableIdentifier.getObjectName()),
> >                 context.getTable());
> >     }
> >
> >     /**
> >      * Context of table source creation. Contains table information and
> >      * environment information.
> >      */
> >     interface Context {
> >
> >         /**
> >          * @return full identifier of the given {@link CatalogTable}.
> >          */
> >         ObjectIdentifier getTableIdentifier();
> >
> >         /**
> >          * @return table {@link CatalogTable} instance.
> >          */
> >         CatalogTable getTable();
> >
> >         /**
> >          * @return readable config of this table environment.
> >          */
> >         ReadableConfig getTableConfig();
> >     }
> > }
> >
> > public interface TableSinkFactory<T> extends TableFactory {
> >
> >     ......
> >
> >     /**
> >      * Creates and configures a {@link TableSink} based on the given
> >      * {@link Context}.
> >      *
> >      * @param context context of this table sink.
> >      * @return the configured table sink.
> >      */
> >     default TableSink<T> createTableSink(Context context) {
> >         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >         return createTableSink(
> >                 new ObjectPath(tableIdentifier.getDatabaseName(),
> >                         tableIdentifier.getObjectName()),
> >                 context.getTable());
> >     }
> >
> >     /**
> >      * Context of table sink creation. Contains table information and
> >      * environment information.
> >      */
> >     interface Context {
> >
> >         /**
> >          * @return full identifier of the given {@link CatalogTable}.
> >          */
> >         ObjectIdentifier getTableIdentifier();
> >
> >         /**
> >          * @return table {@link CatalogTable} instance.
> >          */
> >         CatalogTable getTable();
> >
> >         /**
> >          * @return readable config of this table environment.
> >          */
> >         ReadableConfig getTableConfig();
> >     }
> > }
> >
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> >> Hi all,
> >>
> >> After rethinking and a discussion with Kurt, I'd like to remove
> >> "isBounded". We can delay this bounded/unbounded message to the
> >> TableSink. With the TableSink refactor, we need to consider
> >> "consumeDataStream" and "consumeBoundedStream".
> >>
> >> Best,
> >> Jingsong Lee
> >>
> >> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >>
> >>> Hi Jark,
> >>>
> >>> Thanks for joining the discussion. Yes, it's hard to understand adding
> >>> isBounded on the source. I recommend adding it only to the sink at
> >>> present, because a sink has an upstream, and its upstream is either
> >>> bounded or unbounded.
> >>>
> >>> Hi all,
> >>>
> >>> Let me summarize with your suggestions:
> >>>
> >>> public interface TableSourceFactory<T> extends TableFactory {
> >>>
> >>>     ......
> >>>
> >>>     /**
> >>>      * Creates and configures a {@link TableSource} based on the given
> >>>      * {@link Context}.
> >>>      *
> >>>      * @param context context of this table source.
> >>>      * @return the configured table source.
> >>>      */
> >>>     default TableSource<T> createTableSource(Context context) {
> >>>         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >>>         return createTableSource(
> >>>                 new ObjectPath(tableIdentifier.getDatabaseName(),
> >>>                         tableIdentifier.getObjectName()),
> >>>                 context.getTable());
> >>>     }
> >>>
> >>>     /**
> >>>      * Context of table source creation. Contains table information
> >>>      * and environment information.
> >>>      */
> >>>     interface Context {
> >>>
> >>>         /**
> >>>          * @return full identifier of the given {@link CatalogTable}.
> >>>          */
> >>>         ObjectIdentifier getTableIdentifier();
> >>>
> >>>         /**
> >>>          * @return table {@link CatalogTable} instance.
> >>>          */
> >>>         CatalogTable getTable();
> >>>
> >>>         /**
> >>>          * @return readable config of this table environment.
> >>>          */
> >>>         ReadableConfig getTableConfig();
> >>>     }
> >>> }
> >>>
> >>> public interface TableSinkFactory<T> extends TableFactory {
> >>>
> >>>     ......
> >>>
> >>>     /**
> >>>      * Creates and configures a {@link TableSink} based on the given
> >>>      * {@link Context}.
> >>>      *
> >>>      * @param context context of this table sink.
> >>>      * @return the configured table sink.
> >>>      */
> >>>     default TableSink<T> createTableSink(Context context) {
> >>>         ObjectIdentifier tableIdentifier = context.getTableIdentifier();
> >>>         return createTableSink(
> >>>                 new ObjectPath(tableIdentifier.getDatabaseName(),
> >>>                         tableIdentifier.getObjectName()),
> >>>                 context.getTable());
> >>>     }
> >>>
> >>>     /**
> >>>      * Context of table sink creation. Contains table information and
> >>>      * environment information.
> >>>      */
> >>>     interface Context {
> >>>
> >>>         /**
> >>>          * @return full identifier of the given {@link CatalogTable}.
> >>>          */
> >>>         ObjectIdentifier getTableIdentifier();
> >>>
> >>>         /**
> >>>          * @return table {@link CatalogTable} instance.
> >>>          */
> >>>         CatalogTable getTable();
> >>>
> >>>         /**
> >>>          * @return readable config of this table environment.
> >>>          */
> >>>         ReadableConfig getTableConfig();
> >>>
> >>>         /**
> >>>          * @return whether the input is bounded or not.
> >>>          */
> >>>         boolean isBounded();
> >>>     }
> >>> }
> >>>
> >>> If there is no objection, I will start a vote thread. (If necessary, I
> >>> can also edit a FLIP.)
> >>>
> >>> Best,
> >>> Jingsong Lee
> >>>
> >>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >>>
> >>>> Thanks Bowen and Timo for joining the discussion.
> >>>>
> >>>> Hi Bowen,
> >>>>
> >>>>> 1. is it better to have explicit APIs like "createBatchTableSource(...)"
> >>>> I think it is better to keep one method, since in [1] we have already
> >>>> agreed at the DataStream layer to maintain a single API in
> >>>> "env.source". I think it is good not to split batch and stream, and
> >>>> our TableSource/TableSink are the same classes for both batch and
> >>>> streaming too.
> >>>>
> >>>>> 2. I'm not sure of the benefits to have a CatalogTableContext class.
> >>>> As Timo said, we may have more parameters to add in the future; take
> >>>> a look at "AbstractRichFunction.RuntimeContext", which was extended
> >>>> little by little.
> >>>>
> >>>> Hi Timo,
> >>>>
> >>>> Your suggestion about Context looks good to me. "TablePath" is used in
> >>>> Hive for updating the catalog information of this table. Yes,
> >>>> "ObjectIdentifier" looks better than "ObjectPath".
> >>>>
> >>>>> Can we postpone the change of TableValidators?
> >>>> Yes, ConfigOption validation looks good to me. It seems that you have
> >>>> been thinking about this for a long time; it's very good. Looking
> >>>> forward to the promotion of FLIP-54.
> >>>>
> >>>> [1]
> >>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
> >>>>
> >>>> Best,
> >>>> Jingsong Lee
> >>>>
> >>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org> wrote:
> >>>>
> >>>>> Hi Jingsong,
> >>>>>
> >>>>> +1 for adding a context in the source and sink factories. A context
> >>>>> class also allows for future modifications without touching the
> >>>>> TableFactory interface again.
> >>>>>
> >>>>> How about:
> >>>>>
> >>>>> interface TableSourceFactory {
> >>>>>     interface Context {
> >>>>>         // ...
> >>>>>     }
> >>>>> }
> >>>>>
> >>>>> Because I find the name `CatalogTableContext` confusing, and this way
> >>>>> we can bind the interface to the factory class itself as an inner
> >>>>> interface.
> >>>>>
> >>>>> Readable access to the configuration also sounds right to me. Can we
> >>>>> remove the `ObjectPath getTablePath()` method? I don't see a reason
> >>>>> why a factory should know the path. And if we keep it, it should be
> >>>>> an `ObjectIdentifier` instead, to also know about the catalog we are
> >>>>> using.
> >>>>>
> >>>>> `isStreamingMode()` should be renamed to `isBounded()` because we
> >>>>> would like to use terminology around boundedness rather than
> >>>>> streaming/batch.
> >>>>>
> >>>>> @Bowen: We are in the process of unifying the APIs and thus
> >>>>> explicitly avoid specialized methods in the future.
> >>>>>
> >>>>> Can we postpone the change of TableValidators? I don't think that
> >>>>> every factory needs a schema validator. Ideally, the factory should
> >>>>> just return a List<ConfigOption> or ConfigOptionGroup that contains
> >>>>> the validation logic, as mentioned in the validation part of
> >>>>> FLIP-54 [1]. But currently our config options are not rich enough to
> >>>>> have a unified validation. Additionally, the factory should return
> >>>>> some properties such as "supports event-time" for the schema
> >>>>> validation outside of the factory itself.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>> [1]
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
> >>>>>
> >>>>>
> >>>>> On 16.01.20 00:51, Bowen Li wrote:
> >>>>>> Hi Jingsong,
> >>>>>>
> >>>>>> The 1st and 2nd pain points you described are very valid, as I'm
> >>>>>> more familiar with them. I agree these are shortcomings of the
> >>>>>> current Flink SQL design.
> >>>>>>
> >>>>>> A couple of comments on your 1st proposal:
> >>>>>>
> >>>>>> 1. Is it better to have explicit APIs like
> >>>>>> "createBatchTableSource(...)" and "createStreamingTableSource(...)"
> >>>>>> in TableSourceFactory (and similarly for the sink factory) to let
> >>>>>> the planner handle which mode (streaming vs batch) of source should
> >>>>>> be instantiated? That way we don't need to always let connector
> >>>>>> developers handle an if-else on isStreamingMode.
> >>>>>> 2. I'm not sure of the benefits of having a CatalogTableContext
> >>>>>> class. The path, table, and config are fairly independent of each
> >>>>>> other, so why not pass the config in as a 3rd parameter, as
> >>>>>> `createXxxTableSource(path, catalogTable, tableConfig)`?
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Hi dev,
> >>>>>>>
> >>>>>>> I'd like to kick off a discussion on the improvement of
> >>>>>>> TableSourceFactory and TableSinkFactory.
> >>>>>>>
> >>>>>>> Motivation:
> >>>>>>> The main needs and problems now are:
> >>>>>>>
> >>>>>>> 1. A connector can't get the TableConfig [1], and some behaviors
> >>>>>>> really need to be controlled by the user's table configuration. In
> >>>>>>> the era of catalogs, we can't put these configs in connector
> >>>>>>> properties, which is too inconvenient.
> >>>>>>> 2. A connector can't know whether this is batch or stream
> >>>>>>> execution mode, but the sink implementations for batch and stream
> >>>>>>> are totally different. I understand there is an update-mode
> >>>>>>> property now, but it splits batch and stream in the catalog
> >>>>>>> dimension. In fact, this information can be obtained through the
> >>>>>>> current TableEnvironment.
> >>>>>>> 3. There is no interface to call validation. Right now our
> >>>>>>> validation lives mostly in util classes, and it depends on whether
> >>>>>>> or not the connector calls them. We have some new validations to
> >>>>>>> add, such as [2], which really confuses users and even developers.
> >>>>>>> Another problem is that our SQL update (DDL) does not have
> >>>>>>> validation [3]. It is better to report an error when executing
> >>>>>>> DDL; otherwise it will confuse the user.
> >>>>>>>
> >>>>>>> Proposed change draft for 1 and 2:
> >>>>>>>
> >>>>>>> interface CatalogTableContext {
> >>>>>>>     ObjectPath getTablePath();
> >>>>>>>     CatalogTable getTable();
> >>>>>>>     ReadableConfig getTableConfig();
> >>>>>>>     boolean isStreamingMode();
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface TableSourceFactory<T> extends TableFactory {
> >>>>>>>
> >>>>>>>     default TableSource<T> createTableSource(CatalogTableContext context) {
> >>>>>>>         return createTableSource(context.getTablePath(), context.getTable());
> >>>>>>>     }
> >>>>>>>
> >>>>>>>     ......
> >>>>>>> }
> >>>>>>>
> >>>>>>> Proposed change draft for 3:
> >>>>>>>
> >>>>>>> public interface TableFactory {
> >>>>>>>
> >>>>>>>     TableValidators validators();
> >>>>>>>
> >>>>>>>     interface TableValidators {
> >>>>>>>         ConnectorDescriptorValidator connectorValidator();
> >>>>>>>         TableSchemaValidator schemaValidator();
> >>>>>>>         FormatDescriptorValidator formatValidator();
> >>>>>>>     }
> >>>>>>> }
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-15290
> >>>>>>> [2]
> >>>>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
> >>>>>>> [3] https://issues.apache.org/jira/browse/FLINK-15509
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Jingsong Lee
> >>>>>>
> >>>>>
> >>>>
> >>>> --
> >>>> Best, Jingsong Lee
> >>>
> >>>
> >>> --
> >>> Best, Jingsong Lee
> >>
> >>
> >> --
> >> Best, Jingsong Lee


--
Best, Jingsong Lee
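
To make the shape of the Context-based factory discussed above concrete, here is a minimal self-contained sketch. All types and names below (`ReadableConfig`, `ObjectIdentifier`, the string-returning `createTableSource`, and the `connector.path` key) are simplified stand-ins invented for illustration only; they are not Flink's real classes, and the real `CatalogTable`, `TableSource`, and option names differ.

```java
import java.util.HashMap;
import java.util.Map;

public class ContextSketch {

    // Stand-in for Flink's ReadableConfig: read-only access to the environment config.
    public interface ReadableConfig {
        String getString(String key, String defaultValue);
    }

    // Stand-in for ObjectIdentifier: a full catalog.database.table identifier.
    public static final class ObjectIdentifier {
        private final String catalog, database, table;

        public ObjectIdentifier(String catalog, String database, String table) {
            this.catalog = catalog;
            this.database = database;
            this.table = table;
        }

        public String asSummaryString() {
            return catalog + "." + database + "." + table;
        }
    }

    // The proposed creation context: the factory gets everything from here,
    // so new information can be added later without touching factory signatures.
    public interface Context {
        ObjectIdentifier getObjectIdentifier();
        Map<String, String> getTable();     // stand-in for CatalogTable properties
        ReadableConfig getConfiguration();  // full environment config, not only "table" config
    }

    // A hypothetical connector factory method: it reads a user-set option from
    // the environment config instead of from connector properties.
    public static String createTableSource(Context context) {
        String path = context.getConfiguration().getString("connector.path", "/tmp/default");
        return "source[" + context.getObjectIdentifier().asSummaryString() + " @ " + path + "]";
    }

    // Builds a demo context the way a planner would before calling the factory.
    public static Context demoContext() {
        Map<String, String> conf = new HashMap<>();
        conf.put("connector.path", "/data/orders");
        return new Context() {
            public ObjectIdentifier getObjectIdentifier() {
                return new ObjectIdentifier("default_catalog", "default_database", "orders");
            }
            public Map<String, String> getTable() {
                return new HashMap<>();
            }
            public ReadableConfig getConfiguration() {
                return (key, dflt) -> conf.getOrDefault(key, dflt);
            }
        };
    }

    public static void main(String[] args) {
        // prints: source[default_catalog.default_database.orders @ /data/orders]
        System.out.println(createTableSource(demoContext()));
    }
}
```

Because the factory sees only the `Context`, adding something like `isBounded()` later is a source-compatible change for existing connectors, which is the main design argument made in this thread.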