Hi Timo and Dawid,

Thanks for the patient explanation. I just had a phone call with Kurt and
Jark. I do see there are a few abstractions that we only see the use case
in SQL so far. Therefore while thinking of a Source abstraction that may be
shared with different use cases semantics is theoretically useful, doing
that may not bring us much value at this point. So I am convinced that it
doesn't have to be done right now and I have no further concern with the
design in the current FLIP.

Again, really appreciate the patient discussion! I learned quite a bit from
it.

Cheers,

Jiangjie (Becket) Qin

On Thu, Mar 26, 2020 at 8:58 PM Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi Becket,
>
> Generally I don't think connector developers should bother with
> understanding any of the SQL concepts.
>
> I am not sure if we understand "connector developer" the same way. Let me
> describe how I see the process of writing a new source (that can be used in
> both Table & DataStream API)
>
> 1. Connector developer writes a Source that deals with the actual reading
> and deserializing (preferably with a pluggable format/deserializer). The
> result of that step should be something like:
>
> FilesystemSource
>
>     .path(...)
>
>     .format(ParquetFormat
>
>                     .filterPredicate(/* parquet specific filter */)
>
>                     .project(/* parquet specific projection */)
>
>                     .map(...))
>
>     .watermarkAssigner(...)
>
>
> This is useful for DataStream and we can and want to use this in the Table
> API.  Those interface shouldn't accept any *Translators though. It does
> make no sense cause internally they are not dealing e.g. with the
> Expression. They should accept already created predicates.
>
> We are not designing anything at that level. This we expect from FLIP-27
>
> 2. Then we need to have a DynamicTableSource with different abilities that
> can create e.g. the parquet filter or projection from expressions. I think
> this is what you also describe in your second point. And this is what we
> are designing in the FLIP. Bear in mind that e.g. Deserializer will be
> created out of multiple SQL concepts: regular schema/computed
> columns/possibly projections etc., each applied at different planning
> stages.
>
> All of those interfaces serve the purpose of configuring the
> DynamicTableSource so that it is able to instantiate the Source with proper
> configuration. In other words it is a factory for the source that you can
> configure with SQL concepts. In turn this Factory will call another factory
> from point 1.
>
> I don't see a potential for unifying factories across different high level
> APIs. Taking your example with Spatial Database that operates on
> Coordinates and Area (even though those would rather be modeled as SQL
> types and we would still operate on Rows, but just for the sake of the
> example). In that respect there is no point in having a
> PushDownComputedColumns interface in the factory for the spatial database.
>
> Best,
>
> Dawid
>
>
> On 26/03/2020 11:47, Becket Qin wrote:
>
> Hi Timo,
>
> Regarding "connector developers just need to know how to write an
>
> ExpressionToParquetFilter":
>
>
> This is the entire purpose of the DynamicTableSource/DynamicTableSink.
>
> The bridging between SQL concepts and connector specific concepts.
> Because this is the tricky part. How to get from a SQL concept to a
> connctor concept.
>
> Maybe it is just a naming issue depending on whether one is looking upward
> from the Connectors perspective, or looking downward from the SQL
> perspective. If we agree that the connectors should provide semantic free
> API to the high level use cases, it seems we should follow the former path.
> And if there are one or two APIs that the connector developers have to
> understand in order to support Table / SQL, I think we can just address
> them case by case, instead of wrapping the entire low level source API
> with a set of new concepts.
>
> Correct me if I am wrong, can we tell the following story to a connector
> developer and get a all the TableSource functionality work?
>
> To provide a TableSource from a Source, one just need to know two more
> concepts: *Row* and *Expression*. The work to create a TableSource are
> following:
> 1. A connector developer can write three classes in order to build a table
> source:
>
>    - Deserializer<Row> (Must-have)
>    - PredicateTranslator<Expression, FilterPredicate> (optional, only
>    applicable if the Source is a FilterableSource)
>    - PredicateTranslator<Expression, ProjectionPredicate> (optional, only
>    applicable if the Source is a ProjectableSource)
>
> 2. In order to let the table source be discoverable, one need to provide a
> Factory, and that Factory provides the following as a bundle:
>
>    - The Source itself (Must-have)
>    - The Deserializer<Row> (Must-have)
>    - PredicateTranslator<Expression, FilterPredicate> (optional, only
>    applicable when the Factory is a FilterFactory)
>    - PredicateTranslator<Expression, ProjectionPredicate> (optional, only
>    applicable when the Factory is a ProjectorFactory)
>
> 3. The Deserializer<Row> may implement one more decorative interfaces to
> further convert the record after deserialization.
>
>    - withMapFunction<Row, Row>;
>
> Note that the above description only require the connector developer to
> understand Expression and Row. If this works, It is much easier to explain
> than throwing a full set of new concepts. More importantly, it is way more
> generic. For example, If we change Row to Coordinates, and Expression to
> Area, we easily get a Source for a Spatial Database.
>
>
> One thing I want to call out is that while the old SourceFunction and
> InputFormat are concrete implementations that does the actual IO work. The
> Source API in FLIP-27 itself is kind of a Factory by itself already. So if
> we can push the decorative interfaces from the TableFactory layer to the
> Source layer, it will help unify the experience for DataStream and Table
> Source. This will also align with our goal of letting the DataStream Source
> provide a semantic free API that can be used by different high level API.
>
>
> BTW, Jark suggested that we can probably have an offline call to accelerate
> the discussion. I think it is a good idea. Can we do that?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Thu, Mar 26, 2020 at 5:28 PM Timo Walther <twal...@apache.org> 
> <twal...@apache.org> wrote:
>
>
> Hi Becket,
>
> Regarding "PushDown/NestedPushDown which is internal to optimizer":
>
> Those concepts cannot be entirely internal to the optimizer, at some
> point the optimizer needs to pass them into the connector specific code.
> This code will then convert it to e.g. Parque expressions. So there must
> be some interface that takes SQL Expression and converts to connector
> specific code. This interface between planner and connector is modelled
> by the SupportsXXX interfaces. And you are right, if developers don't
> care, they don't need to implement those optional interfaces but will
> not get performant connectors.
>
> Regarding "Table connector can work with the above two mechanism":
>
> A table connector needs three mechanisms that are represented in the
> current design.
>
> 1. a stateless discovery interface (Factory) that can convert
> ConfigOptions to a stateful factory interface
> (DynamicTableSource/DynamicTableSink)
>
> 2. a stateful factory interface (DynamicTableSource/DynamicTableSink)
> that receives concepts from the optimizer (watermarks, filters,
> projections) and produces runtime classes such as your
> `ExpressionToParquetFilter`
>
> 3. runtime interfaces that are generated from the stateful factory; all
> the factories that you mentioned can be used in `getScanRuntimeProvider`.
>
> Regarding "connector developers just need to know how to write an
> ExpressionToParquetFilter":
>
> This is the entire purpose of the DynamicTableSource/DynamicTableSink.
> The bridging between SQL concepts and connector specific concepts.
> Because this is the tricky part. How to get from a SQL concept to a
> connctor concept.
>
> Regards,
> Timo
>
>
> On 26.03.20 04:46, Becket Qin wrote:
>
> Hi Timo,
>
> Thanks for the reply. I totally agree that there must be something new
> added to the connector in order to make it work for SQL / Table. My
>
> concern
>
> is mostly over what they should be, and how to add them. To be honest, I
> was kind of lost when looking at the interfaces such as
> DataStructureConverter, RuntimeConverter and their internal context.
>
> Also I
>
> believe most connector developers do not care about the concept of
> "PushDown" / "NestedPushDown" which is internal to optimizer and not even
> exposed to SQL writers.
>
> Therefore I am trying to see if we can:
> A) Keep those additions minimum to the connector developers if they don't
> have to know the details.
> B) Expose as less high level concept as possible. More specifically, try
>
> to
>
> speak the connector language and expose the general mechanism instead of
> binding them with use case semantic.
>
> If we can achieve the above two goals, we could avoid adding unnecessary
> burden to the connector developers, and also make the connectors more
> generic.
>
> It might worth thinking about what additional work is necessary for the
> connector developers, here are what I am thinking of, please correct me
>
> if
>
> I miss something.
>
>     1. A Factory interface that allows high level use case, in this case
>     SQL, to find a matching source using service provider mechanism.
>     2. Allows the high level use case to specify the plugins that are
>     supported by the underneath DataStream Source.
>
> If Table connector can work with the above two mechanism, maybe we can
>
> make
>
> some slight modifications to the interfaces in the current FLIP.
>
>     - A *SourceFactory* which extends the Factory interface in the FLIP,
>     with one more method:
>        - *Source getSource();*
>     - Some decorative interfaces to the SourceFactory such as:
>        - *FilterFactory<PREDICATE, T extends Supplier<PREDICATE>>*, with
>
> the
>
>        following method
>           - T getFilter();
>        - *ProjectorFactory<PREDICATE, T extends Supplier<PREDICATE>>*,
>
> with
>
>        the following method.
>           - T getProjector();
>        - *DeserializerFactory<INPUT, OUTPUT>*
>
> With this set of API, a ParquetTableSourceFactory may become:
>
> class ParqeutTableSourceFactory implements
>             SourceFactory,
>             DeserializerFactory<ParquetRecords, Row>,
>             FilterFactory<ParquetFilter, ExressionToParquetFilter> {
>      @Override
>      ParquetSource getSource() { ... }
>
>      @Override
>      ExressionToParquetFilter getFilterSupplier() { ... };
> }
>
> The ExressionToParquetFilter will have an *applyPredicate(Expression)*
> method.
>
> I know it does not look like a perfect interface from the pure SQL
> perspective. And I am not even sure if this would meet all the
>
> requirements
>
> for SQL, but the benefit is that the connector developers just need to
>
> know
>
> how to write an ExpressionToParquetFilter in order to make it work for
> Table, without having to understand the entire SQL concept.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Wed, Mar 25, 2020 at 5:57 PM Timo Walther <twal...@apache.org> 
> <twal...@apache.org> wrote:
>
>
> Hi Becket,
>
> Let me clarify a few things first: Historically we thought of Table
> API/SQL as a library on top of DataStream API. Similar to Gelly or CEP.
> We used TypeInformation in Table API to integrate nicely with DataStream
> API. However, the last years have shown that SQL is not just a library.
> It is an entire ecosystem that defines data types, submission behavior,
> execution behavior, and highly optimized SerDes. SQL is a way to declare
> data processing end-to-end such that the planner has the full control
> over the execution.
>
> But I totally agree with your concerns around connectors. There is no
> big difference between your concerns and the current design.
>
> 1. "native connector interface is a generic abstraction of doing IO and
> Serde":
>
> This is the case in our design. We are using SourceFunction,
> DeserializationSchema, WatermarkAssigner, etc. all pluggable interfaces
> that the DataStream API offers for performing runtime operations.
>
> 2. "advanced features ... could be provided in a semantic free way":
>
> I agree here. But this is an orthogonal topic that each connector
> implementer should keep in mind. If a new connector is developed, it
> should *not* be developed only for SQL in mind but with good abstraction
> such that also DataStream API users can use it. A connector should have
> a builder pattern to plugin all capabilities like Parque filters etc.
> There should be no table-specific native/runtime connectors. I think
> this discussion is related to the discussion of FLIP-115.
>
> However, as I mentioned before: This FLIP only discusses the interfaces
> for communication between planner and connector factory. As Dawid said
> earlier, a DynamicTableSource can be more seen as a factory that calls
> pluggable interfaces of a native connextor in the end:
>
> KafkaConnector.builder()
>     .watermarkAssigner(...)
>     .keyDeser(...)
>     .valueDeser(...)
>     ....
>     .build()
>
> Regards,
> Timo
>
>
> On 25.03.20 09:05, Becket Qin wrote:
>
> Hi Kurt,
>
> I do not object to promote the concepts of SQL, but I don't think we
>
> should
>
> do that by introducing a new dedicate set of connector public
>
> interfaces
>
> that is only for SQL. The same argument can be applied to Gelly, CEP,
>
> and
>
> Machine Learning, claiming that they need to introduce a dedicated
>
> public
>
> set of interfaces that fits their own concept and ask the the connector
> developers to learn and follow their design. As an analogy, if we want
>
> to
>
> promote Chinese, we don't want to force people to learn ancient Chinese
> poem while they only need to know a few words like "hello" and
>
> "goodbye".
>
> As some design principles, here are what I think what  Flink connectors
> should look like:
>
> 1. The native connector interface is a generic abstraction of doing IO
>
> and
>
> Serde, without semantic for high level use cases such as SQL, Gelly,
>
> CEP,
>
> etc.
>
> 2. Some advanced features that may help accelerate the IO and Serde
>
> could
>
> be provided in the native connector interfaces in a semantic free way
>
> so
>
> all the high level use cases can leverage.
>
> 3. Additional semantics can be built on top of the native source
>
> interface
>
> through providing different plugins. These plugins could be high level
>
> use
>
> case aware. For example, to provide a filter to the source, we can do
>
> the
>
> following
>
> // An interface for all the filters that take an expression.
> interface ExpressionFilter {
>       FilterResult applyFilterExpression();
> }
>
> // An filter plugin implementation that translate the SQL Expression
>
> to a
>
> ParquetFilterPredicate.
> Class ParquetExpressionFilter implements
>
> Supplier<ParquetFilterPredicate>,
>
> ExpressionFilter {
>       // Called by the high level use case,
>       FilterResult applyFilterExpression() { ... }
>
>       // Used by the native Source interface.
>       ParquetFilterPredicate get() { ... }
> }
>
> In this case, the connector developer just need to write the logic of
> translating an Expression to Parquet FilterPredicate. They don't have
>
> to
>
> understand the entire set of interfaces that we want to promote. Just
>
> like
>
> they only need to know how to say "Hello" without learning ancient
>
> Chinese
>
> poem.
>
> Again, I am not saying this is necessarily the best approach. But so
>
> far
>
> it
>
> seems a reasonable design principle to tell the developers.
>
> Thanks,
>
> Jiangjie (becket) Qin
>
>
>
> On Wed, Mar 25, 2020 at 11:53 AM Kurt Young <ykt...@gmail.com> 
> <ykt...@gmail.com> wrote:
>
>
> Hi Becket,
>
> I don't think we should discuss this in pure engineering aspects. Your
> proposal is trying
> to let SQL connector developers understand as less SQL concepts as
> possible. But quite
> the opposite, we are designing those interfaces to emphasize the SQL
> concept, to bridge
> high level concepts into real interfaces and classes.
>
> We keep talking about time-varying relations and dynamic table when
> introduce SQL concepts,
> sources and sinks are most critical part playing with those concepts.
>
> It's
>
> essential to let
> Flink SQL developers to learn these concepts and connect them with
>
> real
>
> codes by introducing
> these connector interfaces and can further write *correct* connectors
>
> based
>
> on such domain
> knowledge.
>
> So this FLIP is a very important chance to express these concepts and
>
> make
>
> most SQL developers
> be align with concepts and on same page. It's mostly for different
>
> level of
>
> abstractions and for domains
> like SQL, it's becoming more important.  It helps Flink SQL go
>
> smoothly
>
> in
>
> the future, and also
> make it easier for new contributors. But I would admit this is not
>
> that
>
> obvious for others who don't work
> with SQL frequently.
>
> Best,
> Kurt
>
>
> On Wed, Mar 25, 2020 at 11:07 AM Becket Qin <becket....@gmail.com> 
> <becket....@gmail.com>
>
> wrote:
>
> Hi Jark,
>
> It is good to know that we do not expect the end users to touch those
> interfaces.
>
> Then the question boils down to whether the connector developers
>
> should
>
> be
>
> aware of the interfaces that are only used by the SQL optimizer. It
>
> seems a
>
> win if we can avoid that.
>
> Two potential solutions off the top of my head are:
> 1. An internal helper class doing the instanceOf based on DataStream
>
> source
>
> interface and create pluggables for that DataStream source.
> 2. codegen the set of TableSource interfaces given a DataStream
>
> Source
>
> and
>
> its corresponding TablePluggablesFactory.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Mar 25, 2020 at 10:07 AM Jark Wu <imj...@gmail.com> 
> <imj...@gmail.com> wrote:
>
>
> Hi Becket,
>
> Regarding to Flavor1 and Flavor2, I want to clarify that user will
>
> never
>
> use table source like this:
>
> {
>      MyTableSource myTableSource = MyTableSourceFactory.create();
>      myTableSource.setSchema(mySchema);
>      myTableSource.applyFilterPredicate(expression);
>      ...
> }
>
> TableFactory and TableSource are not directly exposed to end users,
>
> all
>
> the
>
> methods are called by planner, not users.
> Users always use DDL or descriptor to register a table, and planner
>
> will
>
> find the factory and create sources according to the properties.
> All the optimization are applied automatically, e.g.
>
> filter/projection
>
> pushdown, users don't need to call `applyFilterPredicate`
>
> explicitly.
>
> On Wed, 25 Mar 2020 at 09:25, Becket Qin <becket....@gmail.com> 
> <becket....@gmail.com>
>
> wrote:
>
> Hi Timo and Dawid,
>
> Thanks for the clarification. They really help. You are right that
>
> we
>
> are
>
> on the same page regarding the hierarchy. I think the only
>
> difference
>
> between our view is the flavor of the interfaces. There are two
>
> flavors
>
> of
>
> the source interface for DataStream and Table source.
>
> *Flavor 1. Table Sources are some wrapper interfaces around
>
> DataStream
>
> source.*
> Following this way, we will reach the design of the current
>
> proposal,
>
> i.e.
>
> each pluggable exposed in the DataStream source will have a
>
> corresponding
>
> TableSource interface counterpart, which are at the Factory level.
>
> Users
>
> will write code like this:
>
> {
>       MyTableSource myTableSource = MyTableSourceFactory.create();
>       myTableSource.setSchema(mySchema);
>       myTableSource.applyFilterPredicate(expression);
>       ...
> }
>
> The good thing for this flavor is that from the SQL / Table's
>
> perspective,
>
> there is a dedicated set of Table oriented interface.
>
> The downsides are:
> A. From the user's perspective, DataStream Source and Table Source
>
> are
>
> just
>
> two different sets of interfaces, regardless of how they are the
>
> same
>
> internally.
> B. The source developers have to develop for those two sets of
>
> interfaces
>
> in order to support both DataStream and Table.
> C. It is not explicit that DataStream can actually share the
>
> pluggable
>
> in
>
> Table / SQL. For example, in order to provide a filter pluggable
>
> with
>
> SQL
>
> expression, users will have to know the actual converter class that
> converts the expression to the filter predicate and construct that
> converter by themselves.
>
> ---------------
>
> *Flavor 2. A TableSource is a DataStream source with a bunch of
>
> pluggables.
>
> No Table specific interfaces at all.*
> Following this way, we will reach another design where you have a
> SourceFactory and a single Pluggable factory for all the table
>
> pluggables.
>
> And users will write something like:
>
> {
>       Deserializer<Row> myTableDeserializer =
> MyTablePluggableFactory.createDeserializer(schema)
>       MySource<Row> mySource = MySourceFactory.create(properties,
> myTableDeserializer);
>
>
>
>
> mySource.applyPredicate(MyTablePluggableFactory.createFilterPredicate(expression));
>
> }
>
> The good thing for this flavor is that there is just one set of
>
> interface
>
> that works for both Table and DataStream. There is no difference
>
> between
>
> creating a DataStream source and creating a Table source.
>
> DataStream
>
> can
>
> easily reuse the pluggables from the Table sources.
>
> The downside is that Table / SQL won't have a dedicated API for
> optimization. Instead of writing:
>
> if (MyTableSource instanceOf FilterableTableSource) {
>        // Some filter push down logic.
>        MyTableSource.applyPredicate(expression)
> }
>
> One have to write:
>
> if (MySource instanceOf FilterableSource) {
>       // Some filter push down logic.
>
>
>
>
> mySource.applyPredicate(MyTablePluggableFactory.createFilterPredicate(expression));
>
> }
>
> -------------------------
>
> Just to be clear, I am not saying flavor 2 is necessarily better
>
> than
>
> flavor 1, but I want to make sure flavor 2 is also considered and
> discussed.
>
> Thanks,
>
> Jiangjie (Becket) Qin.
>
> On Tue, Mar 24, 2020 at 10:53 PM Dawid Wysakowicz <
>
> dwysakow...@apache.org>
>
> wrote:
>
>
> Hi Becket,
>
> I really think we don't have a differing opinions. We might not
>
> see
>
> the
>
> changes in the same way yet. Personally I think of the
>
> DynamicTableSource
>
> as of a factory for a Source implemented for the DataStream API.
>
> The
>
> important fact about the DynamicTableSource and all feature traits
> (SupportsFilterablePushDown, SupportsProjectPushDown etc.) work
>
> with
>
> Table
>
> API concepts such as e.g. Expressions, SQL specific types etc. In
>
> the
>
> end
>
> what the implementation would resemble is (bear in mind I
>
> tremendously
>
> simplified the example, just to show the relation between the two
>
> APIs):
>
> SupportsFilterablePushDown {
>
>     applyFilters(List<ResolvedExpression> filters) {
>
>       this.filters = convertToDataStreamFilters(filters);
>
>     }
>
>     Source createSource() {
>
>           return Source.create()
>
>             .applyFilters(this.filters);
>
>      }
>
> }
>
> or exactly as you said for the computed columns:
>
>
> SupportsComputedColumnsPushDown {
>
>
>
>     applyComputedColumn(ComputedColumnConverter converter) {
>
>       this.deserializationSchema = new DeserializationSchema<Row>
>
> {
>
>         Row deserialize(...) {
>
>           RowData row = format.deserialize(bytes); // original
>
> format,
>
> e.g
>
> json, avro, etc.
>
>           RowData enriched = converter(row)
>
>         }
>
>       }
>
>     }
>
>     Source createSource() {
>
>           return Source.create()
>
>             .withDeserialization(deserializationSchema);
>
>      }
>
> }
>
> So to sum it up again, all those interfaces are factories that
>
> configure
>
> appropriate parts of the DataStream API using Table API concepts.
>
> Finally
>
> to answer you question for particular comparisons:
>
> DynamicTableSource v.s. Source<Row, SourceSplitT, EnumChkT>
> SupportsFilterablePushDown v.s. FilterableSource
> SupportsProjectablePushDown v.s. ProjectableSource
> SupportsWatermarkPushDown v.s. WithWatermarkAssigner
> SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer
> ScanTableSource v.s. ChangeLogDeserializer.
>
> pretty much you can think of all on the left as factories for the
>
> right
>
> side, left side works with Table API classes (Expressions,
>
> DataTypes).
>
> I
>
> hope this clarifies it a bit.
>
> Best,
>
> Dawid
> On 24/03/2020 15:03, Becket Qin wrote:
>
> Hey Kurt,
>
> I don't think DataStream should see some SQL specific concepts
>
> such
>
> as
>
> Filtering or ComputedColumn.
>
> Projectable and Filterable seems not necessarily SQL concepts, but
>
> could
>
> be
>
> applicable to DataStream source as well to reduce the network
>
> load.
>
> For
>
> example ORC and Parquet should probably also be readable from
>
> DataStream,
>
> right?
>
> ComputedColumn is not part of the Source, it is an interface
>
> extends
>
> the
>
> Deserializer, which is a pluggable for the Source. From the SQL's
> perspective it has the concept of computed column, but from the
>
> Source
>
> perspective, It is essentially a Deserializer which also converts
>
> the
>
> records internally, assuming we allow some conversion to be
>
> embedded
>
> to
>
> the source in addition to just deserialization.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Mar 24, 2020 at 9:36 PM Jark Wu <imj...@gmail.com> <imj...@gmail.com> 
> <
>
> imj...@gmail.com> wrote:
>
> Thanks Timo for updating the formats section. That would be very
>
> helpful
>
> for changelog supporting (FLIP-105).
>
> I just left 2 minor comment about some method names. In general,
>
> I'm
>
> +1
>
> to
>
> start a voting.
>
>
>
>
> --------------------------------------------------------------------------------------------------
>
> Hi Becket,
>
> I agree we shouldn't duplicate codes, especiall the runtime
> implementations.
> However, the interfaces proposed by FLIP-95 are mainly used during
> optimization (compiling), not runtime.
> I don't think there is much to share for this. Because table/sql
> is declarative, but DataStream is imperative.
> For example, filter push down, DataStream FilterableSource may
>
> allow
>
> to
>
> accept a FilterFunction (which is a black box for the source).
> However, table sources should pick the pushed filter expressions,
>
> some
>
> sources may only support "=", "<", ">" conditions.
> Pushing a FilterFunction doesn't work in table ecosystem. That
>
> means,
>
> the
>
> connectors have to have some table-specific implementations.
>
>
> Best,
> Jark
>
> On Tue, 24 Mar 2020 at 20:41, Kurt Young <ykt...@gmail.com> 
> <ykt...@gmail.com> <
>
> ykt...@gmail.com> wrote:
>
> Hi Becket,
>
> I don't think DataStream should see some SQL specific concepts
>
> such
>
> as
>
> Filtering or ComputedColumn. It's
> better to stay within SQL area and translate to more generic
>
> concept
>
> when
>
> translating to DataStream/Runtime
> layer, such as use MapFunction to represent computed column logic.
>
> Best,
> Kurt
>
>
> On Tue, Mar 24, 2020 at 5:47 PM Becket Qin <becket....@gmail.com> 
> <becket....@gmail.com>
>
> <
>
> becket....@gmail.com> wrote:
>
> Hi Timo and Dawid,
>
> It's really great that we have the same goal. I am actually
>
> wondering
>
> if
>
> we
>
> can go one step further to avoid some of the interfaces in Table
>
> as
>
> well.
>
> For example, if we have the FilterableSource, do we still need the
> FilterableTableSource? Should DynamicTableSource just become a
> Source<*Row*,
> SourceSplitT, EnumChkT>?
>
> Can you help me understand a bit more about the reason we need the
> following relational representation / wrapper interfaces v.s. the
> interfaces that we could put to the Source in FLIP-27?
>
> DynamicTableSource v.s. Source<Row, SourceSplitT, EnumChkT>
> SupportsFilterablePushDown v.s. FilterableSource
> SupportsProjectablePushDown v.s. ProjectableSource
> SupportsWatermarkPushDown v.s. WithWatermarkAssigner
> SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer
> ScanTableSource v.s. ChangeLogDeserializer.
> LookUpTableSource v.s. LookUpSource
>
> Assuming we have all the interfaces on the right side, do we still
>
> need
>
> the
>
> interfaces on the left side? Note that the interfaces on the right
>
> can
>
> be
>
> used by both DataStream and Table. If we do this, there will only
>
> be
>
> one
>
> set of Source interfaces Table and DataStream, the only difference
>
> is
>
> that
>
> the Source for table will have some specific plugins and
>
> configurations.
>
> An
>
> omnipotent Source can implement all the the above interfaces and
>
> take a
>
> Deserializer that implements both ComputedColumnDeserializer and
> ChangeLogDeserializer.
>
> Would the SQL planner work with that?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
>
>
> On Tue, Mar 24, 2020 at 5:03 PM Jingsong Li <
>
> jingsongl...@gmail.com>
>
> <
>
> jingsongl...@gmail.com>
>
> wrote:
>
>
> +1. Thanks Timo for the design doc.
>
> We can also consider @Experimental too. But I am +1 to
>
> @PublicEvolving,
>
> we
>
> should be confident in the current change.
>
> Best,
> Jingsong Lee
>
> On Tue, Mar 24, 2020 at 4:30 PM Timo Walther <twal...@apache.org> 
> <twal...@apache.org>
>
> <
>
> twal...@apache.org>
>
> wrote:
>
> @Becket: We totally agree that we don't need table specific
>
> connectors
>
> during runtime. As Dawid said, the interfaces proposed here are
>
> just
>
> for
>
> communication with the planner. Once the properties (watermarks,
> computed column, filters, projecttion etc.) are negotiated, we can
> configure a regular Flink connector.
>
> E.g. setting the watermark assigner and deserialization schema of
>
> a
>
> Kafka connector.
>
> For better separation of concerns, Flink connectors should not
>
> include
>
> relational interfaces and depend on flink-table. This is the
> responsibility of table source/sink.
>
> @Kurt: I would like to mark them @PublicEvolving already because
>
> we
>
> need
>
> to deprecate the old interfaces as early as possible. We cannot
>
> redirect
>
> to @Internal interfaces. They are not marked @Public, so we can
>
> still
>
> evolve them. But a core design shift should not happen again, it
>
> would
>
> leave a bad impression if we are redesign over and over again.
>
> Instead
>
> we should be confident in the current change.
>
> Regards,
> Timo
>
>
> On 24.03.20 09:20, Dawid Wysakowicz wrote:
>
> Hi Becket,
>
> Answering your question, we have the same intention not to
>
> duplicate
>
> connectors between datastream and table apis. The interfaces
>
> proposed
>
> in
>
> the FLIP are a way to describe relational properties of a source.
>
> The
>
> intention is as you described to translate all of those expressed
>
> as
>
> expressions or other Table specific structures into a DataStream
>
> source.
>
> In other words I think what we are doing here is in line with
>
> what
>
> you
>
> described.
>
> Best,
>
> Dawid
>
> On 24/03/2020 02:23, Becket Qin wrote:
>
> Hi Timo,
>
> Thanks for the proposal. I completely agree that the current
>
> Table
>
> connectors could be simplified quite a bit. I haven't finished
>
> reading
>
> everything, but here are some quick thoughts.
>
> Actually to me the biggest question is why should there be two
>
> different
>
> connector systems for DataStream and Table? What is the
>
> fundamental
>
> reason
>
> that is preventing us from merging them to one?
>
> The basic functionality of a connector is to provide
>
> capabilities
>
> to
>
> do
>
> IO
>
> and Serde. Conceptually, Table connectors should just be
>
> DataStream
>
> connectors that are dealing with Rows. It seems that quite a few
>
> of
>
> the
>
> special connector requirements are just a specific way to do IO
>
> /
>
> Serde.
>
> Taking SupportsFilterPushDown as an example, imagine we have the
>
> following
>
> interface:
>
> interface FilterableSource<PREDICATE> {
>        void applyFilterable(Supplier<PREDICATE> predicate);
> }
>
> And if a ParquetSource would like to support filterable, it will
>
> become:
>
> class ParquetSource implements Source,
>
> FilterableSource(FilterPredicate> {
>
>        ...
> }
>
> For Table, one just need to provide an predicate supplier that
>
> converts
>
> an
>
> Expression to the specified predicate type. This has a few
>
> benefit:
>
> 1. Same unified API for filterable for sources, regardless of
>
> DataStream or
>
> Table.
> 2. The  DataStream users now can also use the
>
> ExpressionToPredicate
>
> supplier if they want to.
>
> To summarize, my main point is that I am wondering if it is
>
> possible
>
> to
>
> have a single set of connector interface for both Table and
>
> DataStream,
>
> rather than having two hierarchies. I am not 100% sure if this
>
> would
>
> work,
>
> but if it works, this would be a huge win from both code
>
> maintenance
>
> and
>
> user experience perspective.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Tue, Mar 24, 2020 at 2:03 AM Dawid Wysakowicz <
> dwysakow...@apache.org>
>
> wrote:
>
>
> Hi Timo,
>
> Thank you for the proposal. I think it is an important
>
> improvement
>
> that
>
> will benefit many parts of the Table API. The proposal looks
>
> really
>
> good
>
> to me and personally I would be comfortable with voting on the
>
> current
>
> state.
>
> Best,
>
> Dawid
>
> On 23/03/2020 18:53, Timo Walther wrote:
>
> Hi everyone,
>
> I received some questions around how the new interfaces play
>
> together
>
> with formats and their factories.
>
> Furthermore, for MySQL or Postgres CDC logs, the format should
>
> be
>
> able
>
> to return a `ChangelogMode`.
>
> Also, I incorporated the feedback around the factory design in
>
> general.
>
> I added a new section `Factory Interfaces` to the design
>
> document.
>
> This should be helpful to understand the big picture and
>
> connecting
>
> the concepts.
>
> Please let me know what you think?
>
> Thanks,
> Timo
>
>
> On 18.03.20 13:43, Timo Walther wrote:
>
> Hi Benchao,
>
> this is a very good question. I will update the FLIP about
>
> this.
>
> The legacy planner will not support the new interfaces. It
>
> will
>
> only
>
> support the old interfaces. With the next release, I think
>
> the
>
> Blink
>
> planner is stable enough to be the default one as well.
>
> Regards,
> Timo
>
> On 18.03.20 08:45, Benchao Li wrote:
>
> Hi Timo,
>
> Thank you and others for the efforts to prepare this FLIP.
>
> The FLIP LGTM generally.
>
> +1 for moving blink data structures to table-common, it's
>
> useful
>
> to
>
> udf too
> in the future.
> A little question is, do we plan to support the new
>
> interfaces
>
> and
>
> data
>
> types in legacy planner?
> Or we only plan to support these new interfaces in blink
>
> planner.
>
> And using primary keys from DDL instead of derived key
>
> information
>
> from
>
> each query is also a good idea,
> we met some use cases where this does not works very well
>
> before.
>
> This FLIP also makes the dependencies of table modules more
>
> clear, I
>
> like
> it very much.
>
> Timo Walther <twal...@apache.org> <twal...@apache.org> <twal...@apache.org> 
> <twal...@apache.org>
>
> 于2020年3月17日周二
>
> 上午1:36写道:
>
> Hi everyone,
>
> I'm happy to present the results of long discussions that
>
> we
>
> had
>
> internally. Jark, Dawid, Aljoscha, Kurt, Jingsong, me, and
>
> many
>
> more
>
> have contributed to this design document.
>
> We would like to propose new long-term table source and
>
> table
>
> sink
>
> interfaces:
>
>
>
>
>
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
>
> This is a requirement for FLIP-105 and finalizing FLIP-32.
>
> The goals of this FLIP are:
>
> - Simplify the current interface architecture:
>          - Merge upsert, retract, and append sinks.
>          - Unify batch and streaming sources.
>          - Unify batch and streaming sinks.
>
> - Allow sources to produce a changelog:
>          - UpsertTableSources have been requested a lot by
>
> users.
>
> Now
>
> is the
> time to open the internal planner capabilities via the new
>
> interfaces.
>
>          - According to FLIP-105, we would like to support
>
> changelogs for
>
> processing formats such as Debezium.
>
> - Don't rely on DataStream API for source and sinks:
>          - According to FLIP-32, the Table API and SQL should
>
> be
>
> independent
> of the DataStream API which is why the `table-common`
>
> module
>
> has
>
> no
>
> dependencies on `flink-streaming-java`.
>          - Source and sink implementations should only depend
>
> on
>
> the
>
> `table-common` module after FLIP-27.
>          - Until FLIP-27 is ready, we still put most of the
>
> interfaces in
>
> `table-common` and strictly separate interfaces that
>
> communicate
>
> with a
> planner and actual runtime reader/writers.
>
> - Implement efficient sources and sinks without planner
>
> dependencies:
>
>          - Make Blink's internal data structures available to
>
> connectors.
>
>          - Introduce stable interfaces for data structures
>
> that
>
> can
>
> be
>
> marked as `@PublicEvolving`.
>          - Only require dependencies on `flink-table-common`
>
> in
>
> the
>
> future
>
> It finalizes the concept of dynamic tables and consideres
>
> how
>
> all
>
> source/sink related classes play together.
>
> We look forward to your feedback.
>
> Regards,
> Timo
>
>
> --
> Best, Jingsong Lee
>
>
>
>
>

Reply via email to