Hi Jingsong,

+1 for adding a context in the source and sink factories. A context class also allows for future modifications without touching the TableFactory interface again.

How about:

interface TableSourceFactory {
    interface Context {
       // ...
    }
}

Because I find the name `CatalogTableContext` confusing and we can bound the interface to the factory class itself as an inner interface.

Readable access to configuration sounds also right to me. Can we remove the `ObjectPath getTablePath()` method? I don't see a reason why a factory should know the path. And if so, it should be an `ObjectIdentifier` instead to also know about the catalog we are using.

The `isStreamingMode()` should be renamed to `isBounded()` because we would like to use terminology around boundedness rather than streaming/batch.

@Bowen: We are in the process of unifying the APIs and thus explicitly avoid specialized methods in the future.

Can we postpone the change of TableValidators? I don't think that every factory needs a schema validator. Ideally, the factory should just return a List<ConfigOption> or ConfigOptionGroup that contains the validation logic as mentioned in the validation part of FLIP-54[1]. But currently our config options are not rich enough to have a unified validation. Additionally, the factory should return some properties such as "supports event-time" for the schema validation outside of the factory itself.

Regards,
Timo

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration


On 16.01.20 00:51, Bowen Li wrote:
Hi Jingsong,

The 1st and 2nd pain points you described are very valid, as I'm more
familiar with them. I agree these are shortcomings of the current Flink SQL
design.

A couple comments on your 1st proposal:

1. is it better to have explicit APIs like "createBatchTableSource(...)"
and "createStreamingTableSource(...)" in TableSourceFactory (would be
similar for sink factory) to let planner handle which mode (streaming vs
batch) of source should be instantiated? That way we don't need to always
let connector developers handling an if-else on isStreamingMode.
2. I'm not sure of the benefits to have a CatalogTableContext class. The
path, table, and config are fairly independent of each other. So why not
pass the config in as 3rd parameter as `createXxxTableSource(path,
catalogTable, tableConfig)?


On Tue, Jan 14, 2020 at 7:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:

Hi dev,

I'd like to kick off a discussion on the improvement of TableSourceFactory
and TableSinkFactory.

Motivation:
Now the main needs and problems are:
1.Connector can't get TableConfig [1], and some behaviors really need to be
controlled by the user's table configuration. In the era of catalog, we
can't put these config in connector properties, which is too inconvenient.
2.Connector can't know if this is batch or stream execution mode. But the
sink implementation of batch and stream is totally different. I understand
there is an update mode property now, but it splits the batch and stream in
the catalog dimension. In fact, this information can be obtained through
the current TableEnvironment.
3.No interface to call validation. Now our validation is more util classes.
It depends on whether or not the connector calls. Now we have some new
validations to add, such as [2], which is really confuse uses, even
developers. Another problem is that our SQL update (DDL) does not have
validation [3]. It is better to report an error when executing DDL,
otherwise it will confuse the user.

Proposed change draft for 1 and 2:

interface CatalogTableContext {
    ObjectPath getTablePath();
    CatalogTable getTable();
    ReadableConfig getTableConfig();
    boolean isStreamingMode();
}

public interface TableSourceFactory<T> extends TableFactory {

    default TableSource<T> createTableSource(CatalogTableContext context) {
       return createTableSource(context.getTablePath(), context.getTable());
    }

    ......
}

Proposed change draft for 3:

public interface TableFactory {

    TableValidators validators();

    interface TableValidators {
       ConnectorDescriptorValidator connectorValidator();
       TableSchemaValidator schemaValidator();
       FormatDescriptorValidator formatValidator();
    }
}

What do you think?

[1] https://issues.apache.org/jira/browse/FLINK-15290
[2]

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-A-mechanism-to-validate-the-precision-of-columns-for-connectors-td36552.html#a36556
[3] https://issues.apache.org/jira/browse/FLINK-15509

Best,
Jingsong Lee



Reply via email to