Hi Timo,

Thanks for the quick response.
5. "StreamStatementSet#attachToStream()" Joining or using connect() with a different DataStream is a good case. cc @Godfrey , what do you think about the `attachToStream()` API? 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)" > We need a Map for constant time of mapping field name to index. But we can easily build the Map from the List<String> fieldNames in Row constructor. IMO, manually building the Map and mapping names to indices is verbose and error-prone. Are you concerned about the per-record performance? 7. "a Row has two modes represented by an internal boolean flag `hasFieldOrder`." Thanks for the explanation. Regarding the case (b), I have the same confusion with Dawid that what's the result when index-based setters and name-based setters are mixed used (esp. in foreach and if branches). TBH, I don't see a strong need for named setters. Using it as the UDAF accumulator is not as good as POJO in terms of performance and ease of use. Best, Jark On Tue, 1 Sep 2020 at 20:28, Dawid Wysakowicz <dwysakow...@apache.org> wrote: > Hi all, > > I really like the ideas of this FLIP. I think it improves user > experience quite a bit. I wanted to add just two comments: > > 1. As for the StatementSet I like the approach described in the FLIP for > its simplicity. Moreover the way I see it is that if a user wants to > work with DataStream, then he/she wants to end up in the DataStream API, > or in other words call the StreamExecutionEnvironment#execute. > > 2. @Timo What is the interaction between Row setters from the different > modes? What happens if the user calls both in different order. E.g. > > row.setField(0, "ABC"); > > row.setField("f0", "ABC"); // is this a valid call ? > > or > > row.setField("f0", "ABC"); > > row.setField(0, "ABC"); // is this a valid call ? > > or > > row.setFieldNames(...); > > row.setField(0, "ABC"); // is this a valid call ? > > Best, > > Dawid > > On 01/09/2020 11:49, Timo Walther wrote: > > Hi Jark, > > > > thanks for the detailed review. Let me answer your concerns: > > > > ## Conversion of DataStream to Table > > > > 1. "We limit the usage of `system_rowtime()/system_proctime` to the > > leaf of a QueryOperation tree in the validation phase." > > I'm fine with allowing `system_proctime` everywhere in the query. Also > > for SQL, I think we should have done that earlier already to give > > users the chance to have time based operations also at later stages. > > > > 2. "By using `system_rowtime().as("rowtime")` the watermark would be > > assigned implicitly. " > > Yes, we just use the DataStream API watermark. `system_rowtime()` will > > just introduce a time attribute, the watermark travels to the Table > > API and into DataStream API without further code changes. > > > > ## Conversion of Table to DataStream > > > > 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table): > > DataStream<Row>" > > 4. "Table.execute(ChangelogMode)" > > Filtering UPDATE_BEFORE is already quite important as it reduces the > > amount of data by factor 2. But I also understand your concerns > > regarding confusing users. I also got the request for a > > `Table.getChangelogMode()` a couple of times in the past, because > > users would like to get information about the kind of query that is > > executed. However, in this case `toChangelogStream(Table)` is > > equivalent to call ``toChangelogStream(Table.getChangelogMode(), > > Table)` so we don't need `Table.getChangelogMode()` in the current > > FLIP design. But this can be future work. 
> > But I also understand your concerns regarding confusing users. I have also received the request for a `Table.getChangelogMode()` a couple of times in the past, because users would like to get information about the kind of query that is executed. However, in this case `toChangelogStream(Table)` is equivalent to calling `toChangelogStream(Table.getChangelogMode(), Table)`, so we don't need `Table.getChangelogMode()` in the current FLIP design. But this can be future work.
> >
> > Let's start with `toChangelogStream(Table)` and wait for more feedback about this new feature. What do others think?
> >
> > ## Conversion of StatementSet to DataStream API
> >
> > 5. "StreamStatementSet#attachToStream()"
> >
> > I think Godfrey's proposal is too complex for regular users. Instead of continuing with the fluent programming, we would force users to define a DataStream pipeline in a lambda.
> >
> > Furthermore, joining or using connect() with a different DataStream source would not be possible in this design.
> >
> > The `execute()` method of `StatementSet` should not execute the DataStream API subprogram. It mixes the concepts, because we tell users: "If you use `toDataStream`, you need to use `StreamExecutionEnvironment.execute()`."
> >
> > We don't solve every potential use case with the current FLIP design, but we solve the most important one, where a pipeline just uses an INSERT INTO but also uses the Table API for connectors and preprocessing and does the main logic in the DataStream API:
> >
> > T1 -> T2, T3 -> DataStream, T4 -> DataStream
> >
> > I would consider `StatementSet.addDataStream(Table, ...)` future work for now, as it is only an optimization for reusing parts of the StreamGraph. We could even perform this optimization when calling `toInsertStream` or `toChangelogStream`.
> >
> > ## Improve dealing with Row in DataStream API
> >
> > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >
> > We need a Map for constant-time mapping of field name to index.
> >
> > We accept a nullable `fieldNames` because names are not mandatory; one can also work with indices as before.
> >
> > But you are right that the fieldNames member variable can be immutable. I just wanted to avoid too many overloaded constructors. I'm fine with having one full constructor for RowKind, arity, and field names (or null).
> >
> > 7. "a Row has two modes represented by an internal boolean flag `hasFieldOrder`."
> > Maybe I leaked too many implementation details there that rather confuse readers than help. Internally, we need to distinguish between two kinds of rows. A user should not be bothered by this.
> >
> > a) Row comes from Table API runtime: hasFieldOrder = true
> > Map("myAge" -> 0, "myName" -> 1)
> >
> > row.getField("myName") == row.getField(1)
> > row.getField("myAge") == row.getField(0)
> >
> > b) Row comes from user: hasFieldOrder = false
> > Row row = new Row(2);
> > row.setField("myName", "Alice");
> > row.setField("myAge", 32);
> >
> > Map("myAge" -> 1, "myName" -> 0)
> >
> > But the type information will decide about the order of the fields later and reorder them accordingly during serialization or RowData conversion:
> >
> > ["myName", "myAge"] vs. ["myAge", "myName"]
> >
> > The user must not care about this, as it always feels natural to deal with the rows.
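> >
> > As a small end-to-end sketch of case (b) (the declared data type here is hypothetical and only for illustration; `DataTypes` is the existing FLIP-37 API):
> >
> > // user code: the order of the setField calls does not matter
> > Row row = new Row(2);
> > row.setField("myAge", 32);
> > row.setField("myName", "Alice");
> >
> > // a conversion that declares the type as
> > //   DataTypes.ROW(
> > //     DataTypes.FIELD("myName", DataTypes.STRING()),
> > //     DataTypes.FIELD("myAge", DataTypes.INT()))
> > // resolves the fields by name and serializes them in the declared
> > // order: ["Alice", 32]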
> >
> > Regards,
> > Timo
> >
> > On 01.09.20 06:19, Jark Wu wrote:
> >> Hi Timo,
> >>
> >> Thanks a lot for the great proposal, and sorry for the late reply. This is an important improvement for DataStream and Table API users.
> >>
> >> I have listed my thoughts and questions below ;-)
> >>
> >> ## Conversion of DataStream to Table
> >>
> >> 1. "We limit the usage of `system_rowtime()`/`system_proctime()` to the leaf of a QueryOperation tree in the validation phase."
> >> IIUC, that means `system_rowtime()` can only be used in the first `select()` after `fromXxxStream()`, right?
> >> However, I think `system_proctime()` shouldn't have this limitation, because it doesn't rely on the underlying timestamp of the StreamRecord and can be generated at any stage of the query.
> >>
> >> 2. "By using `system_rowtime().as("rowtime")` the watermark would be assigned implicitly."
> >> What watermark will be used here? Is it the pre-assigned watermark in the DataStream (a so-called `system_watermark()`)?
> >>
> >> ## Conversion of Table to DataStream
> >>
> >> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table): DataStream<Row>"
> >> I'm not sure whether this method is useful for users. Currently, `DynamicTableSink#getChangelogMode(changelogMode)` is only used for filtering out UPDATE_BEFORE if possible.
> >> However, if we expose this method to users, it may be confusing. Users may try to use this method to convert a changelog stream to an insert-only stream by applying ChangelogMode.insertOnly(). This might be misleading.
> >> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They have to know the ChangelogMode of the current Table first, and then remove UPDATE_BEFORE from that ChangelogMode.
> >> That means we would have to support `Table.getChangelogMode()` first? But `ChangelogMode` derivation requires a full optimization pass on the Table, which seems impossible now.
> >> Therefore, IMHO, we can introduce this interface in the future if users indeed need it. For most users, I think `toChangelogStream(Table)` is enough.
> >>
> >> 4. "Table.execute(ChangelogMode)"
> >> Ditto.
> >>
> >> ## Conversion of StatementSet to DataStream API
> >>
> >> 5. "StreamStatementSet#attachToStream()"
> >> I think the potential drawback is that it can't support multi-sink optimization, i.e. sharing the pipeline.
> >> For example, if we have a Table `t1` (a heavy view that uses joins and aggregates) and want to sink it to "mysql" using SQL while continuing to process it with DataStream in the same job, it's a huge waste of resources to re-compute `t1`. It would be nice if we could come up with a solution to share the pipeline.
> >>
> >> I borrowed Godfrey's idea from FLINK-18840 and added some modifications. What do you think about the following proposal?
> >>
> >> interface StatementSet {
> >>   StatementSet addDataStream(Table table, TableDataStreamTransform transform);
> >> }
> >>
> >> interface TableDataStreamTransform {
> >>   void transform(Context context);
> >>
> >>   interface Context {
> >>     Table getTable();
> >>     DataStream<Row> toInsertStream(Table table);
> >>     <T> DataStream<T> toInsertStream(AbstractDataType<?> dataType, Table table);
> >>     DataStream<Row> toChangelogStream(Table table);
> >>   }
> >> }
> >>
> >> tEnv
> >>   .createStatementSet()
> >>   .addInsert("mysql", table1)
> >>   .addDataStream(table1, ctx -> {
> >>     ctx.toInsertStream(ctx.getTable())
> >>        .flatMap(..)
> >>        .keyBy(..)
> >>        .process(..)
> >>        .addSink(...);
> >>   });
> >>
> >> ## Improve dealing with Row in DataStream API
> >>
> >> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >> - Maybe a `List<String> fieldNames` or `String[] fieldNames` parameter is enough and more handy than a Map?
> >> - Currently, the fieldNames member variable is mutable. Is that on purpose? Can we make it immutable? For example, only accept it from the constructor, as in the sketch below.
> >> - Why do we accept a nullable `fieldNames`?
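> >>
> >> A rough sketch of the immutable variant I have in mind (a hypothetical class just for illustration, not the proposed API):
> >>
> >> import java.util.*;
> >> import javax.annotation.Nullable;
> >>
> >> public final class NamedRow {
> >>   private final Object[] fields;
> >>   // built once in the constructor, never mutated afterwards
> >>   @Nullable private final Map<String, Integer> nameToIndex;
> >>
> >>   public NamedRow(int arity, @Nullable List<String> fieldNames) {
> >>     this.fields = new Object[arity];
> >>     if (fieldNames == null) {
> >>       this.nameToIndex = null; // index-only mode, as before
> >>     } else {
> >>       Map<String, Integer> m = new HashMap<>();
> >>       for (int i = 0; i < fieldNames.size(); i++) {
> >>         m.put(fieldNames.get(i), i);
> >>       }
> >>       this.nameToIndex = Collections.unmodifiableMap(m);
> >>     }
> >>   }
> >>
> >>   public void setField(String name, Object value) {
> >>     // still a constant-time lookup, but callers pass a List, not a Map
> >>     fields[nameToIndex.get(name)] = value;
> >>   }
> >> }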
> >> 7. "a Row has two modes represented by an internal boolean flag `hasFieldOrder`."
> >> Sorry, I don't fully understand what `hasFieldOrder` means and what it is used for. Could you explain a bit more?
> >>
> >> Best,
> >> Jark
> >>
> >> On Wed, 19 Aug 2020 at 17:38, Timo Walther <twal...@apache.org> wrote:
> >>
> >>> Hi David,
> >>>
> >>> thanks for your feedback. Feedback from someone who interacts with many users is very valuable. I added an explanation for StatementSets to the FLIP.
> >>>
> >>> Regarding watermarks and fromInsertStream: actually, the
> >>>
> >>> `Schema.watermark("ts", system_watermark())`
> >>>
> >>> is not really necessary in `fromChangelogStream`. It is added to satisfy the Schema interface and to be similar to the SQL DDL.
> >>>
> >>> We could already extract the watermark strategy if we see `system_rowtime()`, because in most cases we will simply use the DataStream API watermarks.
> >>>
> >>> But maybe some users want to generate watermarks after preprocessing in the DataStream API. In such cases, users want to define a computed watermark expression.
> >>>
> >>> So for simplicity, in the Simple API we introduce:
> >>>
> >>> tEnv
> >>>   .fromInsertStream(DataStream<T>)
> >>>   .select('*, system_rowtime().as("rowtime"), system_proctime().as("proctime"))
> >>>
> >>> and just rely on the watermarks that travel through the DataStream API already.
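> >>>
> >>> For example, a job could generate its watermarks after the DataStream preprocessing and then convert (a sketch: `Event`, its `eventTime` field, and the `preprocessed` stream are hypothetical; WatermarkStrategy is the existing FLIP-126 DataStream API; `fromInsertStream`/`system_rowtime()` are as proposed in this FLIP):
> >>>
> >>> // imports: java.time.Duration,
> >>> //          org.apache.flink.api.common.eventtime.WatermarkStrategy
> >>> DataStream<Event> prepared = preprocessed
> >>>     .assignTimestampsAndWatermarks(
> >>>         WatermarkStrategy
> >>>             .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
> >>>             .withTimestampAssigner((e, ts) -> e.eventTime));
> >>>
> >>> Table t = tEnv
> >>>     .fromInsertStream(prepared)
> >>>     .select('*, system_rowtime().as("rowtime"));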
> >>> I added another comment to the FLIP.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>> On 19.08.20 10:53, David Anderson wrote:
> >>>> Timo, nice to see this.
> >>>>
> >>>> As someone who expects to use these interfaces, but who doesn't fully understand the existing Table API, I like what I see. Just a couple of comments:
> >>>>
> >>>> The way that watermarks fit into the fromChangelogStream case makes sense to me, and I'm wondering why watermarks don't come up in the previous section about fromInsertStream.
> >>>>
> >>>> I wasn't familiar with StatementSets, and I couldn't find an explanation in the docs. I eventually found this short paragraph in an email from Fabian Hueske, which clarified everything in that section for me:
> >>>>
> >>>>     FLIP-84 [1] added the concept of a "statement set" to group multiple INSERT INTO statements (SQL or Table API) together. The statements in a statement set are jointly optimized and executed as a single Flink job.
> >>>>
> >>>> Maybe if you add this to the FLIP it will help other readers as well.
> >>>>
> >>>> Best,
> >>>> David
> >>>>
> >>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <twal...@apache.org> wrote:
> >>>>
> >>>>> Hi everyone,
> >>>>>
> >>>>> I would like to propose a FLIP that aims to resolve the remaining shortcomings in the Table API:
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>>>
> >>>>> The Table API has received many new features over the last year. It supports a new type system (FLIP-37), connectors support changelogs (FLIP-95), we have well-defined internal data structures (FLIP-95), support for result retrieval in an interactive fashion (FLIP-84), and soon new TableDescriptors (FLIP-129).
> >>>>>
> >>>>> However, the interfaces from and to the DataStream API have not been touched during the introduction of these new features and are kind of outdated. The interfaces lack important functionality that is available in the Table API but not exposed to DataStream API users. The DataStream API is still our most important API, which is why good interoperability is crucial.
> >>>>>
> >>>>> This FLIP is a mixture of different topics that improve the interoperability between DataStream and Table API in terms of:
> >>>>>
> >>>>> - DataStream <-> Table conversion
> >>>>> - translation of type systems TypeInformation <-> DataType
> >>>>> - schema definition (incl. rowtime, watermarks, primary key)
> >>>>> - changelog handling
> >>>>> - row handling in DataStream API
> >>>>>
> >>>>> I'm looking forward to your feedback.
> >>>>>
> >>>>> Regards,
> >>>>> Timo