Hi Becket,

Regarding Flavor 1 and Flavor 2, I want to clarify that users will never use a table source like this:

{
    MyTableSource myTableSource = MyTableSourceFactory.create();
    myTableSource.setSchema(mySchema);
    myTableSource.applyFilterPredicate(expression);
    ...
}

TableFactory and TableSource are not directly exposed to end users; all of these methods are called by the planner, not by users. Users always use DDL or a descriptor to register a table, and the planner finds the factory and creates the sources according to the properties. All optimizations, e.g. filter/projection pushdown, are applied automatically; users never need to call `applyFilterPredicate` explicitly.
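To make this concrete, here is a minimal sketch of the user-facing side. The connector name and properties are hypothetical; the DDL and the query are the only things a user actually writes:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tableEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().useBlinkPlanner().build());

// Register the table via DDL; the planner discovers the factory from the
// connector properties and creates the source behind the scenes.
tableEnv.sqlUpdate(
    "CREATE TABLE my_table (" +
    "  id BIGINT," +
    "  name STRING" +
    ") WITH (" +
    "  'connector.type' = 'my-connector'" +   // hypothetical connector
    ")");

// The filter may be pushed into the source automatically; the user never
// calls applyFilterPredicate() directly.
Table result = tableEnv.sqlQuery("SELECT name FROM my_table WHERE id > 10");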
On Wed, 25 Mar 2020 at 09:25, Becket Qin <becket....@gmail.com> wrote:

> Hi Timo and Dawid,
>
> Thanks for the clarifications. They really help. You are right that we are
> on the same page regarding the hierarchy. I think the only difference
> between our views is the flavor of the interfaces. There are two flavors of
> the source interface for DataStream and Table sources.
>
> *Flavor 1. Table sources are wrapper interfaces around DataStream
> sources.*
> Following this way, we reach the design of the current proposal, i.e.
> each pluggable exposed in the DataStream source has a corresponding
> TableSource interface counterpart at the factory level. Users would
> write code like this:
>
> {
>     MyTableSource myTableSource = MyTableSourceFactory.create();
>     myTableSource.setSchema(mySchema);
>     myTableSource.applyFilterPredicate(expression);
>     ...
> }
>
> The good thing about this flavor is that from the SQL / Table perspective,
> there is a dedicated set of Table-oriented interfaces.
>
> The downsides are:
> A. From the user's perspective, DataStream sources and Table sources are
> just two different sets of interfaces, regardless of how similar they are
> internally.
> B. Source developers have to develop against those two sets of interfaces
> in order to support both DataStream and Table.
> C. It is not explicit that DataStream can actually share the pluggables in
> Table / SQL. For example, in order to provide a filter pluggable with a SQL
> expression, users would have to know the actual converter class that
> converts the expression to the filter predicate and construct that
> converter by themselves.
>
> ---------------
>
> *Flavor 2. A TableSource is a DataStream source with a bunch of pluggables.
> No Table-specific interfaces at all.*
> Following this way, we reach another design where you have a SourceFactory
> and a single pluggable factory for all the table pluggables. Users would
> write something like:
>
> {
>     Deserializer<Row> myTableDeserializer =
>         MyTablePluggableFactory.createDeserializer(schema);
>     MySource<Row> mySource =
>         MySourceFactory.create(properties, myTableDeserializer);
>     mySource.applyPredicate(
>         MyTablePluggableFactory.createFilterPredicate(expression));
> }
>
> The good thing about this flavor is that there is just one set of
> interfaces that works for both Table and DataStream. There is no difference
> between creating a DataStream source and creating a Table source, and
> DataStream can easily reuse the pluggables from the Table sources.
>
> The downside is that Table / SQL won't have a dedicated API for
> optimization. Instead of writing:
>
> if (myTableSource instanceof FilterableTableSource) {
>     // Some filter push down logic.
>     myTableSource.applyPredicate(expression);
> }
>
> one has to write:
>
> if (mySource instanceof FilterableSource) {
>     // Some filter push down logic.
>     mySource.applyPredicate(
>         MyTablePluggableFactory.createFilterPredicate(expression));
> }
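For reference, a minimal sketch of the single pluggable factory that Flavor 2 assumes. The factory name and both method signatures are hypothetical, not an existing Flink API:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.types.Row;

public interface MyTablePluggableFactory {

    // Builds a Row deserializer for the given table schema.
    DeserializationSchema<Row> createDeserializer(TableSchema schema);

    // Converts a Table API expression into a DataStream-level filter
    // that the source can apply directly.
    FilterFunction<Row> createFilterPredicate(ResolvedExpression expression);
}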
> -------------------------
>
> Just to be clear, I am not saying Flavor 2 is necessarily better than
> Flavor 1, but I want to make sure Flavor 2 is also considered and
> discussed.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Mar 24, 2020 at 10:53 PM Dawid Wysakowicz <dwysakow...@apache.org>
> wrote:
>
> > Hi Becket,
> >
> > I really think we don't have differing opinions. We might just not see
> > the changes in the same way yet. Personally, I think of the
> > DynamicTableSource as a factory for a Source implemented for the
> > DataStream API. The important fact is that the DynamicTableSource and all
> > feature traits (SupportsFilterablePushDown, SupportsProjectPushDown, etc.)
> > work with Table API concepts such as Expressions, SQL-specific types, etc.
> > In the end the implementation would resemble this (bear in mind I
> > tremendously simplified the example, just to show the relation between
> > the two APIs):
> >
> > SupportsFilterablePushDown {
> >
> >     applyFilters(List<ResolvedExpression> filters) {
> >         this.filters = convertToDataStreamFilters(filters);
> >     }
> >
> >     Source createSource() {
> >         return Source.create()
> >             .applyFilters(this.filters);
> >     }
> > }
> >
> > or exactly as you said for the computed columns:
> >
> > SupportsComputedColumnsPushDown {
> >
> >     applyComputedColumn(ComputedColumnConverter converter) {
> >         this.deserializationSchema = new DeserializationSchema<Row> {
> >             Row deserialize(...) {
> >                 RowData row = format.deserialize(bytes); // original format, e.g. json, avro, etc.
> >                 RowData enriched = converter(row);
> >             }
> >         }
> >     }
> >
> >     Source createSource() {
> >         return Source.create()
> >             .withDeserialization(deserializationSchema);
> >     }
> > }
> >
> > So to sum it up again, all those interfaces are factories that configure
> > the appropriate parts of the DataStream API using Table API concepts.
> > Finally, to answer your question for the particular comparisons:
> >
> > DynamicTableSource v.s. Source<Row, SourceSplitT, EnumChkT>
> > SupportsFilterablePushDown v.s. FilterableSource
> > SupportsProjectablePushDown v.s. ProjectableSource
> > SupportsWatermarkPushDown v.s. WithWatermarkAssigner
> > SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer
> > ScanTableSource v.s. ChangeLogDeserializer
> >
> > Pretty much you can think of everything on the left as a factory for the
> > right side; the left side works with Table API classes (Expressions,
> > DataTypes). I hope this clarifies it a bit.
> >
> > Best,
> >
> > Dawid
> >
> > On 24/03/2020 15:03, Becket Qin wrote:
> >
> > Hey Kurt,
> >
> > > I don't think DataStream should see some SQL specific concepts such as
> > > Filtering or ComputedColumn.
> >
> > Projectable and Filterable are not necessarily SQL concepts; they could
> > be applicable to DataStream sources as well, to reduce the network load.
> > For example, ORC and Parquet should probably also be readable from
> > DataStream, right?
> >
> > ComputedColumn is not part of the Source; it is an interface extending
> > the Deserializer, which is a pluggable for the Source. From the SQL
> > perspective it is the concept of a computed column, but from the Source
> > perspective, it is essentially a Deserializer which also converts the
> > records internally, assuming we allow some conversion to be embedded in
> > the source in addition to just deserialization.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
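A sketch of the wrapping deserializer described above, assuming the conversion can be modeled as a MapFunction; the class name is hypothetical:

import java.io.IOException;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

public class ConvertingDeserializationSchema implements DeserializationSchema<Row> {

    private final DeserializationSchema<Row> format; // original format, e.g. JSON, Avro
    private final MapFunction<Row, Row> converter;   // computed-column logic

    public ConvertingDeserializationSchema(
            DeserializationSchema<Row> format, MapFunction<Row, Row> converter) {
        this.format = format;
        this.converter = converter;
    }

    @Override
    public Row deserialize(byte[] bytes) throws IOException {
        try {
            // Deserialize with the inner format, then enrich the record.
            return converter.map(format.deserialize(bytes));
        } catch (Exception e) {
            throw new IOException("Conversion failed", e);
        }
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        // Simplification: assumes the conversion keeps the produced type.
        return format.getProducedType();
    }
}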
> > On Tue, Mar 24, 2020 at 9:36 PM Jark Wu <imj...@gmail.com> wrote:
> >
> > Thanks Timo for updating the formats section. That will be very helpful
> > for changelog support (FLIP-105).
> >
> > I just left two minor comments about some method names. In general, I'm
> > +1 to start a vote.
> >
> > --------------------------------------------------------------------------------------------------
> >
> > Hi Becket,
> >
> > I agree we shouldn't duplicate code, especially the runtime
> > implementations. However, the interfaces proposed by FLIP-95 are mainly
> > used during optimization (compiling), not at runtime. I don't think
> > there is much to share there, because Table/SQL is declarative while
> > DataStream is imperative. For example, for filter push down, a DataStream
> > FilterableSource may accept a FilterFunction (which is a black box for
> > the source). However, table sources should pick the pushed filter
> > expressions, and some sources may only support "=", "<", ">" conditions.
> > Pushing a FilterFunction doesn't work in the table ecosystem. That means
> > the connectors have to have some table-specific implementations.
> >
> > Best,
> > Jark
> >
> > On Tue, 24 Mar 2020 at 20:41, Kurt Young <ykt...@gmail.com> wrote:
> >
> > Hi Becket,
> >
> > I don't think DataStream should see some SQL-specific concepts such as
> > Filtering or ComputedColumn. It's better to stay within the SQL area and
> > translate to a more generic concept when translating to the
> > DataStream/Runtime layer, such as using a MapFunction to represent
> > computed column logic.
> >
> > Best,
> > Kurt
> >
> > On Tue, Mar 24, 2020 at 5:47 PM Becket Qin <becket....@gmail.com> wrote:
> >
> > Hi Timo and Dawid,
> >
> > It's really great that we have the same goal. I am actually wondering if
> > we can go one step further and avoid some of the interfaces in Table as
> > well.
> >
> > For example, if we have the FilterableSource, do we still need the
> > FilterableTableSource? Should DynamicTableSource just become a
> > Source<*Row*, SourceSplitT, EnumChkT>?
> >
> > Can you help me understand a bit more the reason we need the following
> > relational representation / wrapper interfaces v.s. the interfaces that
> > we could put into the Source in FLIP-27?
> >
> > DynamicTableSource v.s. Source<Row, SourceSplitT, EnumChkT>
> > SupportsFilterablePushDown v.s. FilterableSource
> > SupportsProjectablePushDown v.s. ProjectableSource
> > SupportsWatermarkPushDown v.s. WithWatermarkAssigner
> > SupportsComputedColumnPushDown v.s. ComputedColumnDeserializer
> > ScanTableSource v.s. ChangeLogDeserializer
> > LookUpTableSource v.s. LookUpSource
> >
> > Assuming we have all the interfaces on the right side, do we still need
> > the interfaces on the left side? Note that the interfaces on the right
> > can be used by both DataStream and Table. If we do this, there will only
> > be one set of Source interfaces for Table and DataStream; the only
> > difference is that the Source for Table will have some specific plugins
> > and configurations. An omnipotent Source can implement all of the above
> > interfaces and take a Deserializer that implements both
> > ComputedColumnDeserializer and ChangeLogDeserializer.
> >
> > Would the SQL planner work with that?
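For illustration, a self-contained sketch of that "one set of interfaces" idea; the trait interfaces below are hypothetical and only follow the names used in this thread, not an existing Flink API:

// Hypothetical DataStream-level traits.
interface FilterableSource<P> {
    void applyPredicate(P predicate);
}

interface ProjectableSource {
    void applyProjection(int[] projectedFields);
}

// One "omnipotent" source implementing several traits at once. The planner
// (or a DataStream user) can check for each trait and configure it.
class OmnipotentSource implements FilterableSource<String>, ProjectableSource {

    private String predicate;
    private int[] projectedFields;

    @Override
    public void applyPredicate(String predicate) {
        this.predicate = predicate;
    }

    @Override
    public void applyProjection(int[] projectedFields) {
        this.projectedFields = projectedFields;
    }
}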
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Mar 24, 2020 at 5:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> > +1. Thanks Timo for the design doc.
> >
> > We can also consider @Experimental too. But I am +1 to @PublicEvolving;
> > we should be confident in the current change.
> >
> > Best,
> > Jingsong Lee
> >
> > On Tue, Mar 24, 2020 at 4:30 PM Timo Walther <twal...@apache.org> wrote:
> >
> > @Becket: We totally agree that we don't need table-specific connectors
> > during runtime. As Dawid said, the interfaces proposed here are just for
> > communication with the planner. Once the properties (watermarks, computed
> > columns, filters, projection, etc.) are negotiated, we can configure a
> > regular Flink connector, e.g. setting the watermark assigner and
> > deserialization schema of a Kafka connector.
> >
> > For better separation of concerns, Flink connectors should not include
> > relational interfaces and depend on flink-table. This is the
> > responsibility of the table source/sink.
> >
> > @Kurt: I would like to mark them @PublicEvolving already because we need
> > to deprecate the old interfaces as early as possible. We cannot redirect
> > to @Internal interfaces. They are not marked @Public, so we can still
> > evolve them. But a core design shift should not happen again; it would
> > leave a bad impression if we redesign over and over again. Instead, we
> > should be confident in the current change.
> >
> > Regards,
> > Timo
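For illustration, configuring a regular Kafka connector in that way could look like the following sketch (topic, address, and the timestamp logic are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address

// Deserialization schema negotiated by the planner (here a plain one).
FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);

// Watermark assigner negotiated by the planner.
consumer.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(5)) {
        @Override
        public long extractTimestamp(String element) {
            return System.currentTimeMillis(); // placeholder timestamp logic
        }
    });

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(consumer);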
> > On 24.03.20 09:20, Dawid Wysakowicz wrote:
> >
> > Hi Becket,
> >
> > Answering your question, we have the same intention not to duplicate
> > connectors between the DataStream and Table APIs. The interfaces proposed
> > in the FLIP are a way to describe the relational properties of a source.
> > The intention is, as you described, to translate all of those expressed
> > as expressions or other Table-specific structures into a DataStream
> > source. In other words, I think what we are doing here is in line with
> > what you described.
> >
> > Best,
> >
> > Dawid
> >
> > On 24/03/2020 02:23, Becket Qin wrote:
> >
> > Hi Timo,
> >
> > Thanks for the proposal. I completely agree that the current Table
> > connectors could be simplified quite a bit. I haven't finished reading
> > everything, but here are some quick thoughts.
> >
> > Actually, to me the biggest question is why there should be two different
> > connector systems for DataStream and Table. What is the fundamental
> > reason that is preventing us from merging them into one?
> >
> > The basic functionality of a connector is to provide the capabilities to
> > do IO and serde. Conceptually, Table connectors should just be DataStream
> > connectors that are dealing with Rows. It seems that quite a few of the
> > special connector requirements are just a specific way to do IO / serde.
> >
> > Taking SupportsFilterPushDown as an example, imagine we have the
> > following interface:
> >
> > interface FilterableSource<PREDICATE> {
> >     void applyFilterable(Supplier<PREDICATE> predicate);
> > }
> >
> > And if a ParquetSource would like to support filtering, it would become:
> >
> > class ParquetSource implements Source, FilterableSource<FilterPredicate> {
> >     ...
> > }
> >
> > For Table, one just needs to provide a predicate supplier that converts
> > an Expression to the specified predicate type. This has a few benefits:
> >
> > 1. The same unified API for filterable sources, regardless of DataStream
> > or Table.
> > 2. The DataStream users now can also use the ExpressionToPredicate
> > supplier if they want to.
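A sketch of such an ExpressionToPredicate supplier for the Parquet case, hard-coded to a single "column = value" filter for brevity; the class itself is hypothetical:

import java.util.function.Supplier;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;

// Hypothetical supplier converting a simple "column = value" expression
// into Parquet's native FilterPredicate.
class ExpressionToPredicate implements Supplier<FilterPredicate> {

    private final String column;
    private final long value;

    ExpressionToPredicate(String column, long value) {
        this.column = column;
        this.value = value;
    }

    @Override
    public FilterPredicate get() {
        // A real implementation would walk a Table API expression tree.
        return FilterApi.eq(FilterApi.longColumn(column), value);
    }
}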
> > To summarize, my main point is that I am wondering if it is possible to
> > have a single set of connector interfaces for both Table and DataStream,
> > rather than having two hierarchies. I am not 100% sure if this would
> > work, but if it does, it would be a huge win from both the code
> > maintenance and the user experience perspective.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Tue, Mar 24, 2020 at 2:03 AM Dawid Wysakowicz <dwysakow...@apache.org>
> > wrote:
> >
> > Hi Timo,
> >
> > Thank you for the proposal. I think it is an important improvement that
> > will benefit many parts of the Table API. The proposal looks really good
> > to me and personally I would be comfortable with voting on the current
> > state.
> >
> > Best,
> >
> > Dawid
> >
> > On 23/03/2020 18:53, Timo Walther wrote:
> >
> > Hi everyone,
> >
> > I received some questions around how the new interfaces play together
> > with formats and their factories. Furthermore, for MySQL or Postgres CDC
> > logs, the format should be able to return a `ChangelogMode`.
> >
> > Also, I incorporated the feedback around the factory design in general.
> > I added a new section `Factory Interfaces` to the design document. This
> > should be helpful for understanding the big picture and connecting the
> > concepts.
> >
> > Please let me know what you think.
> >
> > Thanks,
> > Timo
> >
> > On 18.03.20 13:43, Timo Walther wrote:
> >
> > Hi Benchao,
> >
> > this is a very good question. I will update the FLIP about this.
> >
> > The legacy planner will not support the new interfaces. It will only
> > support the old interfaces. With the next release, I think the Blink
> > planner is stable enough to be the default one as well.
> >
> > Regards,
> > Timo
> >
> > On 18.03.20 08:45, Benchao Li wrote:
> >
> > Hi Timo,
> >
> > Thank you and the others for the efforts to prepare this FLIP. The FLIP
> > LGTM generally.
> >
> > +1 for moving the Blink data structures to table-common; it will be
> > useful for UDFs too in the future.
> >
> > A little question is: do we plan to support the new interfaces and data
> > types in the legacy planner, or only in the Blink planner?
> >
> > And using primary keys from the DDL instead of key information derived
> > from each query is also a good idea; we met some use cases where this
> > did not work very well before.
> >
> > This FLIP also makes the dependencies of the table modules clearer; I
> > like it very much.
> >
> > Timo Walther <twal...@apache.org> wrote on Tue, Mar 17, 2020 at 1:36 AM:
> >
> > Hi everyone,
> >
> > I'm happy to present the results of long discussions that we had
> > internally. Jark, Dawid, Aljoscha, Kurt, Jingsong, me, and many more
> > have contributed to this design document.
> >
> > We would like to propose new long-term table source and table sink
> > interfaces:
> >
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
> >
> > This is a requirement for FLIP-105 and for finalizing FLIP-32.
> >
> > The goals of this FLIP are:
> >
> > - Simplify the current interface architecture:
> >     - Merge upsert, retract, and append sinks.
> >     - Unify batch and streaming sources.
> >     - Unify batch and streaming sinks.
> >
> > - Allow sources to produce a changelog:
> >     - UpsertTableSources have been requested a lot by users. Now is the
> >       time to open the internal planner capabilities via the new
> >       interfaces.
> >     - According to FLIP-105, we would like to support changelogs for
> >       processing formats such as Debezium.
> >
> > - Don't rely on the DataStream API for sources and sinks:
> >     - According to FLIP-32, the Table API and SQL should be independent
> >       of the DataStream API, which is why the `table-common` module has
> >       no dependencies on `flink-streaming-java`.
> >     - Source and sink implementations should only depend on the
> >       `table-common` module after FLIP-27.
> >     - Until FLIP-27 is ready, we still put most of the interfaces in
> >       `table-common` and strictly separate interfaces that communicate
> >       with a planner from actual runtime readers/writers.
> >
> > - Implement efficient sources and sinks without planner dependencies:
> >     - Make Blink's internal data structures available to connectors.
> >     - Introduce stable interfaces for data structures that can be
> >       marked as `@PublicEvolving`.
> >     - Only require dependencies on `flink-table-common` in the future.
> >
> > It finalizes the concept of dynamic tables and considers how all
> > source/sink related classes play together.
> >
> > We look forward to your feedback.
> >
> > Regards,
> > Timo
> >
> >
> > --
> > Best, Jingsong Lee