So the interface will be:

public interface TableSourceFactory<T> extends TableFactory {

  ......

  /**
   * Creates and configures a {@link TableSource} based on the given {@link Context}.
   *
   * @param context context of this table source.
   * @return the configured table source.
   */
  default TableSource<T> createTableSource(Context context) {
    ObjectIdentifier tableIdentifier = context.getTableIdentifier();
    return createTableSource(
        new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
        context.getTable());
  }

  /**
   * Context of table source creation. Contains table information and environment information.
   */
  interface Context {

    /**
     * @return full identifier of the given {@link CatalogTable}.
     */
    ObjectIdentifier getTableIdentifier();

    /**
     * @return table {@link CatalogTable} instance.
     */
    CatalogTable getTable();

    /**
     * @return readable config of this table environment.
     */
    ReadableConfig getTableConfig();
  }
}

public interface TableSinkFactory<T> extends TableFactory {

  ......

  /**
   * Creates and configures a {@link TableSink} based on the given {@link Context}.
   *
   * @param context context of this table sink.
   * @return the configured table sink.
   */
  default TableSink<T> createTableSink(Context context) {
    ObjectIdentifier tableIdentifier = context.getTableIdentifier();
    return createTableSink(
        new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
        context.getTable());
  }

  /**
   * Context of table sink creation. Contains table information and environment information.
   */
  interface Context {

    /**
     * @return full identifier of the given {@link CatalogTable}.
     */
    ObjectIdentifier getTableIdentifier();

    /**
     * @return table {@link CatalogTable} instance.
     */
    CatalogTable getTable();

    /**
     * @return readable config of this table environment.
     */
    ReadableConfig getTableConfig();
  }
}

Best,
Jingsong Lee

On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <jingsongl...@gmail.com> wrote:

> Hi all,
>
> After rethinking and discussion with Kurt, I'd like to remove "isBounded".
> We can defer this "is bounded" information to the TableSink.
> With the TableSink refactor, we need to consider "consumeDataStream"
> and "consumeBoundedStream".
>
> Best,
> Jingsong Lee
>
> On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>
>> Hi Jark,
>>
>> Thanks for getting involved. Yes, adding isBounded on the source is hard
>> to understand. I recommend adding it only to the sink at present, because
>> the sink has an upstream, and that upstream is either bounded or unbounded.
>>
>> Hi all,
>>
>> Let me summarize with your suggestions.
>>
>> public interface TableSourceFactory<T> extends TableFactory {
>>
>>   ......
>>
>>   /**
>>    * Creates and configures a {@link TableSource} based on the given {@link Context}.
>>    *
>>    * @param context context of this table source.
>>    * @return the configured table source.
>>    */
>>   default TableSource<T> createTableSource(Context context) {
>>     ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>>     return createTableSource(
>>         new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
>>         context.getTable());
>>   }
>>
>>   /**
>>    * Context of table source creation. Contains table information and environment information.
>>    */
>>   interface Context {
>>
>>     /**
>>      * @return full identifier of the given {@link CatalogTable}.
>>      */
>>     ObjectIdentifier getTableIdentifier();
>>
>>     /**
>>      * @return table {@link CatalogTable} instance.
>>      */
>>     CatalogTable getTable();
>>
>>     /**
>>      * @return readable config of this table environment.
>>      */
>>     ReadableConfig getTableConfig();
>>   }
>> }
>>
>> public interface TableSinkFactory<T> extends TableFactory {
>>
>>   ......
>>
>>   /**
>>    * Creates and configures a {@link TableSink} based on the given {@link Context}.
>>    *
>>    * @param context context of this table sink.
>>    * @return the configured table sink.
>>    */
>>   default TableSink<T> createTableSink(Context context) {
>>     ObjectIdentifier tableIdentifier = context.getTableIdentifier();
>>     return createTableSink(
>>         new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
>>         context.getTable());
>>   }
>>
>>   /**
>>    * Context of table sink creation. Contains table information and environment information.
>>    */
>>   interface Context {
>>
>>     /**
>>      * @return full identifier of the given {@link CatalogTable}.
>>      */
>>     ObjectIdentifier getTableIdentifier();
>>
>>     /**
>>      * @return table {@link CatalogTable} instance.
>>      */
>>     CatalogTable getTable();
>>
>>     /**
>>      * @return readable config of this table environment.
>>      */
>>     ReadableConfig getTableConfig();
>>
>>     /**
>>      * @return whether the input is bounded.
>>      */
>>     boolean isBounded();
>>   }
>> }
>>
>> If there is no objection, I will start a vote thread. (If necessary, I
>> can also write up a FLIP.)
>>
>> Best,
>> Jingsong Lee
>>
>> On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>
>>> Thanks Bowen and Timo for getting involved.
>>>
>>> Hi Bowen,
>>>
>>> > 1. is it better to have explicit APIs like "createBatchTableSource(...)"
>>> I think it is better to keep one method, since in [1] we reached an
>>> agreement in the DataStream layer to maintain a single API in "env.source".
>>> I think it is good not to split batch and stream, and our
>>> TableSource/TableSink are the same classes for both batch and streaming too.
>>>
>>> > 2. I'm not sure of the benefits to have a CatalogTableContext class.
>>> As Timo said, we may have more parameters to add in the future. Take a
>>> look at "AbstractRichFunction.RuntimeContext": it was added to little by
>>> little.
>>>
>>> Hi Timo,
>>>
>>> Your suggestion about Context looks good to me.
>>> "TablePath" is used in Hive for updating the catalog information of this
>>> table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
>>>
>>> > Can we postpone the change of TableValidators?
>>> Yes, ConfigOption validation looks good to me. It seems that you have
>>> been thinking about this for a long time. It's very good. Looking forward
>>> to the promotion of FLIP-54.
>>>
>>> [1]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org> wrote:
>>>
>>>> Hi Jingsong,
>>>>
>>>> +1 for adding a context in the source and sink factories. A context
>>>> class also allows for future modifications without touching the
>>>> TableFactory interface again.
>>>>
>>>> How about:
>>>>
>>>> interface TableSourceFactory {
>>>>     interface Context {
>>>>         // ...
>>>>     }
>>>> }
>>>>
>>>> Because I find the name `CatalogTableContext` confusing, and we can
>>>> bind the interface to the factory class itself as an inner interface.
>>>>
>>>> Readable access to configuration also sounds right to me. Can we remove
>>>> the `ObjectPath getTablePath()` method? I don't see a reason why a
>>>> factory should know the path. And if so, it should be an
>>>> `ObjectIdentifier` instead to also know about the catalog we are using.
>>>>
>>>> The `isStreamingMode()` should be renamed to `isBounded()` because we
>>>> would like to use terminology around boundedness rather than
>>>> streaming/batch.
>>>>
>>>> @Bowen: We are in the process of unifying the APIs and thus explicitly
>>>> avoid specialized methods in the future.
>>>>
>>>> Can we postpone the change of TableValidators? I don't think that every
>>>> factory needs a schema validator. Ideally, the factory should just
>>>> return a List<ConfigOption> or ConfigOptionGroup that contains the
>>>> validation logic as mentioned in the validation part of FLIP-54[1]. But
>>>> currently our config options are not rich enough to have a unified
>>>> validation. Additionally, the factory should return some properties such
>>>> as "supports event-time" for the schema validation outside of the
>>>> factory itself.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration
>>>>
>>>> On 16.01.20 00:51, Bowen Li wrote:
>>>> > Hi Jingsong,
>>>> >
>>>> > The 1st and 2nd pain points you described are very valid, as I'm more
>>>> > familiar with them. I agree these are shortcomings of the current
>>>> > Flink SQL design.
>>>> >
>>>> > A couple of comments on your 1st proposal:
>>>> >
>>>> > 1. is it better to have explicit APIs like "createBatchTableSource(...)"
>>>> > and "createStreamingTableSource(...)" in TableSourceFactory (would be
>>>> > similar for sink factory) to let the planner handle which mode
>>>> > (streaming vs batch) of source should be instantiated? That way we
>>>> > don't need to always let connector developers handle an if-else on
>>>> > isStreamingMode.
>>>> > 2. I'm not sure of the benefits to have a CatalogTableContext class.
>>>> > The path, table, and config are fairly independent of each other. So
>>>> > why not pass the config in as a 3rd parameter, as
>>>> > `createXxxTableSource(path, catalogTable, tableConfig)`?
>>>> >
>>>> >
>>>> > On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>> >
>>>> >> Hi dev,
>>>> >>
>>>> >> I'd like to kick off a discussion on the improvement of
>>>> >> TableSourceFactory and TableSinkFactory.
>>>> >>
>>>> >> Motivation:
>>>> >> Now the main needs and problems are:
>>>> >> 1. Connector can't get TableConfig [1], and some behaviors really need
>>>> >> to be controlled by the user's table configuration. In the era of
>>>> >> catalogs, we can't put this config in connector properties, which is
>>>> >> too inconvenient.
>>>> >> 2. Connector can't know if this is batch or stream execution mode.
>>>> >> But the sink implementations for batch and stream are totally
>>>> >> different. I understand there is an update mode property now, but it
>>>> >> splits batch and stream in the catalog dimension. In fact, this
>>>> >> information can be obtained through the current TableEnvironment.
>>>> >> 3. No interface to call validation. Our validation currently lives
>>>> >> mostly in util classes, and whether it runs depends on whether the
>>>> >> connector calls it. Now we have some new validations to add, such as
>>>> >> [2], which really confuses users, and even developers. Another problem
>>>> >> is that our SQL update (DDL) does not have validation [3]. It is
>>>> >> better to report an error when executing DDL, otherwise it will
>>>> >> confuse the user.
>>>> >>
>>>> >> Proposed change draft for 1 and 2:
>>>> >>
>>>> >> interface CatalogTableContext {
>>>> >>    ObjectPath getTablePath();
>>>> >>    CatalogTable getTable();
>>>> >>    ReadableConfig getTableConfig();
>>>> >>    boolean isStreamingMode();
>>>> >> }
>>>> >>
>>>> >> public interface TableSourceFactory<T> extends TableFactory {
>>>> >>
>>>> >>    default TableSource<T> createTableSource(CatalogTableContext context) {
>>>> >>       return createTableSource(context.getTablePath(), context.getTable());
>>>> >>    }
>>>> >>
>>>> >>    ......
>>>> >> }
>>>> >>
>>>> >> Proposed change draft for 3:
>>>> >>
>>>> >> public interface TableFactory {
>>>> >>
>>>> >>    TableValidators validators();
>>>> >>
>>>> >>    interface TableValidators {
>>>> >>       ConnectorDescriptorValidator connectorValidator();
>>>> >>       TableSchemaValidator schemaValidator();
>>>> >>       FormatDescriptorValidator formatValidator();
>>>> >>    }
>>>> >> }
>>>> >>
>>>> >> What do you think?
>>>> >>
>>>> >> [1] https://issues.apache.org/jira/browse/FLINK-15290
>>>> >> [2]
>>>> >> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
>>>> >> [3] https://issues.apache.org/jira/browse/FLINK-15509
>>>> >>
>>>> >> Best,
>>>> >> Jingsong Lee
>>>
>>> --
>>> Best, Jingsong Lee
>>
>> --
>> Best, Jingsong Lee
>
> --
> Best, Jingsong Lee

--
Best, Jingsong Lee
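
For readers following the thread, here is a minimal, self-contained sketch of the pattern under discussion: a new `Context`-based factory method that, by default, delegates to the existing two-argument method, so existing connectors keep working while migrated ones can read the table config. It does not use the real Flink classes. `ContextSketch`, `SourceFactory`, `LegacyFactory`, `ConfigAwareFactory`, `contextOf`, and the config key `sketch.fetch-size` are all hypothetical names invented for illustration, and plain strings and maps stand in for `ObjectIdentifier`, `CatalogTable`, `ReadableConfig`, and `TableSource`.

```java
import java.util.Map;

public class ContextSketch {

    /** Stand-in for the proposed Context; plain strings and maps replace Flink types. */
    interface Context {
        String getTableIdentifier();           // "catalog.database.table"
        Map<String, String> getTable();        // stand-in for CatalogTable
        Map<String, String> getTableConfig();  // stand-in for ReadableConfig
    }

    /** Stand-in for TableSourceFactory; a "source" is just a descriptive string here. */
    interface SourceFactory {
        // Existing two-argument method that connectors already implement.
        String createTableSource(String objectPath, Map<String, String> table);

        // New entry point: by default it drops the catalog name and the config
        // and delegates to the existing method.
        default String createTableSource(Context context) {
            String[] parts = context.getTableIdentifier().split("\\.");
            return createTableSource(parts[1] + "." + parts[2], context.getTable());
        }
    }

    /** An existing connector: implements only the old method. */
    static class LegacyFactory implements SourceFactory {
        @Override
        public String createTableSource(String objectPath, Map<String, String> table) {
            return "source(" + objectPath + ")";
        }
    }

    /** A migrated connector: overrides the new method to read the table config. */
    static class ConfigAwareFactory extends LegacyFactory {
        @Override
        public String createTableSource(Context context) {
            // "sketch.fetch-size" is an invented key, not a real Flink option.
            String fetchSize = context.getTableConfig().getOrDefault("sketch.fetch-size", "100");
            return "source(" + context.getTableIdentifier() + ", fetchSize=" + fetchSize + ")";
        }
    }

    /** Builds a Context from fixed values. */
    static Context contextOf(String id, Map<String, String> table, Map<String, String> config) {
        return new Context() {
            @Override public String getTableIdentifier() { return id; }
            @Override public Map<String, String> getTable() { return table; }
            @Override public Map<String, String> getTableConfig() { return config; }
        };
    }

    public static void main(String[] args) {
        Context ctx = contextOf("cat.db.orders", Map.of(), Map.of("sketch.fetch-size", "500"));
        System.out.println(new LegacyFactory().createTableSource(ctx));      // source(db.orders)
        System.out.println(new ConfigAwareFactory().createTableSource(ctx)); // source(cat.db.orders, fetchSize=500)
    }
}
```

The point of the default method is that the planner can always call the `Context` overload. Adding a further getter to `Context` later would not change the factory method's signature, which is the extensibility argument made in the thread.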