Hi Timo and Dawid,

Thanks for the patient explanation. I just had a phone call with Kurt and Jark. I do see that for a few of these abstractions we have only found use cases in SQL so far. Therefore, while a Source abstraction that can be shared among use cases with different semantics is theoretically useful, building it may not bring us much value at this point. So I am convinced that it doesn't have to be done right now, and I have no further concerns with the design in the current FLIP.
Again, really appreciate the patient discussion! I learned quite a bit from it.

Cheers,
Jiangjie (Becket) Qin

On Thu, Mar 26, 2020 at 8:58 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Becket,

> Generally I don't think connector developers should bother with understanding any of the SQL concepts.

I am not sure if we understand "connector developer" the same way. Let me describe how I see the process of writing a new source (one that can be used in both the Table & DataStream API):

1. The connector developer writes a Source that deals with the actual reading and deserializing (preferably with a pluggable format/deserializer). The result of that step should be something like:

FilesystemSource
    .path(...)
    .format(ParquetFormat
        .filterPredicate(/* parquet specific filter */)
        .project(/* parquet specific projection */)
        .map(...))
    .watermarkAssigner(...)

This is useful for DataStream, and we can and want to use this in the Table API. Those interfaces shouldn't accept any *Translators, though. That makes no sense, because internally they do not deal with e.g. the Expression; they should accept already created predicates. We are not designing anything at that level; this we expect from FLIP-27.

2. Then we need a DynamicTableSource with different abilities that can create e.g. the Parquet filter or projection from expressions. I think this is what you also describe in your second point, and this is what we are designing in the FLIP. Bear in mind that e.g. the Deserializer will be created out of multiple SQL concepts: the regular schema, computed columns, possibly projections, etc., each applied at a different planning stage.

All of those interfaces serve the purpose of configuring the DynamicTableSource so that it is able to instantiate the Source with the proper configuration. In other words, it is a factory for the source that you can configure with SQL concepts. In turn, this factory will call the other factory from point 1.

I don't see a potential for unifying factories across different high level APIs. Take your example of a spatial database that operates on Coordinates and Area (even though those would rather be modeled as SQL types and we would still operate on Rows, but just for the sake of the example): there is no point in having a PushDownComputedColumns interface in the factory for the spatial database.

Best,
Dawid

On 26/03/2020 11:47, Becket Qin wrote:

Hi Timo,

> Regarding "connector developers just need to know how to write an ExpressionToParquetFilter": This is the entire purpose of the DynamicTableSource/DynamicTableSink. The bridging between SQL concepts and connector specific concepts. Because this is the tricky part: how to get from a SQL concept to a connector concept.

Maybe it is just a naming issue, depending on whether one is looking upward from the connector's perspective or downward from the SQL perspective. If we agree that the connectors should provide a semantics-free API to the high level use cases, it seems we should follow the former path. And if there are one or two APIs that the connector developers have to understand in order to support Table / SQL, I think we can just address them case by case, instead of wrapping the entire low level source API with a set of new concepts.

Correct me if I am wrong, but can we tell the following story to a connector developer and get all of the TableSource functionality to work?

To provide a TableSource from a Source, one just needs to know two more concepts: *Row* and *Expression*. The work to create a TableSource is the following:

1. A connector developer can write three classes in order to build a table source:
   - Deserializer<Row> (must-have)
   - PredicateTranslator<Expression, FilterPredicate> (optional, only applicable if the Source is a FilterableSource)
   - PredicateTranslator<Expression, ProjectionPredicate> (optional, only applicable if the Source is a ProjectableSource)

2. In order to make the table source discoverable, one needs to provide a Factory, and that Factory provides the following as a bundle:
   - The Source itself (must-have)
   - The Deserializer<Row> (must-have)
   - PredicateTranslator<Expression, FilterPredicate> (optional, only applicable when the Factory is a FilterFactory)
   - PredicateTranslator<Expression, ProjectionPredicate> (optional, only applicable when the Factory is a ProjectorFactory)

3. The Deserializer<Row> may implement one more decorative interface to further convert the record after deserialization:
   - withMapFunction<Row, Row>
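As a rough sketch, the translator piece from point 1 could look like this (the method name and the Parquet classes are just for illustration):

// Translates a predicate of the high level use case, e.g. a SQL Expression,
// into the native predicate type of the source, e.g. a Parquet FilterPredicate.
interface PredicateTranslator<FROM, TO> {
    TO translate(FROM predicate);
}

// The only SQL-aware class a Parquet connector developer would have to write.
class ExpressionToParquetFilter implements PredicateTranslator<Expression, FilterPredicate> {
    @Override
    public FilterPredicate translate(Expression expression) {
        ... // walk the expression tree and build the Parquet specific filter
    }
}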
Note that the above description only requires the connector developer to understand Expression and Row. If this works, it is much easier to explain than throwing a full set of new concepts at developers. More importantly, it is way more generic. For example, if we change Row to Coordinates and Expression to Area, we easily get a Source for a spatial database.

One thing I want to call out is that while the old SourceFunction and InputFormat are concrete implementations that do the actual IO work, the Source API in FLIP-27 is already kind of a factory by itself. So if we can push the decorative interfaces from the TableFactory layer down to the Source layer, it will help unify the experience for DataStream and Table sources. This will also align with our goal of letting the DataStream Source provide a semantics-free API that can be used by different high level APIs.

BTW, Jark suggested that we could have an offline call to accelerate the discussion. I think it is a good idea. Can we do that?

Thanks,
Jiangjie (Becket) Qin

On Thu, Mar 26, 2020 at 5:28 PM Timo Walther <twal...@apache.org> wrote:

Hi Becket,

Regarding "PushDown/NestedPushDown which is internal to optimizer":

Those concepts cannot be entirely internal to the optimizer; at some point the optimizer needs to pass them into the connector specific code, which will then convert them to e.g. Parquet expressions. So there must be some interface that takes a SQL Expression and converts it to connector specific code. This interface between planner and connector is modelled by the SupportsXXX interfaces. And you are right: if developers don't care, they don't need to implement those optional interfaces, but then they will not get performant connectors.
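Schematically (signatures simplified, and the translation helper is hypothetical):

// planner side: hand the pushed-down predicates over to the connector code
if (tableSource instanceof SupportsFilterablePushDown) {
    ((SupportsFilterablePushDown) tableSource).applyFilters(resolvedExpressions);
}

// connector side: the one place that knows both worlds
class ParquetDynamicTableSource implements ScanTableSource, SupportsFilterablePushDown {
    private FilterPredicate parquetPredicate;

    public void applyFilters(List<ResolvedExpression> filters) {
        // connector specific translation, i.e. the "tricky part"
        this.parquetPredicate = convertToParquetPredicate(filters);
    }
}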
Regarding "Table connector can work with the above two mechanisms":

A table connector needs three mechanisms, all of which are represented in the current design:

1. a stateless discovery interface (Factory) that can convert ConfigOptions to a stateful factory interface (DynamicTableSource/DynamicTableSink)

2. a stateful factory interface (DynamicTableSource/DynamicTableSink) that receives concepts from the optimizer (watermarks, filters, projections) and produces runtime classes such as your `ExpressionToParquetFilter`

3. runtime interfaces that are generated from the stateful factory; all the factories that you mentioned can be used in `getScanRuntimeProvider`.

Regarding "connector developers just need to know how to write an ExpressionToParquetFilter":

This is the entire purpose of the DynamicTableSource/DynamicTableSink: the bridging between SQL concepts and connector specific concepts. Because this is the tricky part: how to get from a SQL concept to a connector concept.

Regards,
Timo

On 26.03.20 04:46, Becket Qin wrote:

Hi Timo,

Thanks for the reply. I totally agree that there must be something new added to the connector in order to make it work for SQL / Table. My concern is mostly over what those additions should be, and how to add them. To be honest, I was kind of lost when looking at interfaces such as DataStructureConverter, RuntimeConverter and their internal contexts. Also, I believe most connector developers do not care about the concept of "PushDown" / "NestedPushDown", which is internal to the optimizer and not even exposed to SQL writers.

Therefore I am trying to see if we can:
A) Keep those additions minimal, so connector developers don't have to know the details.
B) Expose as few high level concepts as possible. More specifically, try to speak the connector language and expose the general mechanism instead of binding it to use case semantics.

If we can achieve the above two goals, we could avoid adding unnecessary burden to the connector developers, and also make the connectors more generic.

It might be worth thinking about what additional work is necessary from the connector developers. Here is what I am thinking of; please correct me if I missed something:

1. A Factory interface that allows a high level use case, in this case SQL, to find a matching source using the service provider mechanism.
2. A way for the high level use case to specify the plugins that are supported by the underlying DataStream Source.

If the Table connector can work with the above two mechanisms, maybe we can make some slight modifications to the interfaces in the current FLIP:

- A *SourceFactory* which extends the Factory interface in the FLIP, with one more method:
  - *Source getSource();*
- Some decorative interfaces on the SourceFactory, such as:
  - *FilterFactory<PREDICATE, T extends Supplier<PREDICATE>>*, with the method:
    - T getFilter();
  - *ProjectorFactory<PREDICATE, T extends Supplier<PREDICATE>>*, with the method:
    - T getProjector();
  - *DeserializerFactory<INPUT, OUTPUT>*

With this set of APIs, a ParquetTableSourceFactory may become:

class ParquetTableSourceFactory implements
        SourceFactory,
        DeserializerFactory<ParquetRecords, Row>,
        FilterFactory<ParquetFilter, ExpressionToParquetFilter> {

    @Override
    ParquetSource getSource() { ... }

    @Override
    ExpressionToParquetFilter getFilter() { ... }
}

The ExpressionToParquetFilter will have an *applyPredicate(Expression)* method.
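Roughly like this (a sketch; the actual translation is elided):

// Translates the SQL Expression pushed by the planner into the Parquet
// filter that the native ParquetSource consumes through the Supplier side.
class ExpressionToParquetFilter implements Supplier<ParquetFilter> {
    private ParquetFilter translated;

    // called by the Table layer with the pushed-down filter
    void applyPredicate(Expression expression) {
        this.translated = ...; // translate Expression -> ParquetFilter
    }

    // called by the native source, which never sees the Expression
    @Override
    public ParquetFilter get() {
        return translated;
    }
}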
I know it does not look like a perfect interface from the pure SQL perspective, and I am not even sure if this would meet all the requirements for SQL. But the benefit is that the connector developers just need to know how to write an ExpressionToParquetFilter in order to make it work for Table, without having to understand the entire set of SQL concepts.

Thanks,
Jiangjie (Becket) Qin

On Wed, Mar 25, 2020 at 5:57 PM Timo Walther <twal...@apache.org> wrote:

Hi Becket,

Let me clarify a few things first: historically, we thought of Table API/SQL as a library on top of the DataStream API, similar to Gelly or CEP. We used TypeInformation in the Table API to integrate nicely with the DataStream API. However, the last years have shown that SQL is not just a library. It is an entire ecosystem that defines data types, submission behavior, execution behavior, and highly optimized SerDes. SQL is a way to declare data processing end-to-end, such that the planner has full control over the execution.

But I totally agree with your concerns around connectors. There is no big difference between your concerns and the current design.

1. "native connector interface is a generic abstraction of doing IO and Serde":

This is the case in our design. We are using SourceFunction, DeserializationSchema, WatermarkAssigner, etc.: all pluggable interfaces that the DataStream API offers for performing runtime operations.

2. "advanced features ... could be provided in a semantic free way":

I agree here. But this is an orthogonal topic that each connector implementer should keep in mind. If a new connector is developed, it should *not* be developed with only SQL in mind, but with good abstractions such that DataStream API users can use it as well. A connector should have a builder pattern to plug in all capabilities like Parquet filters etc. There should be no table-specific native/runtime connectors. I think this discussion is related to the discussion of FLIP-115.

However, as I mentioned before: this FLIP only discusses the interfaces for communication between the planner and the connector factory. As Dawid said earlier, a DynamicTableSource can be seen more as a factory that in the end calls the pluggable interfaces of a native connector:

KafkaConnector.builder()
    .watermarkAssigner(...)
    .keyDeser(...)
    .valueDeser(...)
    ...
    .build()
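Inside a table source this would very roughly look like the following (the provider and method names are only approximations):

class KafkaDynamicTableSource implements ScanTableSource {
    // state negotiated with the planner beforehand
    private WatermarkAssigner watermarkAssigner;
    private DeserializationSchema<RowData> valueDeser;

    ScanRuntimeProvider getScanRuntimeProvider(...) {
        // only at this point the regular native connector gets configured
        return SourceFunctionProvider.of(
            KafkaConnector.builder()
                .watermarkAssigner(watermarkAssigner)
                .valueDeser(valueDeser)
                .build());
    }
}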
Regards,
Timo

On 25.03.20 09:05, Becket Qin wrote:

Hi Kurt,

I do not object to promoting the concepts of SQL, but I don't think we should do that by introducing a new dedicated set of public connector interfaces that is only for SQL. The same argument could be made for Gelly, CEP, and Machine Learning, each claiming that they need to introduce a dedicated public set of interfaces that fits their own concepts and asking the connector developers to learn and follow their design. As an analogy, if we want to promote Chinese, we don't want to force people to learn ancient Chinese poems when they only need to know a few words like "hello" and "goodbye".

As design principles, here is what I think Flink connectors should look like:

1. The native connector interface is a generic abstraction for doing IO and Serde, without semantics for high level use cases such as SQL, Gelly, CEP, etc.

2. Some advanced features that may help accelerate the IO and Serde could be provided in the native connector interfaces in a semantics-free way, so that all the high level use cases can leverage them.

3. Additional semantics can be built on top of the native source interface by providing different plugins. These plugins could be aware of the high level use case. For example, to provide a filter to the source, we can do the following:

// An interface for all the filters that take an expression.
interface ExpressionFilter {
    FilterResult applyFilterExpression(Expression expression);
}

// A filter plugin implementation that translates the SQL Expression to a
// ParquetFilterPredicate.
class ParquetExpressionFilter implements Supplier<ParquetFilterPredicate>, ExpressionFilter {

    // Called by the high level use case.
    FilterResult applyFilterExpression(Expression expression) { ... }

    // Used by the native Source interface.
    ParquetFilterPredicate get() { ... }
}

In this case, the connector developer just needs to write the logic of translating an Expression to a Parquet FilterPredicate. They don't have to understand the entire set of interfaces that we want to promote; just like one only needs to know how to say "hello" without learning ancient Chinese poems.
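To make the division of labor concrete, the wiring could look like this (`applyFilterable` refers to the FilterableSource idea from my earlier email; the rest is illustrative):

ParquetExpressionFilter filter = new ParquetExpressionFilter();

// the Table layer speaks SQL: it hands the Expression to the plugin
filter.applyFilterExpression(expression);

// the native source speaks Parquet: it only sees a Supplier of its own
// predicate type and never touches the Expression
parquetSource.applyFilterable(filter);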
Again, I am not saying this is necessarily the best approach, but so far it seems a reasonable design principle to tell the developers.

Thanks,
Jiangjie (Becket) Qin

On Wed, Mar 25, 2020 at 11:53 AM Kurt Young <ykt...@gmail.com> wrote:

Hi Becket,

I don't think we should discuss this in purely engineering terms. Your proposal tries to let SQL connector developers understand as few SQL concepts as possible. But quite the opposite: we are designing those interfaces to emphasize the SQL concepts, to bridge the high level concepts into real interfaces and classes.

We keep talking about time-varying relations and dynamic tables when we introduce the SQL concepts, and sources and sinks are the most critical parts playing with those concepts. It's essential to let Flink SQL developers learn these concepts and connect them with real code through these connector interfaces, so that they can further write *correct* connectors based on such domain knowledge.

So this FLIP is a very important chance to express these concepts and get most SQL developers aligned on them and on the same page. It's mostly about different levels of abstraction, and for domains like SQL this is becoming more important. It helps Flink SQL go smoothly in the future and also makes it easier for new contributors. But I would admit this is not that obvious for others who don't work with SQL frequently.

Best,
Kurt

On Wed, Mar 25, 2020 at 11:07 AM Becket Qin <becket....@gmail.com> wrote:

Hi Jark,

It is good to know that we do not expect the end users to touch those interfaces.

Then the question boils down to whether the connector developers should be aware of the interfaces that are only used by the SQL optimizer. It seems a win if we can avoid that. Two potential solutions off the top of my head are:

1. An internal helper class that does the instanceof checks against the DataStream source interfaces and creates the pluggables for that DataStream source.
2. Codegen the set of TableSource interfaces, given a DataStream Source and its corresponding TablePluggablesFactory.

Thanks,
Jiangjie (Becket) Qin

On Wed, Mar 25, 2020 at 10:07 AM Jark Wu <imj...@gmail.com> wrote:

Hi Becket,

Regarding Flavor 1 and Flavor 2, I want to clarify that users will never use a table source like this:

{
    MyTableSource myTableSource = MyTableSourceFactory.create();
    myTableSource.setSchema(mySchema);
    myTableSource.applyFilterPredicate(expression);
    ...
}

TableFactory and TableSource are not directly exposed to end users; all the methods are called by the planner, not by users. Users always use DDL or a descriptor to register a table, and the planner will find the factory and create the sources according to the properties. All the optimizations are applied automatically, e.g. filter/projection pushdown; users don't need to call `applyFilterPredicate` explicitly.
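From the user's perspective it is only something like:

// user code: declare the table and query it; the planner discovers the
// factory, creates the table source, and applies pushdowns automatically
tableEnv.sqlUpdate(
    "CREATE TABLE my_table (...) WITH ('connector.type' = 'kafka', ...)");
Table result = tableEnv.sqlQuery("SELECT * FROM my_table WHERE amount > 10");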
On Wed, 25 Mar 2020 at 09:25, Becket Qin <becket....@gmail.com> wrote:

Hi Timo and Dawid,

Thanks for the clarification; it really helps. You are right that we are on the same page regarding the hierarchy. I think the only difference between our views is the flavor of the interfaces. There are two flavors of the source interface for DataStream and Table sources.

*Flavor 1. Table sources are wrapper interfaces around the DataStream source.*

Following this way, we will reach the design of the current proposal, i.e. each pluggable exposed in the DataStream source will have a corresponding TableSource interface counterpart at the Factory level. Users will write code like this:

{
    MyTableSource myTableSource = MyTableSourceFactory.create();
    myTableSource.setSchema(mySchema);
    myTableSource.applyFilterPredicate(expression);
    ...
}

The good thing about this flavor is that from the SQL / Table perspective there is a dedicated set of Table oriented interfaces.

The downsides are:
A. From the user's perspective, the DataStream Source and the Table Source are just two different sets of interfaces, regardless of how similar they are internally.
B. The source developers have to develop against those two sets of interfaces in order to support both DataStream and Table.
C. It is not explicit that DataStream can actually share the pluggables with Table / SQL. For example, in order to provide a filter pluggable with a SQL expression, users would have to know the actual converter class that converts the expression to the filter predicate and construct that converter by themselves.

---------------

*Flavor 2. A TableSource is a DataStream source with a bunch of pluggables. No Table specific interfaces at all.*

Following this way, we will reach another design where you have a SourceFactory and a single pluggable factory for all the table pluggables. Users will write something like:

{
    Deserializer<Row> myTableDeserializer = MyTablePluggableFactory.createDeserializer(schema);
    MySource<Row> mySource = MySourceFactory.create(properties, myTableDeserializer);
    mySource.applyPredicate(MyTablePluggableFactory.createFilterPredicate(expression));
}

The good thing about this flavor is that there is just one set of interfaces that works for both Table and DataStream. There is no difference between creating a DataStream source and creating a Table source, and DataStream can easily reuse the pluggables from the Table sources.

The downside is that Table / SQL won't have a dedicated API for optimization. Instead of writing:

if (myTableSource instanceof FilterableTableSource) {
    // Some filter push down logic.
    myTableSource.applyPredicate(expression);
}

one has to write:

if (mySource instanceof FilterableSource) {
    // Some filter push down logic.
    mySource.applyPredicate(MyTablePluggableFactory.createFilterPredicate(expression));
}

-------------------------

Just to be clear, I am not saying flavor 2 is necessarily better than flavor 1, but I want to make sure flavor 2 is also considered and discussed.

Thanks,
Jiangjie (Becket) Qin

On Tue, Mar 24, 2020 at 10:53 PM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Becket,

I really think we don't have differing opinions; we might just not see the changes the same way yet. Personally, I think of the DynamicTableSource as a factory for a Source implemented for the DataStream API. The important point is that the DynamicTableSource and all the feature traits (SupportsFilterablePushDown, SupportsProjectablePushDown, etc.) work with Table API concepts such as Expressions, SQL specific types, etc. In the end the implementation would resemble the following (bear in mind I tremendously simplified the example, just to show the relation between the two APIs):

SupportsFilterablePushDown {

    applyFilters(List<ResolvedExpression> filters) {
        this.filters = convertToDataStreamFilters(filters);
    }

    Source createSource() {
        return Source.create()
            .applyFilters(this.filters);
    }
}

or, exactly as you said, for the computed columns:

SupportsComputedColumnsPushDown {

    applyComputedColumn(ComputedColumnConverter converter) {
        this.deserializationSchema = new DeserializationSchema<Row>() {
            Row deserialize(...) {
                RowData row = format.deserialize(bytes); // original format, e.g. json, avro, etc.
                RowData enriched = converter(row);
                return enriched;
            }
        };
    }

    Source createSource() {
        return Source.create()
            .withDeserialization(deserializationSchema);
    }
}

So to sum it up again: all those interfaces are factories that configure the appropriate parts of the DataStream API using Table API concepts. Finally, to answer your question for the particular comparisons:

DynamicTableSource vs. Source<Row, SourceSplitT, EnumChkT>
SupportsFilterablePushDown vs. FilterableSource
SupportsProjectablePushDown vs. ProjectableSource
SupportsWatermarkPushDown vs. WithWatermarkAssigner
SupportsComputedColumnPushDown vs. ComputedColumnDeserializer
ScanTableSource vs. ChangeLogDeserializer

Pretty much you can think of everything on the left as a factory for the right side; the left side works with Table API classes (Expressions, DataTypes). I hope this clarifies it a bit.

Best,
Dawid

On 24/03/2020 15:03, Becket Qin wrote:

Hey Kurt,

> I don't think DataStream should see some SQL specific concepts such as Filtering or ComputedColumn.

Projectable and Filterable are not necessarily SQL concepts; they could be applicable to a DataStream source as well, to reduce the network load.
For example, ORC and Parquet should probably also be readable from DataStream, right?

ComputedColumn is not part of the Source; it is an interface that extends the Deserializer, which is a pluggable for the Source. From SQL's perspective there is the concept of a computed column, but from the Source perspective it is essentially a Deserializer that also converts the records internally, assuming we allow some conversion to be embedded in the source in addition to just deserialization.

Thanks,
Jiangjie (Becket) Qin

On Tue, Mar 24, 2020 at 9:36 PM Jark Wu <imj...@gmail.com> wrote:

Thanks Timo for updating the formats section. That will be very helpful for the changelog support (FLIP-105). I just left 2 minor comments about some method names. In general, I'm +1 to start a voting.

--------------------------------------------------------------------------------------------------

Hi Becket,

I agree we shouldn't duplicate code, especially the runtime implementations. However, the interfaces proposed by FLIP-95 are mainly used during optimization (compiling), not at runtime, and I don't think there is much to share there, because Table/SQL is declarative while DataStream is imperative. For example, for filter push down, a DataStream FilterableSource may allow accepting a FilterFunction (which is a black box for the source). Table sources, however, should pick up the pushed filter expressions; some sources may only support "=", "<", ">" conditions. Pushing a FilterFunction doesn't work in the table ecosystem. That means the connectors have to have some table-specific implementations.

Best,
Jark

On Tue, 24 Mar 2020 at 20:41, Kurt Young <ykt...@gmail.com> wrote:

Hi Becket,

I don't think DataStream should see SQL specific concepts such as Filtering or ComputedColumn. It's better to stay within the SQL area and translate to a more generic concept when translating to the DataStream/Runtime layer, such as using a MapFunction to represent the computed column logic.
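For example, a computed column `c AS a + 1` would translate to something like (purely illustrative):

// generic DataStream-level representation of the computed column c = a + 1
MapFunction<Row, Row> computedColumn = row -> {
    Row result = new Row(row.getArity() + 1);
    for (int i = 0; i < row.getArity(); i++) {
        result.setField(i, row.getField(i));
    }
    result.setField(row.getArity(), ((Long) row.getField(0)) + 1);
    return result;
};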
Best,
Kurt

On Tue, Mar 24, 2020 at 5:47 PM Becket Qin <becket....@gmail.com> wrote:

Hi Timo and Dawid,

It's really great that we have the same goal. I am actually wondering if we can go one step further and avoid some of the interfaces in Table as well. For example, if we have the FilterableSource, do we still need the FilterableTableSource? Should DynamicTableSource just become a Source<*Row*, SourceSplitT, EnumChkT>?

Can you help me understand a bit more the reason we need the following relational representation / wrapper interfaces vs. the interfaces that we could put into the Source in FLIP-27?

DynamicTableSource vs. Source<Row, SourceSplitT, EnumChkT>
SupportsFilterablePushDown vs. FilterableSource
SupportsProjectablePushDown vs. ProjectableSource
SupportsWatermarkPushDown vs. WithWatermarkAssigner
SupportsComputedColumnPushDown vs. ComputedColumnDeserializer
ScanTableSource vs. ChangeLogDeserializer
LookUpTableSource vs. LookUpSource

Assuming we have all the interfaces on the right side, do we still need the interfaces on the left side? Note that the interfaces on the right can be used by both DataStream and Table. If we do this, there will only be one set of Source interfaces for Table and DataStream; the only difference is that the Source for Table will have some specific plugins and configurations. An omnipotent Source can implement all of the above interfaces and take a Deserializer that implements both ComputedColumnDeserializer and ChangeLogDeserializer.
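Purely illustrative, assuming the right-side interfaces existed:

// a single Source usable from DataStream directly, and from Table once
// decorated with the table-specific pluggables
class OmnipotentSource implements
        Source<Row, MySplit, MyEnumChk>,
        FilterableSource<FilterPredicate>,
        ProjectableSource<ProjectionPredicate>,
        WithWatermarkAssigner {
    ...
}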
Would the SQL planner work with that?

Thanks,
Jiangjie (Becket) Qin

On Tue, Mar 24, 2020 at 5:03 PM Jingsong Li <jingsongl...@gmail.com> wrote:

+1. Thanks Timo for the design doc.

We could also consider @Experimental, but I am +1 to @PublicEvolving; we should be confident in the current change.

Best,
Jingsong Lee

On Tue, Mar 24, 2020 at 4:30 PM Timo Walther <twal...@apache.org> wrote:

@Becket: We totally agree that we don't need table specific connectors during runtime. As Dawid said, the interfaces proposed here are just for the communication with the planner. Once the properties (watermarks, computed columns, filters, projection, etc.) are negotiated, we can configure a regular Flink connector, e.g. set the watermark assigner and deserialization schema of a Kafka connector.

For better separation of concerns, Flink connectors should not include relational interfaces and depend on flink-table. This is the responsibility of the table source/sink.

@Kurt: I would like to mark them @PublicEvolving already, because we need to deprecate the old interfaces as early as possible. We cannot redirect to @Internal interfaces. They are not marked @Public, so we can still evolve them. But a core design shift should not happen again; it would leave a bad impression if we redesign over and over again. Instead, we should be confident in the current change.

Regards,
Timo

On 24.03.20 09:20, Dawid Wysakowicz wrote:

Hi Becket,

Answering your question: we have the same intention, namely not to duplicate connectors between the DataStream and Table APIs. The interfaces proposed in the FLIP are a way to describe the relational properties of a source. The intention is, as you described, to translate all of those, expressed as Expressions or other Table specific structures, into a DataStream source. In other words, I think what we are doing here is in line with what you described.

Best,
Dawid

On 24/03/2020 02:23, Becket Qin wrote:

Hi Timo,

Thanks for the proposal. I completely agree that the current Table connectors could be simplified quite a bit. I haven't finished reading everything, but here are some quick thoughts.

Actually, to me the biggest question is: why should there be two different connector systems for DataStream and Table? What is the fundamental reason that prevents us from merging them into one?

The basic functionality of a connector is to provide the capabilities to do IO and Serde. Conceptually, Table connectors should just be DataStream connectors that are dealing with Rows. It seems that quite a few of the special connector requirements are just a specific way to do IO / Serde. Taking SupportsFilterPushDown as an example, imagine we have the following interface:

interface FilterableSource<PREDICATE> {
    void applyFilterable(Supplier<PREDICATE> predicate);
}

And if a ParquetSource would like to be filterable, it becomes:

class ParquetSource implements Source, FilterableSource<FilterPredicate> {
    ...
}

For Table, one just needs to provide a predicate supplier that converts an Expression to the specified predicate type. This has a few benefits:

1. The same unified API for filterable sources, regardless of DataStream or Table.
2. The DataStream users can now also use the ExpressionToPredicate supplier if they want to.

To summarize, my main point is that I am wondering if it is possible to have a single set of connector interfaces for both Table and DataStream, rather than having two hierarchies. I am not 100% sure if this would work, but if it does, it would be a huge win from both the code maintenance and the user experience perspective.

Thanks,
Jiangjie (Becket) Qin

On Tue, Mar 24, 2020 at 2:03 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote:

Hi Timo,

Thank you for the proposal. I think it is an important improvement that will benefit many parts of the Table API. The proposal looks really good to me, and personally I would be comfortable with voting on the current state.

Best,
Dawid

On 23/03/2020 18:53, Timo Walther wrote:

Hi everyone,

I received some questions around how the new interfaces play together with formats and their factories. Furthermore, for MySQL or Postgres CDC logs, the format should be able to return a `ChangelogMode`.
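For example, roughly (a sketch; the exact method names may still change):

// a Debezium-like format would declare which kinds of changes it produces
ChangelogMode mode = ChangelogMode.newBuilder()
    .addContainedKind(RowKind.INSERT)
    .addContainedKind(RowKind.UPDATE_BEFORE)
    .addContainedKind(RowKind.UPDATE_AFTER)
    .addContainedKind(RowKind.DELETE)
    .build();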
Also, I incorporated the feedback around the factory design in general. I added a new section `Factory Interfaces` to the design document. It should be helpful for understanding the big picture and connecting the concepts.

Please let me know what you think.

Thanks,
Timo

On 18.03.20 13:43, Timo Walther wrote:

Hi Benchao,

this is a very good question. I will update the FLIP about this. The legacy planner will not support the new interfaces; it will only support the old interfaces. With the next release, I think the Blink planner is stable enough to become the default one as well.

Regards,
Timo

On 18.03.20 08:45, Benchao Li wrote:

Hi Timo,

Thank you and the others for the efforts to prepare this FLIP. The FLIP LGTM generally.

+1 for moving the Blink data structures to table-common; they will be useful for UDFs too in the future. A little question: do we plan to support the new interfaces and data types in the legacy planner, or only in the Blink planner?

Using the primary keys from the DDL instead of key information derived from each query is also a good idea; we have met some use cases where the derivation did not work very well.

This FLIP also makes the dependencies of the table modules much clearer. I like it very much.

Timo Walther <twal...@apache.org> wrote on Tue, Mar 17, 2020 at 1:36 AM:

Hi everyone,

I'm happy to present the results of long discussions that we had internally. Jark, Dawid, Aljoscha, Kurt, Jingsong, me, and many more have contributed to this design document.

We would like to propose new long-term table source and table sink interfaces:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces

This is a requirement for FLIP-105 and for finalizing FLIP-32.

The goals of this FLIP are:

- Simplify the current interface architecture:
  - Merge upsert, retract, and append sinks.
  - Unify batch and streaming sources.
  - Unify batch and streaming sinks.

- Allow sources to produce a changelog:
  - UpsertTableSources have been requested a lot by users. Now is the time to open the internal planner capabilities via the new interfaces.
  - According to FLIP-105, we would like to support changelogs for processing formats such as Debezium.

- Don't rely on the DataStream API for sources and sinks:
  - According to FLIP-32, the Table API and SQL should be independent of the DataStream API, which is why the `table-common` module has no dependencies on `flink-streaming-java`.
  - Source and sink implementations should only depend on the `table-common` module after FLIP-27.
  - Until FLIP-27 is ready, we still put most of the interfaces in `table-common` and strictly separate the interfaces that communicate with a planner from the actual runtime readers/writers.

- Implement efficient sources and sinks without planner dependencies:
  - Make Blink's internal data structures available to connectors.
  - Introduce stable interfaces for data structures that can be marked as `@PublicEvolving`.
  - Only require dependencies on `flink-table-common` in the future.

It finalizes the concept of dynamic tables and considers how all source/sink related classes play together.

We look forward to your feedback.

Regards,
Timo

--
Best, Jingsong Lee