Hi Timo,

Thanks a lot for the great proposal, and sorry for the late reply. This is an important improvement for DataStream and Table API users.
I have listed my thoughts and questions below ;-)

## Conversion of DataStream to Table

1. "We limit the usage of `system_rowtime()/system_proctime()` to the leaf of a QueryOperation tree in the validation phase."

IIUC, that means `system_rowtime()` can only be used in the first `select()` after `fromXxxStream()`, right? However, I think `system_proctime()` shouldn't have this limitation, because it doesn't rely on the underlying timestamp of the StreamRecord and can be generated at any stage of the query (I put a small sketch of this below the list).

2. "By using `system_rowtime().as("rowtime")` the watermark would be assigned implicitly."

What watermark will be used here? Is it the pre-assigned watermark in the DataStream (the so-called `system_watermark()`)?

## Conversion of Table to DataStream

3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table): DataStream<Row>"

I'm not sure whether this method is useful for users. Currently, `DynamicTableSink#getChangelogMode(changelogMode)` is only used for filtering out UPDATE_BEFORE if possible. However, if we expose this method to users, it may be confusing: users may try to use it to convert a changelog stream into an insert-only stream by applying `ChangelogMode.insertOnly()`. This might be misleading.

What's more, it's cumbersome if users don't want UPDATE_BEFORE: they have to know the ChangelogMode of the current Table first and then remove UPDATE_BEFORE from that ChangelogMode (see the second sketch below). That means we would have to support `Table.getChangelogMode()` first? But ChangelogMode derivation requires a full optimization path on the Table, which seems impossible now.

Therefore, IMHO, we can introduce this interface in the future if users indeed need it. For most users, I think `toChangelogStream(Table)` is enough.

4. "Table.execute(ChangelogMode)"

Ditto.

## Conversion of StatementSet to DataStream API

5. "StreamStatementSet#attachToStream()"

I think the potential drawback is that it can't support multi-sink optimization, i.e. sharing the pipeline. For example, say we have a Table `t1` (a heavy view that uses joins and aggregations) and want to both sink it to "mysql" using SQL and continue processing it with the DataStream API in the same job. It's a huge waste of resources if we re-compute `t1`. It would be nice if we could come up with a solution to share the pipeline.

I borrowed Godfrey's idea from FLINK-18840 and added some modifications. What do you think about the following proposal?

interface StatementSet {
    StatementSet addDataStream(Table table, TableDataStreamTransform transform);
}

interface TableDataStreamTransform {
    void transform(Context ctx);

    interface Context {
        Table getTable();
        DataStream<Row> toInsertStream(Table table);
        <T> DataStream<T> toInsertStream(AbstractDataType<?> targetType, Table table);
        DataStream<Row> toChangelogStream(Table table);
    }
}

tEnv
    .createStatementSet()
    .addInsert("mysql", table1)
    .addDataStream(table1, ctx -> {
        ctx.toInsertStream(ctx.getTable())
            .flatMap(..)
            .keyBy(..)
            .process(..)
            .addSink(...);
    })

## Improve dealing with Row in DataStream API

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"

- Maybe a `List<String> fieldNames` or `String[] fieldNames` parameter is enough and more handy than a Map?
- Currently, the fieldNames member variable is mutable. Is that on purpose? Can we make it immutable, e.g. only accept it in the constructor (see the third sketch below)?
- Why do we accept a nullable `fieldNames`?

7. "a Row has two modes represented by an internal boolean flag `hasFieldOrder`."

Sorry, I don't fully understand what `hasFieldOrder` means and what it is used for. Could you explain this a bit more?
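To make point 1 more concrete, here is a minimal sketch of what I would expect to be possible if `system_proctime()` were not restricted to the leaf. It uses the API proposed in the FLIP (`fromInsertStream()`, `system_proctime()`), so none of this exists yet:

// Sketch only: fromInsertStream() and system_proctime() are the FLIP's
// proposed API, not existing methods.
// Assumes: import static org.apache.flink.table.api.Expressions.$;
Table result = tEnv
    .fromInsertStream(stream)
    .groupBy($("user"))
    .select($("user"), $("amount").sum().as("total"))
    // processing time does not depend on the StreamRecord timestamp,
    // so it could conceptually be materialized after the aggregation:
    .select($("user"), $("total"), system_proctime().as("proctime"));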
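For point 3, a sketch of the misuse and the cumbersome path I described (note that `Table.getChangelogMode()` is hypothetical):

// A user might expect this to "convert" an updating table into an
// insert-only stream, which is not what the method would actually do:
DataStream<Row> stream =
    tEnv.toChangelogStream(ChangelogMode.insertOnly(), table);

// And just dropping UPDATE_BEFORE would require knowing the table's
// current mode first (a Table.getChangelogMode() does not exist today)
// and then rebuilding the mode without it:
ChangelogMode withoutUpdateBefore = ChangelogMode.newBuilder()
    .addContainedKind(RowKind.INSERT)
    .addContainedKind(RowKind.UPDATE_AFTER)
    .addContainedKind(RowKind.DELETE)
    .build();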
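And for point 6, a rough sketch of the immutable variant I have in mind; the array-based constructor and the name-based setField() are hypothetical:

// Hypothetical: field names are fixed at construction time and backed
// by a simple array; positions are implied by the order of the names.
Row row = new Row(new String[] {"name", "age"});
row.setField("name", "Alice");
row.setField("age", 42);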
Best,
Jark

On Wed, 19 Aug 2020 at 17:38, Timo Walther <twal...@apache.org> wrote:

> Hi David,
>
> thanks for your feedback. Feedback from someone who interacts with many
> users is very valuable. I added an explanation for StatementSets to the
> FLIP.
>
> Regarding watermarks and fromInsertStream, actually the
>
> `Schema.watermark("ts", system_watermark())`
>
> is not really necessary in the `fromChangelogStream`. It is added to
> satisfy the Schema interface and be similar to SQL DDL.
>
> We could already extract the watermark strategy if we see
> `system_rowtime()` because in most of the cases we will simply use the
> DataStream API watermarks.
>
> But maybe some users want to generate watermarks after preprocessing in
> DataStream API. In these cases users want to define a computed watermark
> expression.
>
> So for simplicity in the Simple API we introduce:
>
> tEnv
>   .fromInsertStream(DataStream<T>)
>   .select('*, system_rowtime().as("rowtime"),
>           system_proctime().as("proctime"))
>
> and just rely on the watermarks that travel through the DataStream API
> already. I added another comment to the FLIP.
>
> Regards,
> Timo
>
> On 19.08.20 10:53, David Anderson wrote:
> > Timo, nice to see this.
> >
> > As someone who expects to use these interfaces, but who doesn't fully
> > understand the existing Table API, I like what I see. Just a couple of
> > comments:
> >
> > The way that watermarks fit into the fromChangelogStream case makes
> > sense to me, and I'm wondering why watermarks don't come up in the
> > previous section about fromInsertStream.
> >
> > I wasn't familiar with StatementSets, and I couldn't find an explanation
> > in the docs. I eventually found this short paragraph in an email from
> > Fabian Hueske, which clarified everything in that section for me:
> >
> >     FLIP-84 [1] added the concept of a "statement set" to group multiple
> >     INSERT INTO statements (SQL or Table API) together. The statements
> >     in a statement set are jointly optimized and executed as a single
> >     Flink job.
> >
> > Maybe if you add this to the FLIP it will help other readers as well.
> >
> > Best,
> > David
> >
> > On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi everyone,
> >>
> >> I would like to propose a FLIP that aims to resolve the remaining
> >> shortcomings in the Table API:
> >>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>
> >> The Table API has received many new features over the last year. It
> >> supports a new type system (FLIP-37), connectors support changelogs
> >> (FLIP-95), we have well-defined internal data structures (FLIP-95),
> >> support for result retrieval in an interactive fashion (FLIP-84), and
> >> soon new TableDescriptors (FLIP-129).
> >>
> >> However, the interfaces from and to DataStream API have not been
> >> touched during the introduction of these new features and are kind of
> >> outdated. The interfaces lack important functionality that is available
> >> in the Table API but not exposed to DataStream API users. DataStream
> >> API is still our most important API, which is why good interoperability
> >> is crucial.
> >>
> >> This FLIP is a mixture of different topics that improve the
> >> interoperability between DataStream and Table API in terms of:
> >>
> >> - DataStream <-> Table conversion
> >> - translation of type systems TypeInformation <-> DataType
> >> - schema definition (incl. rowtime, watermarks, primary key)
> >> - changelog handling
> >> - row handling in DataStream API
> >>
> >> I'm looking forward to your feedback.
> >>
> >> Regards,
> >> Timo