Hi Jark,

Thanks for getting involved. Yes, it is hard to understand adding isBounded on the source. I recommend adding it only to the sink at present, because a sink has an upstream, and its upstream is either bounded or unbounded.
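To make that point concrete, here is a minimal sketch of the sink-side branching. The interfaces below are simplified stand-ins, not the real Flink types, and the class names (BoundedFileSink, UnboundedFileSink) are hypothetical:

```java
public class SinkBoundednessSketch {

    // Simplified stand-ins for the proposed interfaces.
    interface TableSink {
        String describe();
    }

    interface Context {
        boolean isBounded();
    }

    static class BoundedFileSink implements TableSink {
        public String describe() { return "bounded (batch-style) sink"; }
    }

    static class UnboundedFileSink implements TableSink {
        public String describe() { return "unbounded (streaming) sink"; }
    }

    // A sink factory can always branch on boundedness, because the sink's
    // upstream is either bounded or unbounded -- unlike a source, which
    // would itself have to define that property.
    static TableSink createTableSink(Context context) {
        return context.isBounded() ? new BoundedFileSink() : new UnboundedFileSink();
    }

    public static void main(String[] args) {
        System.out.println(createTableSink(() -> true).describe());
        System.out.println(createTableSink(() -> false).describe());
    }
}
```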
Hi all,

Let me summarize with your suggestions:

public interface TableSourceFactory<T> extends TableFactory {

   ......

   /**
    * Creates and configures a {@link TableSource} based on the given {@link Context}.
    *
    * @param context context of this table source.
    * @return the configured table source.
    */
   default TableSource<T> createTableSource(Context context) {
      ObjectIdentifier tableIdentifier = context.getTableIdentifier();
      return createTableSource(
         new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
         context.getTable());
   }

   /**
    * Context of table source creation. Contains table information and
    * environment information.
    */
   interface Context {

      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getTableIdentifier();

      /**
       * @return the {@link CatalogTable} instance.
       */
      CatalogTable getTable();

      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getTableConfig();
   }
}

public interface TableSinkFactory<T> extends TableFactory {

   ......

   /**
    * Creates and configures a {@link TableSink} based on the given {@link Context}.
    *
    * @param context context of this table sink.
    * @return the configured table sink.
    */
   default TableSink<T> createTableSink(Context context) {
      ObjectIdentifier tableIdentifier = context.getTableIdentifier();
      return createTableSink(
         new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
         context.getTable());
   }

   /**
    * Context of table sink creation. Contains table information and
    * environment information.
    */
   interface Context {

      /**
       * @return full identifier of the given {@link CatalogTable}.
       */
      ObjectIdentifier getTableIdentifier();

      /**
       * @return the {@link CatalogTable} instance.
       */
      CatalogTable getTable();

      /**
       * @return readable config of this table environment.
       */
      ReadableConfig getTableConfig();

      /**
       * @return whether the input of this sink is bounded.
       */
      boolean isBounded();
   }
}

If there is no objection, I will start a vote thread.
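As a small illustration of pain point 1 (connectors can't reach the TableConfig today): with the Context, a connector factory can read session configuration directly. The types below are simplified stand-ins for the real Flink classes, and the option key is only illustrative:

```java
import java.util.Map;

public class SourceContextSketch {

    // Simplified stand-in for ReadableConfig; the real Flink type is richer.
    static class ReadableConfig {
        private final Map<String, String> values;
        ReadableConfig(Map<String, String> values) { this.values = values; }
        String getOrDefault(String key, String def) {
            return values.getOrDefault(key, def);
        }
    }

    interface TableSource {
        String explain();
    }

    // Hypothetical connector factory method: with the Context carrying the
    // per-session TableConfig, there is no need to stuff such settings into
    // the connector properties stored in the catalog.
    static TableSource createTableSource(ReadableConfig tableConfig) {
        String tz = tableConfig.getOrDefault("table.local-time-zone", "UTC");
        return () -> "scan source using session time zone " + tz;
    }

    public static void main(String[] args) {
        ReadableConfig conf =
            new ReadableConfig(Map.of("table.local-time-zone", "Asia/Shanghai"));
        System.out.println(createTableSource(conf).explain());
    }
}
```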
(If necessary, I can also write a FLIP.)

Best,
Jingsong Lee

On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Thanks Bowen and Timo for getting involved.
>
> Hi Bowen,
>
> > 1. is it better to have explicit APIs like "createBatchTableSource(...)"
> I think it is better to keep one method, since in [1] we have already
> agreed to maintain a single API in "env.source" at the DataStream layer.
> I think it is good not to split batch and stream, and our
> TableSource/TableSink are the same classes for both batch and streaming
> too.
>
> > 2. I'm not sure of the benefits to have a CatalogTableContext class.
> As Timo said, we may have more parameters to add in the future. Take a
> look at "AbstractRichFunction.RuntimeContext": it was added to little by
> little.
>
> Hi Timo,
>
> Your suggestion about Context looks good to me.
> "TablePath" is used in Hive for updating the catalog information of this
> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
>
> > Can we postpone the change of TableValidators?
> Yes, ConfigOption validation looks good to me. It seems that you have
> been thinking about this for a long time; it's very good. Looking forward
> to the promotion of FLIP-54.
>
> [1]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
>
> Best,
> Jingsong Lee
>
> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org> wrote:
>
>> Hi Jingsong,
>>
>> +1 for adding a context to the source and sink factories. A context
>> class also allows for future modifications without touching the
>> TableFactory interface again.
>>
>> How about:
>>
>> interface TableSourceFactory {
>>    interface Context {
>>       // ...
>>    }
>> }
>>
>> Because I find the name `CatalogTableContext` confusing, and we can
>> bind the interface to the factory class itself as an inner interface.
>>
>> Readable access to the configuration also sounds right to me.
>> Can we remove the `ObjectPath getTablePath()` method? I don't see a
>> reason why a factory should know the path. And if so, it should be an
>> `ObjectIdentifier` instead, so we also know about the catalog we are
>> using.
>>
>> `isStreamingMode()` should be renamed to `isBounded()`, because we
>> would like to use terminology around boundedness rather than
>> streaming/batch.
>>
>> @Bowen: We are in the process of unifying the APIs and thus explicitly
>> avoid specialized methods in the future.
>>
>> Can we postpone the change of TableValidators? I don't think that every
>> factory needs a schema validator. Ideally, the factory should just
>> return a List<ConfigOption> or ConfigOptionGroup that contains the
>> validation logic, as mentioned in the validation part of FLIP-54 [1].
>> But currently our config options are not rich enough to allow unified
>> validation. Additionally, the factory should return some properties,
>> such as "supports event-time", for the schema validation outside of the
>> factory itself.
>>
>> Regards,
>> Timo
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>>
>> On 16.01.20 00:51, Bowen Li wrote:
>> > Hi Jingsong,
>> >
>> > The 1st and 2nd pain points you described are very valid, as I'm more
>> > familiar with them. I agree these are shortcomings of the current
>> > Flink SQL design.
>> >
>> > A couple of comments on your 1st proposal:
>> >
>> > 1. Is it better to have explicit APIs like "createBatchTableSource(...)"
>> > and "createStreamingTableSource(...)" in TableSourceFactory (it would
>> > be similar for the sink factory) to let the planner handle which mode
>> > (streaming vs batch) of source should be instantiated? That way we
>> > don't need to always make connector developers handle an if-else on
>> > isStreamingMode.
>> > 2. I'm not sure of the benefits of having a CatalogTableContext class.
>> > The path, table, and config are fairly independent of each other.
>> > So why not pass the config in as a 3rd parameter, as
>> > `createXxxTableSource(path, catalogTable, tableConfig)`?
>> >
>> >
>> > On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com>
>> > wrote:
>> >
>> >> Hi dev,
>> >>
>> >> I'd like to kick off a discussion on the improvement of
>> >> TableSourceFactory and TableSinkFactory.
>> >>
>> >> Motivation:
>> >> The main needs and problems now are:
>> >> 1. Connectors can't get the TableConfig [1], and some behaviors
>> >> really need to be controlled by the user's table configuration. In
>> >> the era of catalogs, we can't put these configs in connector
>> >> properties, which is too inconvenient.
>> >> 2. Connectors can't know whether this is batch or stream execution
>> >> mode, but the sink implementations for batch and stream are totally
>> >> different. I understand there is an update-mode property now, but it
>> >> splits batch and stream in the catalog dimension. In fact, this
>> >> information can be obtained through the current TableEnvironment.
>> >> 3. There is no interface to call validation. Our validation is now
>> >> mostly util classes; it depends on whether or not the connector
>> >> calls them. We have some new validations to add, such as [2], which
>> >> really confuses users, and even developers. Another problem is that
>> >> our SQL updates (DDL) have no validation [3]. It is better to report
>> >> an error when executing the DDL; otherwise it will confuse the user.
>> >>
>> >> Proposed change draft for 1 and 2:
>> >>
>> >> interface CatalogTableContext {
>> >>    ObjectPath getTablePath();
>> >>    CatalogTable getTable();
>> >>    ReadableConfig getTableConfig();
>> >>    boolean isStreamingMode();
>> >> }
>> >>
>> >> public interface TableSourceFactory<T> extends TableFactory {
>> >>
>> >>    default TableSource<T> createTableSource(CatalogTableContext context) {
>> >>       return createTableSource(context.getTablePath(), context.getTable());
>> >>    }
>> >>
>> >>    ......
>> >> }
>> >>
>> >> Proposed change draft for 3:
>> >>
>> >> public interface TableFactory {
>> >>
>> >>    TableValidators validators();
>> >>
>> >>    interface TableValidators {
>> >>       ConnectorDescriptorValidator connectorValidator();
>> >>       TableSchemaValidator schemaValidator();
>> >>       FormatDescriptorValidator formatValidator();
>> >>    }
>> >> }
>> >>
>> >> What do you think?
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK-15290
>> >> [2]
>> >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
>> >> [3] https://issues.apache.org/jira/browse/FLINK-15509
>> >>
>> >> Best,
>> >> Jingsong Lee
>
> --
> Best,
> Jingsong Lee

--
Best,
Jingsong Lee
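As an aside on point 3 of the discussion: a rough sketch of the List<ConfigOption>-style validation Timo suggests, where the factory declares its options and the planner validates them uniformly. The types are stand-ins (the real FLIP-54 ConfigOption is typed and richer), and the option keys are only illustrative:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OptionValidationSketch {

    // Simplified stand-in for ConfigOption.
    static class ConfigOption {
        final String key;
        final boolean required;
        ConfigOption(String key, boolean required) {
            this.key = key;
            this.required = required;
        }
    }

    // Hypothetical factory contract: instead of each connector calling
    // validator util classes by convention, the factory declares its
    // options and a shared routine validates the DDL properties.
    static List<ConfigOption> requiredOptions() {
        return List.of(new ConfigOption("connector.type", true),
                       new ConfigOption("connector.path", true));
    }

    static List<String> validate(Map<String, String> properties) {
        return requiredOptions().stream()
                .filter(o -> o.required && !properties.containsKey(o.key))
                .map(o -> "missing required option: " + o.key)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Missing "connector.path" -> one validation error at DDL time.
        System.out.println(validate(Map.of("connector.type", "filesystem")));
    }
}
```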