Hi Jingsong,
some last minute changes from my side:
1. rename `getTableIdentifier` to `getObjectIdentifier` to keep the API
obvious. Otherwise people expect a `TableIdentifier` class being
returned here.
2. rename `getTableConfig` to `getConfiguration()` in the future this
will not only be a "table" config but might give access to the full
Flink config
On 04.02.20 06:27, Jingsong Li wrote:
So the interface will be:
public interface TableSourceFactory<T> extends TableFactory {
* Creates and configures a {@link TableSource} based on the given
{@link Context}.
* @param context context of this table source.
* @return the configured table source.
default TableSource<T> createTableSource(Context context) {
ObjectIdentifier tableIdentifier = context.getTableIdentifier();
return createTableSource(
new ObjectPath(tableIdentifier.getDatabaseName(),
* Context of table source creation. Contains table information and
environment information.
interface Context {
* @return full identifier of the given {@link CatalogTable}.
ObjectIdentifier getTableIdentifier();
* @return table {@link CatalogTable} instance.
CatalogTable getTable();
* @return readable config of this table environment.
ReadableConfig getTableConfig();
public interface TableSinkFactory<T> extends TableFactory {
* Creates and configures a {@link TableSink} based on the given
{@link Context}.
* @param context context of this table sink.
* @return the configured table sink.
default TableSink<T> createTableSink(Context context) {
ObjectIdentifier tableIdentifier = context.getTableIdentifier();
return createTableSink(
new ObjectPath(tableIdentifier.getDatabaseName(),
* Context of table sink creation. Contains table information and
environment information.
interface Context {
* @return full identifier of the given {@link CatalogTable}.
ObjectIdentifier getTableIdentifier();
* @return table {@link CatalogTable} instance.
CatalogTable getTable();
* @return readable config of this table environment.
ReadableConfig getTableConfig();
Jingsong Lee
On Tue, Feb 4, 2020 at 1:22 PM Jingsong Li <jingsongl...@gmail.com> wrote:
Hi all,
After rethinking and discussion with Kurt, I'd like to remove "isBounded".
We can delay this is bounded message to TableSink.
With TableSink refactor, we need consider "consumeDataStream"
and "consumeBoundedStream".
Jingsong Lee
On Mon, Feb 3, 2020 at 4:17 PM Jingsong Li <jingsongl...@gmail.com> wrote:
Hi Jark,
Thanks involving, yes, it's hard to understand to add isBounded on the
I recommend adding only to sink at present, because sink has upstream.
Its upstream is either bounded or unbounded.
Hi all,
Let me summarize with your suggestions.
public interface TableSourceFactory<T> extends TableFactory {
* Creates and configures a {@link TableSource} based on the given {@link
* @param context context of this table source.
* @return the configured table source.
default TableSource<T> createTableSource(Context context) {
ObjectIdentifier tableIdentifier = context.getTableIdentifier();
return createTableSource(
new ObjectPath(tableIdentifier.getDatabaseName(),
* Context of table source creation. Contains table information and
environment information.
interface Context {
* @return full identifier of the given {@link CatalogTable}.
ObjectIdentifier getTableIdentifier();
* @return table {@link CatalogTable} instance.
CatalogTable getTable();
* @return readable config of this table environment.
ReadableConfig getTableConfig();
public interface TableSinkFactory<T> extends TableFactory {
* Creates and configures a {@link TableSink} based on the given {@link
* @param context context of this table sink.
* @return the configured table sink.
default TableSink<T> createTableSink(Context context) {
ObjectIdentifier tableIdentifier = context.getTableIdentifier();
return createTableSink(
new ObjectPath(tableIdentifier.getDatabaseName(),
* Context of table sink creation. Contains table information and
environment information.
interface Context {
* @return full identifier of the given {@link CatalogTable}.
ObjectIdentifier getTableIdentifier();
* @return table {@link CatalogTable} instance.
CatalogTable getTable();
* @return readable config of this table environment.
ReadableConfig getTableConfig();
* @return Input whether or not it is bounded.
boolean isBounded();
If there is no objection, I will start a vote thread. (if necessary, I
can also edit a FLIP).
Jingsong Lee
On Thu, Jan 16, 2020 at 7:56 PM Jingsong Li <jingsongl...@gmail.com>
Thanks Bowen and Timo for involving.
Hi Bowen,
1. is it better to have explicit APIs like
I think it is better to keep one method, since in [1], we have reached
one in DataStream layer to maintain a single API in "env.source". I think
it is good to not split batch and stream, And our TableSource/TableSink are
the same class for both batch and streaming too.
2. I'm not sure of the benefits to have a CatalogTableContext class.
As Timo said, We may have more parameters to add in the future, take a
look to "AbstractRichFunction.RuntimeContext", It's added little by little.
Hi Timo,
Your suggestion about Context looks good to me.
"TablePath" used in Hive for updating the catalog information of this
table. Yes, "ObjectIdentifier" looks better than "ObjectPath".
Can we postpone the change of TableValidators?
Yes, ConfigOption validation looks good to me. It seems that you have
been thinking about this for a long time. It's very good. Looking forward
to the promotion of FLIP-54.
Jingsong Lee
On Thu, Jan 16, 2020 at 6:01 PM Timo Walther <twal...@apache.org> wrote:
Hi Jingsong,
+1 for adding a context in the source and sink factories. A context
class also allows for future modifications without touching the
TableFactory interface again.
How about:
interface TableSourceFactory {
interface Context {
// ...
Because I find the name `CatalogTableContext` confusing and we can
the interface to the factory class itself as an inner interface.
Readable access to configuration sounds also right to me. Can we remove
the `ObjectPath getTablePath()` method? I don't see a reason why a
factory should know the path. And if so, it should be an
`ObjectIdentifier` instead to also know about the catalog we are using.
The `isStreamingMode()` should be renamed to `isBounded()` because we
would like to use terminology around boundedness rather than
@Bowen: We are in the process of unifying the APIs and thus explicitly
avoid specialized methods in the future.
Can we postpone the change of TableValidators? I don't think that every
factory needs a schema validator. Ideally, the factory should just
return a List<ConfigOption> or ConfigOptionGroup that contains the
validation logic as mentioned in the validation part of FLIP-54[1]. But
currently our config options are not rich enough to have a unified
validation. Additionally, the factory should return some properties
as "supports event-time" for the schema validation outside of the
factory itself.
On 16.01.20 00:51, Bowen Li wrote:
Hi Jingsong,
The 1st and 2nd pain points you described are very valid, as I'm more
familiar with them. I agree these are shortcomings of the current
Flink SQL
A couple comments on your 1st proposal:
1. is it better to have explicit APIs like
and "createStreamingTableSource(...)" in TableSourceFactory (would be
similar for sink factory) to let planner handle which mode (streaming
batch) of source should be instantiated? That way we don't need to
let connector developers handling an if-else on isStreamingMode.
2. I'm not sure of the benefits to have a CatalogTableContext class.
path, table, and config are fairly independent of each other. So why
pass the config in as 3rd parameter as `createXxxTableSource(path,
catalogTable, tableConfig)?
On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com>
Hi dev,
I'd like to kick off a discussion on the improvement of
and TableSinkFactory.
Now the main needs and problems are:
1.Connector can't get TableConfig [1], and some behaviors really
need to be
controlled by the user's table configuration. In the era of catalog,
can't put these config in connector properties, which is too
2.Connector can't know if this is batch or stream execution mode.
But the
sink implementation of batch and stream is totally different. I
there is an update mode property now, but it splits the batch and
stream in
the catalog dimension. In fact, this information can be obtained
the current TableEnvironment.
3.No interface to call validation. Now our validation is more util
It depends on whether or not the connector calls. Now we have some
validations to add, such as [2], which is really confuse uses, even
developers. Another problem is that our SQL update (DDL) does not
validation [3]. It is better to report an error when executing DDL,
otherwise it will confuse the user.
Proposed change draft for 1 and 2:
interface CatalogTableContext {
ObjectPath getTablePath();
CatalogTable getTable();
ReadableConfig getTableConfig();
boolean isStreamingMode();
public interface TableSourceFactory<T> extends TableFactory {
default TableSource<T> createTableSource(CatalogTableContext
context) {
return createTableSource(context.getTablePath(),
Proposed change draft for 3:
public interface TableFactory {
TableValidators validators();
interface TableValidators {
ConnectorDescriptorValidator connectorValidator();
TableSchemaValidator schemaValidator();
FormatDescriptorValidator formatValidator();
What do you think?
[1] https://issues.apache.org/jira/browse/FLINK-15290
[3] https://issues.apache.org/jira/browse/FLINK-15509
Jingsong Lee
Best, Jingsong Lee
Best, Jingsong Lee
Best, Jingsong Lee