Hi Becket,

Regarding Flavor 1 and Flavor 2, I want to clarify that users will never
use a table source like this:

{
   MyTableSource myTableSource = MyTableSourceFactory.create();
   myTableSource.setSchema(mySchema);
   myTableSource.applyFilterPredicate(expression);
   ...
}

TableFactory and TableSource are not directly exposed to end users; all of
their methods are called by the planner, not by users.
Users always register a table through DDL or a descriptor, and the planner
finds the factory and creates the source according to the properties.
All optimizations, e.g. filter/projection pushdown, are applied
automatically; users don't need to call `applyFilterPredicate` explicitly.
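
To make the call flow concrete, here is a minimal, self-contained sketch of
who calls what. All names in it (`PlannerSketch`, `TableSourceFactory`,
`matches`, `plan`, the "my-connector" property) are hypothetical stand-ins
for illustration, not the real Flink interfaces, and filters are modeled as
plain strings. The `plan` method plays the planner's role: it picks the
factory from the table properties, creates the source, and pushes the
filters down itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PlannerSketch {

    // Hypothetical stand-ins, NOT the real Flink interfaces.
    interface TableSource {
        String describe();
    }

    interface FilterableTableSource extends TableSource {
        void applyFilterPredicate(String expression);
    }

    interface TableSourceFactory {
        boolean matches(Map<String, String> properties);

        TableSource create(Map<String, String> properties);
    }

    static class MyTableSource implements FilterableTableSource {
        private final String path;
        private final List<String> filters = new ArrayList<>();

        MyTableSource(String path) {
            this.path = path;
        }

        @Override
        public void applyFilterPredicate(String expression) {
            filters.add(expression);
        }

        @Override
        public String describe() {
            return "MyTableSource(path=" + path + ", filters=" + filters + ")";
        }
    }

    static class MyTableSourceFactory implements TableSourceFactory {
        @Override
        public boolean matches(Map<String, String> properties) {
            return "my-connector".equals(properties.get("connector"));
        }

        @Override
        public TableSource create(Map<String, String> properties) {
            return new MyTableSource(properties.get("path"));
        }
    }

    // Plays the planner's role: factory discovery, source creation and
    // filter pushdown all happen here, never in user code.
    static TableSource plan(List<TableSourceFactory> factories,
                            Map<String, String> properties,
                            List<String> queryFilters) {
        for (TableSourceFactory factory : factories) {
            if (factory.matches(properties)) {
                TableSource source = factory.create(properties);
                if (source instanceof FilterableTableSource) {
                    FilterableTableSource filterable = (FilterableTableSource) source;
                    for (String filter : queryFilters) {
                        filterable.applyFilterPredicate(filter);
                    }
                }
                return source;
            }
        }
        throw new IllegalArgumentException("No factory matches " + properties);
    }

    public static void main(String[] args) {
        // The user only supplies properties (via DDL or a descriptor) and a query.
        Map<String, String> properties =
                Map.of("connector", "my-connector", "path", "/tmp/data");
        TableSource source =
                plan(List.of(new MyTableSourceFactory()), properties, List.of("a > 10"));
        System.out.println(source.describe());
    }
}
```

The user-facing part is only the property map and the query; everything
inside `plan` is invisible to users.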



On Wed, 25 Mar 2020 at 09:25, Becket Qin <becket....@gmail.com> wrote:

> Hi Timo and Dawid,
>
> Thanks for the clarification. It really helps. You are right that we are
> on the same page regarding the hierarchy. I think the only difference
> between our views is the flavor of the interfaces. There are two flavors of
> the source interface for DataStream and Table source.
>
> *Flavor 1. Table Sources are some wrapper interfaces around DataStream
> source.*
> Following this way, we will reach the design of the current proposal, i.e.
> each pluggable exposed in the DataStream source will have a corresponding
> TableSource interface counterpart, which are at the Factory level. Users
> will write code like this:
>
> {
>     MyTableSource myTableSource = MyTableSourceFactory.create();
>     myTableSource.setSchema(mySchema);
>     myTableSource.applyFilterPredicate(expression);
>     ...
> }
>
> The good thing about this flavor is that from the SQL / Table's perspective,
> there is a dedicated set of Table-oriented interfaces.
>
> The downsides are:
> A. From the user's perspective, DataStream Source and Table Source are just
> two different sets of interfaces, regardless of how they are the same
> internally.
> B. The source developers have to develop for those two sets of interfaces
> in order to support both DataStream and Table.
> C. It is not explicit that DataStream can actually share the pluggable in
> Table / SQL. For example, in order to provide a filter pluggable with SQL
> expression, users will have to know the actual converter class that
> converts the expression to the filter predicate and construct that
> converter by themselves.
>
> ---------------
>
> *Flavor 2. A TableSource is a DataStream source with a bunch of pluggables.
> No Table specific interfaces at all.*
> Following this way, we will reach another design where you have a
> SourceFactory and a single Pluggable factory for all the table pluggables.
> And users will write something like:
>
> {
>     Deserializer<Row> myTableDeserializer =
>         MyTablePluggableFactory.createDeserializer(schema);
>     MySource<Row> mySource =
>         MySourceFactory.create(properties, myTableDeserializer);
>
>     mySource.applyPredicate(
>         MyTablePluggableFactory.createFilterPredicate(expression));
> }
>
> The good thing about this flavor is that there is just one set of interfaces
> that works for both Table and DataStream. There is no difference between
> creating a DataStream source and creating a Table source. DataStream can
> easily reuse the pluggables from the Table sources.
>
> The downside is that Table / SQL won't have a dedicated API for
> optimization. Instead of writing:
>
> if (myTableSource instanceof FilterableTableSource) {
>      // Some filter push down logic.
>      myTableSource.applyPredicate(expression);
> }
>
> One has to write:
>
> if (mySource instanceof FilterableSource) {
>     // Some filter push down logic.
>     mySource.applyPredicate(
>         MyTablePluggableFactory.createFilterPredicate(expression));
> }
>
> -------------------------
>
> Just to be clear, I am not saying flavor 2 is necessarily better than
> flavor 1, but I want to make sure flavor 2 is also considered and
> discussed.
>
> Thanks,
>
> Jiangjie (Becket) Qin.
>
> On Tue, Mar 24, 2020 at 10:53 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
> > Hi Becket,
> >
> > I really think we don't have differing opinions. We might not see the
> > changes in the same way yet. Personally, I think of the DynamicTableSource
> > as a factory for a Source implemented for the DataStream API. The
> > important fact is that the DynamicTableSource and all feature traits
> > (SupportsFilterablePushDown, SupportsProjectPushDown etc.) work with
> > Table API concepts such as Expressions, SQL-specific types, etc. In the
> > end, the implementation would resemble the following (bear in mind I
> > tremendously simplified the example, just to show the relation between
> > the two APIs):
> >
> > SupportsFilterablePushDown {
> >
> >   applyFilters(List<ResolvedExpression> filters) {
> >
> >     this.filters = convertToDataStreamFilters(filters);
> >
> >   }
> >
> >   Source createSource() {
> >
> >         return Source.create()
> >
> >           .applyFilters(this.filters);
> >
> >    }
> >
> > }
> >
> > or exactly as you said for the computed columns:
> >
> >
> > SupportsComputedColumnsPushDown {
> >
> >
> >
> >   applyComputedColumn(ComputedColumnConverter converter) {
> >
> >     this.deserializationSchema = new DeserializationSchema<Row> {
> >
> >       Row deserialize(...) {
> >
> >         // original format, e.g. json, avro, etc.
> >         RowData row = format.deserialize(bytes);
> >
> >         RowData enriched = converter(row);
> >
> >       }
> >
> >     }
> >
> >   }
> >
> >   Source createSource() {
> >
> >         return Source.create()
> >
> >           .withDeserialization(deserializationSchema);
> >
> >    }
> >
> > }
> >
> > So to sum it up again, all those interfaces are factories that configure
> > appropriate parts of the DataStream API using Table API concepts. Finally,
> > to answer your question about the particular comparisons:
> >
> > DynamicTableSource v.s. Source<Row, SourceSplitT, EnumChkT>
> > SupportsFilterablePushDown v.s. FilterableSource
> > SupportsProjectablePushDown v.s. ProjectableSource
> > SupportsWatermarkPushDown v.s. WithWatermarkAssigner
> > SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer
> > ScanTableSource v.s. ChangeLogDeserializer.
> >
> > You can pretty much think of everything on the left as factories for the
> > right side; the left side works with Table API classes (Expressions,
> > DataTypes). I hope this clarifies it a bit.
> >
> > Best,
> >
> > Dawid
> > On 24/03/2020 15:03, Becket Qin wrote:
> >
> > Hey Kurt,
> >
> > I don't think DataStream should see some SQL specific concepts such as
> > Filtering or ComputedColumn.
> >
> > Projectable and Filterable do not seem to be necessarily SQL concepts; they
> > could be applicable to DataStream sources as well to reduce the network
> > load. For example, ORC and Parquet should probably also be readable from
> > DataStream, right?
> >
> > ComputedColumn is not part of the Source; it is an interface that extends
> > the Deserializer, which is a pluggable for the Source. From SQL's
> > perspective it has the concept of a computed column, but from the Source's
> > perspective, it is essentially a Deserializer which also converts the
> > records internally, assuming we allow some conversion to be embedded in
> > the source in addition to just deserialization.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Mar 24, 2020 at 9:36 PM Jark Wu <imj...@gmail.com> wrote:
> >
> >
> > Thanks Timo for updating the formats section. That would be very helpful
> > for changelog support (FLIP-105).
> >
> > I just left 2 minor comments about some method names. In general, I'm +1
> > to start a vote.
> >
> >
> >
> --------------------------------------------------------------------------------------------------
> >
> > Hi Becket,
> >
> > I agree we shouldn't duplicate code, especially the runtime
> > implementations.
> > However, the interfaces proposed by FLIP-95 are mainly used during
> > optimization (compiling), not runtime.
> > I don't think there is much to share for this, because Table/SQL
> > is declarative, but DataStream is imperative.
> > Take filter push down as an example: a DataStream FilterableSource may
> > accept a FilterFunction (which is a black box for the source).
> > However, table sources should pick from the pushed filter expressions;
> > some sources may only support "=", "<", ">" conditions.
> > Pushing a FilterFunction doesn't work in the table ecosystem. That means
> > the connectors have to have some table-specific implementations.
> >
> >
> > Best,
> > Jark
> >
> > On Tue, 24 Mar 2020 at 20:41, Kurt Young <ykt...@gmail.com> wrote:
> >
> >
> > Hi Becket,
> >
> > I don't think DataStream should see some SQL specific concepts such as
> > Filtering or ComputedColumn. It's better to stay within the SQL area and
> > translate to more generic concepts when translating to the
> > DataStream/Runtime layer, such as using a MapFunction to represent
> > computed column logic.
> >
> > Best,
> > Kurt
> >
> >
> > On Tue, Mar 24, 2020 at 5:47 PM Becket Qin <becket....@gmail.com> wrote:
> >
> >
> > Hi Timo and Dawid,
> >
> > It's really great that we have the same goal. I am actually wondering if
> > we can go one step further to avoid some of the interfaces in Table as
> > well.
> >
> > For example, if we have the FilterableSource, do we still need the
> > FilterableTableSource? Should DynamicTableSource just become a
> > Source<*Row*, SourceSplitT, EnumChkT>?
> >
> > Can you help me understand a bit more about the reason we need the
> > following relational representation / wrapper interfaces v.s. the
> > interfaces that we could put to the Source in FLIP-27?
> >
> > DynamicTableSource v.s. Source<Row, SourceSplitT, EnumChkT>
> > SupportsFilterablePushDown v.s. FilterableSource
> > SupportsProjectablePushDown v.s. ProjectableSource
> > SupportsWatermarkPushDown v.s. WithWatermarkAssigner
> > SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer
> > ScanTableSource v.s. ChangeLogDeserializer.
> > LookUpTableSource v.s. LookUpSource
> >
> > Assuming we have all the interfaces on the right side, do we still need
> > the interfaces on the left side? Note that the interfaces on the right can
> > be used by both DataStream and Table. If we do this, there will only be
> > one set of Source interfaces for Table and DataStream; the only difference
> > is that the Source for Table will have some specific plugins and
> > configurations. An omnipotent Source can implement all the above
> > interfaces and take a Deserializer that implements both
> > ComputedColumnDeserializer and ChangeLogDeserializer.
> >
> > Would the SQL planner work with that?
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> >
> >
> >
> > On Tue, Mar 24, 2020 at 5:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> >
> > +1. Thanks Timo for the design doc.
> >
> > We can also consider @Experimental. But I am +1 to @PublicEvolving; we
> > should be confident in the current change.
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Mar 24, 2020 at 4:30 PM Timo Walther <twal...@apache.org> wrote:
> >
> > @Becket: We totally agree that we don't need table specific connectors
> > during runtime. As Dawid said, the interfaces proposed here are just for
> > communication with the planner. Once the properties (watermarks,
> > computed column, filters, projection etc.) are negotiated, we can
> > configure a regular Flink connector.
> >
> > E.g. setting the watermark assigner and deserialization schema of a
> > Kafka connector.
> >
> > For better separation of concerns, Flink connectors should not include
> > relational interfaces and depend on flink-table. This is the
> > responsibility of table source/sink.
> >
> > @Kurt: I would like to mark them @PublicEvolving already because we need
> > to deprecate the old interfaces as early as possible. We cannot redirect
> > to @Internal interfaces. They are not marked @Public, so we can still
> > evolve them. But a core design shift should not happen again; it would
> > leave a bad impression if we redesign over and over again. Instead,
> > we should be confident in the current change.
> >
> > Regards,
> > Timo
> >
> >
> > On 24.03.20 09:20, Dawid Wysakowicz wrote:
> >
> > Hi Becket,
> >
> > Answering your question, we have the same intention not to duplicate
> > connectors between the DataStream and Table APIs. The interfaces proposed
> > in the FLIP are a way to describe relational properties of a source. The
> > intention is, as you described, to translate all of those expressed as
> > expressions or other Table specific structures into a DataStream source.
> > In other words, I think what we are doing here is in line with what you
> > described.
> >
> > Best,
> >
> > Dawid
> >
> > On 24/03/2020 02:23, Becket Qin wrote:
> >
> > Hi Timo,
> >
> > Thanks for the proposal. I completely agree that the current Table
> > connectors could be simplified quite a bit. I haven't finished reading
> > everything, but here are some quick thoughts.
> >
> > Actually to me the biggest question is why should there be two different
> > connector systems for DataStream and Table? What is the fundamental reason
> > that is preventing us from merging them to one?
> >
> > The basic functionality of a connector is to provide capabilities to do IO
> > and Serde. Conceptually, Table connectors should just be DataStream
> > connectors that are dealing with Rows. It seems that quite a few of the
> > special connector requirements are just a specific way to do IO / Serde.
> > Taking SupportsFilterPushDown as an example, imagine we have the following
> > interface:
> >
> > interface FilterableSource<PREDICATE> {
> >      void applyFilterable(Supplier<PREDICATE> predicate);
> > }
> >
> > And if a ParquetSource would like to support filterable, it will become:
> >
> > class ParquetSource implements Source,
> >         FilterableSource<FilterPredicate> {
> >      ...
> > }
> >
> > For Table, one just needs to provide a predicate supplier that converts an
> > Expression to the specified predicate type. This has a few benefits:
> >
> > 1. Same unified API for filterable for sources, regardless of DataStream or
> > Table.
> > 2. The DataStream users now can also use the ExpressionToPredicate
> > supplier if they want to.
> >
> > To summarize, my main point is that I am wondering if it is possible to
> > have a single set of connector interfaces for both Table and DataStream,
> > rather than having two hierarchies. I am not 100% sure if this would work,
> > but if it works, this would be a huge win from both code maintenance and
> > user experience perspectives.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Tue, Mar 24, 2020 at 2:03 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:
> >
> >
> > Hi Timo,
> >
> > Thank you for the proposal. I think it is an important improvement that
> > will benefit many parts of the Table API. The proposal looks really good
> > to me and personally I would be comfortable with voting on the current
> > state.
> >
> > Best,
> >
> > Dawid
> >
> > On 23/03/2020 18:53, Timo Walther wrote:
> >
> > Hi everyone,
> >
> > I received some questions around how the new interfaces play together
> > with formats and their factories.
> >
> > Furthermore, for MySQL or Postgres CDC logs, the format should be able
> > to return a `ChangelogMode`.
> >
> > Also, I incorporated the feedback around the factory design in general.
> >
> > I added a new section `Factory Interfaces` to the design document.
> > This should be helpful to understand the big picture and connect
> > the concepts.
> >
> > Please let me know what you think.
> >
> > Thanks,
> > Timo
> >
> >
> > On 18.03.20 13:43, Timo Walther wrote:
> >
> > Hi Benchao,
> >
> > this is a very good question. I will update the FLIP about this.
> >
> > The legacy planner will not support the new interfaces. It will only
> > support the old interfaces. With the next release, I think the Blink
> > planner is stable enough to be the default one as well.
> >
> > Regards,
> > Timo
> >
> > On 18.03.20 08:45, Benchao Li wrote:
> >
> > Hi Timo,
> >
> > Thank you and others for the efforts to prepare this FLIP.
> >
> > The FLIP LGTM generally.
> >
> > +1 for moving blink data structures to table-common; it will be useful
> > for UDFs too in the future.
> > A small question: do we plan to support the new interfaces and data
> > types in the legacy planner?
> > Or do we only plan to support these new interfaces in the blink planner?
> >
> > And using primary keys from DDL instead of derived key information from
> > each query is also a good idea;
> > we met some use cases where this did not work very well before.
> >
> > This FLIP also makes the dependencies of table modules clearer. I like
> > it very much.
> >
> > On Tue, Mar 17, 2020 at 1:36 AM Timo Walther <twal...@apache.org> wrote:
> >
> >
> > Hi everyone,
> >
> > I'm happy to present the results of long discussions that we had
> > internally. Jark, Dawid, Aljoscha, Kurt, Jingsong, me, and many more
> > have contributed to this design document.
> >
> > We would like to propose new long-term table source and table sink
> > interfaces:
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> >
> > This is a requirement for FLIP-105 and finalizing FLIP-32.
> >
> > The goals of this FLIP are:
> >
> > - Simplify the current interface architecture:
> >        - Merge upsert, retract, and append sinks.
> >        - Unify batch and streaming sources.
> >        - Unify batch and streaming sinks.
> >
> > - Allow sources to produce a changelog:
> >        - UpsertTableSources have been requested a lot by users. Now is the
> >          time to open the internal planner capabilities via the new
> >          interfaces.
> >        - According to FLIP-105, we would like to support changelogs for
> >          processing formats such as Debezium.
> >
> > - Don't rely on DataStream API for source and sinks:
> >        - According to FLIP-32, the Table API and SQL should be independent
> >          of the DataStream API which is why the `table-common` module has
> >          no dependencies on `flink-streaming-java`.
> >        - Source and sink implementations should only depend on the
> >          `table-common` module after FLIP-27.
> >        - Until FLIP-27 is ready, we still put most of the interfaces in
> >          `table-common` and strictly separate interfaces that communicate
> >          with a planner and actual runtime reader/writers.
> >
> > - Implement efficient sources and sinks without planner dependencies:
> >        - Make Blink's internal data structures available to connectors.
> >        - Introduce stable interfaces for data structures that can be
> >          marked as `@PublicEvolving`.
> >        - Only require dependencies on `flink-table-common` in the future
> >
> > It finalizes the concept of dynamic tables and considers how all
> > source/sink related classes play together.
> >
> > We look forward to your feedback.
> >
> > Regards,
> > Timo
> >
> >
> > --
> > Best, Jingsong Lee
> >
> >
> >
>
