Hi all,

I would like to add an additional method, `getClassloader()`, to the context, because a TableFactory may need this class loader to find other TableFactories; e.g., KafkaTableSourceSinkFactory needs to find the format factory. See FLINK-15992.
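To illustrate why the factory context would need a class loader, here is a minimal Java sketch that assumes format factories are discovered through Java's standard `ServiceLoader` (SPI) mechanism. `FormatFactory`, `Context`, and `findFormatFactory` are hypothetical names invented for this sketch, not Flink's actual API; the point is only that the lookup uses the class loader handed over by the context instead of one the factory has to guess.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Hypothetical, simplified stand-ins; NOT the real org.apache.flink.table.factories API.
public class ClassLoaderContextSketch {

    /** Hypothetical SPI interface for a format factory (e.g. "json", "avro"). */
    public interface FormatFactory {
        String formatType();
    }

    /** Hypothetical factory context exposing the proposed getClassloader() method. */
    public interface Context {
        ClassLoader getClassloader();
    }

    /**
     * Looks up a FormatFactory via java.util.ServiceLoader, using the class loader
     * supplied by the context rather than whatever loader happens to be current.
     */
    public static FormatFactory findFormatFactory(Context context, String formatType) {
        ServiceLoader<FormatFactory> loader =
                ServiceLoader.load(FormatFactory.class, context.getClassloader());
        List<String> seen = new ArrayList<>();
        for (FormatFactory factory : loader) {
            if (factory.formatType().equals(formatType)) {
                return factory;
            }
            seen.add(factory.formatType());
        }
        // No registered provider matched: report what the supplied loader could see.
        throw new IllegalStateException(
                "No FormatFactory for '" + formatType
                        + "' visible to the supplied class loader; found: " + seen);
    }
}
```

If no provider is registered under `META-INF/services` for the interface, the lookup fails with an exception listing the format types that were visible, which makes class-loading problems easier to diagnose.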
I don't think we need a new VOTE for this; I just want to make this discussion more public. What do you think?

Best,
Jark

On Wed, 5 Feb 2020 at 16:05, Rui Li <lirui.fu...@gmail.com> wrote:

+1, thanks for the efforts.

Best regards!
Rui Li

On Wed, Feb 5, 2020 at 4:00 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi all,

As Jark suggested in the VOTE thread, I have created a JIRA: https://issues.apache.org/jira/browse/FLINK-15912

Best,
Jingsong Lee

On Wed, Feb 5, 2020 at 10:57 AM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi Timo,

Good catch!

I really love idea 2; a full Flink config looks very good to me.

As for your first point, if I understand it correctly: we don't actually have a `TableIdentifier` class now, but TableFactory already implies a table. So I am OK with it.

The new Context should be:

    /**
     * Context of table source creation. Contains table information and environment information.
     */
    interface Context {
        /**
         * @return full identifier of the given {@link CatalogTable}.
         */
        ObjectIdentifier getObjectIdentifier();

        /**
         * @return table {@link CatalogTable} instance.
         */
        CatalogTable getTable();

        /**
         * @return readable config of this table environment.
         */
        ReadableConfig getConfiguration();
    }

Best,
Jingsong Lee

On Tue, Feb 4, 2020 at 8:51 PM Timo Walther <twal...@apache.org> wrote:

Hi Jingsong,

some last-minute changes from my side:

1. Rename `getTableIdentifier` to `getObjectIdentifier` to keep the API obvious. Otherwise people will expect a `TableIdentifier` class to be returned here.

2. Rename `getTableConfig` to `getConfiguration()`. In the future this will not only be a "table" config but might give access to the full Flink config.

Thanks,
Timo

On 04.02.20 06:27, Jingsong Li wrote:

So the interface will be:

    public interface TableSourceFactory<T> extends TableFactory {
        ......

        /**
         * Creates and configures a {@link TableSource} based on the given {@link Context}.
         *
         * @param context context of this table source.
         * @return the configured table source.
         */
        default TableSource<T> createTableSource(Context context) {
            ObjectIdentifier tableIdentifier = context.getTableIdentifier();
            return createTableSource(
                    new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
                    context.getTable());
        }

        /**
         * Context of table source creation. Contains table information and environment information.
         */
        interface Context {
            /**
             * @return full identifier of the given {@link CatalogTable}.
             */
            ObjectIdentifier getTableIdentifier();

            /**
             * @return table {@link CatalogTable} instance.
             */
            CatalogTable getTable();

            /**
             * @return readable config of this table environment.
             */
            ReadableConfig getTableConfig();
        }
    }

    public interface TableSinkFactory<T> extends TableFactory {
        ......

        /**
         * Creates and configures a {@link TableSink} based on the given {@link Context}.
         *
         * @param context context of this table sink.
         * @return the configured table sink.
         */
        default TableSink<T> createTableSink(Context context) {
            ObjectIdentifier tableIdentifier = context.getTableIdentifier();
            return createTableSink(
                    new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
                    context.getTable());
        }

        /**
         * Context of table sink creation. Contains table information and environment information.
         */
        interface Context {
            /**
             * @return full identifier of the given {@link CatalogTable}.
             */
            ObjectIdentifier getTableIdentifier();

            /**
             * @return table {@link CatalogTable} instance.
             */
            CatalogTable getTable();

            /**
             * @return readable config of this table environment.
             */
            ReadableConfig getTableConfig();
        }
    }

Best,
Jingsong Lee

On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi all,

After rethinking this and discussing it with Kurt, I'd like to remove "isBounded". We can defer the boundedness information to TableSink: with the TableSink refactoring, we need to consider "consumeDataStream" and "consumeBoundedStream".

Best,
Jingsong Lee

On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi Jark,

Thanks for getting involved. Yes, adding isBounded to the source is hard to understand. I recommend adding it only to the sink for now, because a sink has an upstream, and that upstream is either bounded or unbounded.

Hi all,

Let me summarize, incorporating your suggestions:

    public interface TableSourceFactory<T> extends TableFactory {

        ......

        /**
         * Creates and configures a {@link TableSource} based on the given {@link Context}.
         *
         * @param context context of this table source.
         * @return the configured table source.
         */
        default TableSource<T> createTableSource(Context context) {
            ObjectIdentifier tableIdentifier = context.getTableIdentifier();
            return createTableSource(
                    new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
                    context.getTable());
        }

        /**
         * Context of table source creation. Contains table information and environment information.
         */
        interface Context {

            /**
             * @return full identifier of the given {@link CatalogTable}.
             */
            ObjectIdentifier getTableIdentifier();

            /**
             * @return table {@link CatalogTable} instance.
             */
            CatalogTable getTable();

            /**
             * @return readable config of this table environment.
             */
            ReadableConfig getTableConfig();
        }
    }

    public interface TableSinkFactory<T> extends TableFactory {

        ......

        /**
         * Creates and configures a {@link TableSink} based on the given {@link Context}.
         *
         * @param context context of this table sink.
         * @return the configured table sink.
         */
        default TableSink<T> createTableSink(Context context) {
            ObjectIdentifier tableIdentifier = context.getTableIdentifier();
            return createTableSink(
                    new ObjectPath(tableIdentifier.getDatabaseName(), tableIdentifier.getObjectName()),
                    context.getTable());
        }

        /**
         * Context of table sink creation. Contains table information and environment information.
         */
        interface Context {

            /**
             * @return full identifier of the given {@link CatalogTable}.
             */
            ObjectIdentifier getTableIdentifier();

            /**
             * @return table {@link CatalogTable} instance.
             */
            CatalogTable getTable();

            /**
             * @return readable config of this table environment.
             */
            ReadableConfig getTableConfig();

            /**
             * @return whether or not the input is bounded.
             */
            boolean isBounded();
        }
    }

If there is no objection, I will start a vote thread (if necessary, I can also draft a FLIP).

Best,
Jingsong Lee

On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Thanks, Bowen and Timo, for getting involved.

Hi Bowen,

> 1. is it better to have explicit APIs like "createBatchTableSource(...)"

I think it is better to keep one method, since in [1] we reached agreement in the DataStream layer to maintain a single API in "env.source". I think it is good not to split batch and streaming, and our TableSource/TableSink are also the same classes for both batch and streaming.

> 2. I'm not sure of the benefits to have a CatalogTableContext class.

As Timo said, we may have more parameters to add in the future; take a look at "AbstractRichFunction.RuntimeContext", which was extended little by little.

Hi Timo,

Your suggestion about Context looks good to me. "TablePath" is used in Hive for updating the catalog information of this table. Yes, "ObjectIdentifier" looks better than "ObjectPath".

> Can we postpone the change of TableValidators?

Yes, ConfigOption validation looks good to me. It seems that you have been thinking about this for a long time, which is very good. Looking forward to the progress of FLIP-54.

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-27-Refactor-Source-Interface-td24952i80.html#a36692

Best,
Jingsong Lee

On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org> wrote:

Hi Jingsong,

+1 for adding a context in the source and sink factories. A context class also allows for future modifications without touching the TableFactory interface again.

How about:

    interface TableSourceFactory {
        interface Context {
            // ...
        }
    }

I find the name `CatalogTableContext` confusing, and we can bind the interface to the factory class itself as an inner interface.

Readable access to the configuration also sounds right to me. Can we remove the `ObjectPath getTablePath()` method?
I don't see a reason why a factory should know the path. And if it does, it should be an `ObjectIdentifier` instead, so that it also knows about the catalog we are using.

`isStreamingMode()` should be renamed to `isBounded()` because we would like to use terminology around boundedness rather than streaming/batch.

@Bowen: We are in the process of unifying the APIs and thus explicitly want to avoid specialized methods in the future.

Can we postpone the change of TableValidators? I don't think that every factory needs a schema validator. Ideally, the factory should just return a List<ConfigOption> or ConfigOptionGroup that contains the validation logic, as mentioned in the validation part of FLIP-54 [1]. But currently our config options are not rich enough to support unified validation. Additionally, the factory should return some properties, such as "supports event-time", for the schema validation outside of the factory itself.

Regards,
Timo

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration

On 16.01.20 00:51, Bowen Li wrote:

Hi Jingsong,

The 1st and 2nd pain points you described are very valid, as I'm more familiar with them. I agree these are shortcomings of the current Flink SQL design.

A couple of comments on your 1st proposal:

1. Is it better to have explicit APIs like "createBatchTableSource(...)" and "createStreamingTableSource(...)" in TableSourceFactory (and similarly in the sink factory) to let the planner decide which mode (streaming vs. batch) of source should be instantiated? That way we don't always need connector developers to handle an if-else on isStreamingMode.
2. I'm not sure of the benefits of having a CatalogTableContext class. The path, table, and config are fairly independent of each other. So why not pass the config in as a 3rd parameter, as in `createXxxTableSource(path, catalogTable, tableConfig)`?

On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi dev,

I'd like to kick off a discussion on improving TableSourceFactory and TableSinkFactory.

Motivation:
The main needs and problems now are:
1. Connectors can't get the TableConfig [1], yet some behaviors really need to be controlled by the user's table configuration. In the era of catalogs, we can't put these configs into connector properties; that is too inconvenient.
2. Connectors can't know whether this is batch or streaming execution mode, but the sink implementations for batch and streaming are totally different. I understand there is an update-mode property now, but it splits batch and streaming in the catalog dimension. In fact, this information can be obtained through the current TableEnvironment.
3. There is no interface for invoking validation. Our validation currently consists mostly of utility classes, and whether it runs depends on whether the connector calls them. We now have some new validations to add, such as [2], which really confuse users and even developers. Another problem is that our SQL update (DDL) has no validation [3]. It is better to report an error when executing the DDL; otherwise the user will be confused.

Proposed change draft for 1 and 2:

    interface CatalogTableContext {
        ObjectPath getTablePath();
        CatalogTable getTable();
        ReadableConfig getTableConfig();
        boolean isStreamingMode();
    }

    public interface TableSourceFactory<T> extends TableFactory {

        default TableSource<T> createTableSource(CatalogTableContext context) {
            return createTableSource(context.getTablePath(), context.getTable());
        }

        ......
    }

Proposed change draft for 3:

    public interface TableFactory {

        TableValidators validators();

        interface TableValidators {
            ConnectorDescriptorValidator connectorValidator();
            TableSchemaValidator schemaValidator();
            FormatDescriptorValidator formatValidator();
        }
    }

What do you think?

[1] https://issues.apache.org/jira/browse/FLINK-15290
[2] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
[3] https://issues.apache.org/jira/browse/FLINK-15509

Best,
Jingsong Lee
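
For readers skimming the thread, the following Java sketch pulls together the Context shape reached near the end (Timo's renames to `getObjectIdentifier()` and `getConfiguration()`) and the default-method bridge that Jingsong's drafts use to keep existing `createTableSource(ObjectPath, CatalogTable)` implementations working. All types are simplified, hypothetical stand-ins written for this sketch, not Flink's real classes: `ReadableConfig` is reduced to a String lookup (the real one is keyed by `ConfigOption`), and `TableSource.describe()` and `contextOf(...)` are invented for illustration.

```java
import java.util.Map;

// Hypothetical, simplified stand-ins for ObjectIdentifier, ObjectPath, CatalogTable,
// ReadableConfig and TableSource; only the shape of the discussed API is reproduced.
public class TableFactoryContextSketch {

    public record ObjectIdentifier(String catalogName, String databaseName, String objectName) {}

    public record ObjectPath(String databaseName, String objectName) {}

    public record CatalogTable(Map<String, String> properties) {}

    /** Simplified: keyed by String here, by ConfigOption in Flink. */
    public interface ReadableConfig {
        String get(String key, String defaultValue);
    }

    /** Hypothetical minimal table source used only to observe what was created. */
    public interface TableSource<T> {
        String describe();
    }

    public interface TableSourceFactory<T> {

        /** Legacy method that existing factories already implement. */
        TableSource<T> createTableSource(ObjectPath tablePath, CatalogTable table);

        /** New entry point; by default it delegates to the legacy method. */
        default TableSource<T> createTableSource(Context context) {
            ObjectIdentifier id = context.getObjectIdentifier();
            // ObjectPath has no catalog part, so the catalog name is not passed on.
            return createTableSource(
                    new ObjectPath(id.databaseName(), id.objectName()), context.getTable());
        }

        /** Context shape after the renames discussed in the thread. */
        interface Context {
            ObjectIdentifier getObjectIdentifier();

            CatalogTable getTable();

            ReadableConfig getConfiguration();
        }
    }

    /** Hypothetical helper that builds an immutable Context, e.g. for tests. */
    public static TableSourceFactory.Context contextOf(
            ObjectIdentifier id, CatalogTable table, ReadableConfig config) {
        return new TableSourceFactory.Context() {
            @Override
            public ObjectIdentifier getObjectIdentifier() { return id; }

            @Override
            public CatalogTable getTable() { return table; }

            @Override
            public ReadableConfig getConfiguration() { return config; }
        };
    }
}
```

The default method shows why the Context overload does not break existing factories: a factory that only implements the legacy two-argument method still works when the planner calls the new one. This matches Timo's argument that a context class allows future additions (such as the class loader requested at the top of the thread) without touching the factory interface again.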