Hi all, I really like the ideas of this FLIP. I think it improves user experience quite a bit. I wanted to add just two comments:

1. As for the StatementSet, I like the approach described in the FLIP for
its simplicity. Moreover, the way I see it, if users want to work with
DataStream, they want to end up in the DataStream API, or in other words
call StreamExecutionEnvironment#execute.

2. @Timo What is the interaction between the Row setters from the
different modes? What happens if a user calls both kinds, in different
orders? E.g.

    row.setField(0, "ABC");
    row.setField("f0", "ABC"); // is this a valid call?

or

    row.setField("f0", "ABC");
    row.setField(0, "ABC"); // is this a valid call?

or

    row.setFieldNames(...);
    row.setField(0, "ABC"); // is this a valid call?

Best,

Dawid

On 01/09/2020 11:49, Timo Walther wrote:
> Hi Jark,
>
> thanks for the detailed review. Let me answer your concerns:
>
> ## Conversion of DataStream to Table
>
> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> leaf of a QueryOperation tree in the validation phase."
> I'm fine with allowing `system_proctime` everywhere in the query. Also
> for SQL, I think we should have done that earlier already, to give
> users the chance to use time-based operations also at later stages.
>
> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> assigned implicitly."
> Yes, we just use the DataStream API watermark. `system_rowtime()` will
> just introduce a time attribute; the watermark travels to the Table
> API and into the DataStream API without further code changes.
>
> ## Conversion of Table to DataStream
>
> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> DataStream<Row>"
> 4. "Table.execute(ChangelogMode)"
> Filtering UPDATE_BEFORE is already quite important, as it reduces the
> amount of data by a factor of 2. But I also understand your concerns
> about confusing users. I have also received requests for a
> `Table.getChangelogMode()` a couple of times in the past, because
> users would like to get information about the kind of query that is
> executed. However, in this case `toChangelogStream(Table)` is
> equivalent to calling `toChangelogStream(Table.getChangelogMode(),
> Table)`, so we don't need `Table.getChangelogMode()` in the current
> FLIP design. But this can be future work. Let's start with
> `toChangelogStream(Table)` and wait for more feedback about this new
> feature. What do others think?
>
> ## Conversion of StatementSet to DataStream API
>
> 5. "StreamStatementSet#attachToStream()"
>
> I think Godfrey's proposal is too complex for regular users. Instead
> of continuing with the fluent programming style, we would force users
> to define a DataStream pipeline inside a lambda.
>
> Furthermore, joining or using connect() with a different DataStream
> source would not be possible in this design.
>
> The `execute()` method of `StatementSet` should not execute the
> DataStream API subprogram. It mixes the concepts, because we tell
> users: "If you use `toDataStream`, you need to use
> `StreamExecutionEnvironment.execute()`."
>
> We don't solve every potential use case with the current FLIP design,
> but we do solve the most important one, where a pipeline just uses an
> INSERT INTO but also uses the Table API for connectors and
> preprocessing and does the main logic in the DataStream API:
>
> T1 -> T2, T3 -> DataStream, T4 -> DataStream
>
> I would consider `StatementSet.addDataStream(Table, ...)` future work
> for now, as it is only an optimization for reusing parts of the
> StreamGraph. We could even perform this optimization when calling
> `toInsertStream` or `toChangelogStream`.
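>
> To make that pattern concrete, a minimal sketch of such a mixed
> pipeline (`toInsertStream`, the named `getField` access, and the
> table/field names are only assumptions based on this FLIP, not final
> API):
>
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>
>     // connectors and preprocessing stay in the Table API
>     Table orders = tEnv.from("Orders").select($("user"), $("amount"));
>
>     // the main logic continues in the DataStream API
>     DataStream<Row> stream = tEnv.toInsertStream(orders);
>     stream
>         .map(r -> (Long) r.getField("amount"))
>         .print();
>
>     // a single execute() runs the whole mixed program
>     env.execute();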
>
> ## Improve dealing with Row in DataStream API
>
> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>
> We need a Map for a constant-time mapping from field name to index.
>
> We accept a nullable `fieldNames` because names are not mandatory; one
> can also work with indices as before.
>
> But you are right that the fieldNames member variable can be
> immutable. I just wanted to avoid too many overloaded constructors.
> I'm fine with having one full constructor for RowKind, arity, and
> field names (or null).
>
> 7. "a Row has two modes represented by an internal boolean flag
> `hasFieldOrder`."
> Maybe I leaked too many implementation details there that rather
> confuse readers than help. Internally, we need to distinguish between
> two kinds of rows. A user should not be bothered by this.
>
> a) Row comes from the Table API runtime: hasFieldOrder = true
>
>     Map("myAge" -> 0, "myName" -> 1)
>
>     row.getField("myName") == row.getField(1)
>     row.getField("myAge") == row.getField(0)
>
> b) Row comes from the user: hasFieldOrder = false
>
>     Row row = new Row(2);
>     row.setField("myName", "Alice");
>     row.setField("myAge", 32);
>
>     Map("myAge" -> 1, "myName" -> 0)
>
> But the type information will decide about the order of the fields
> later and reorder them accordingly during serialization or RowData
> conversion:
>
>     ["myName", "myAge"] vs. ["myAge", "myName"]
>
> The user does not need to care about this, as it always feels natural
> to deal with the rows.
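>
> A small sketch of case b), to show that the declared type, not the
> setter order, decides the physical layout (setters as proposed in
> this FLIP):
>
>     Row row = new Row(2);
>     row.setField("myAge", 32);       // age set first this time
>     row.setField("myName", "Alice");
>
>     // If the target type is ROW<myName STRING, myAge INT>, the
>     // serializer writes ["Alice", 32]; for ROW<myAge INT, myName
>     // STRING> it writes [32, "Alice"]. The order of the setField
>     // calls above has no influence.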
"Table.execute(ChangelogMode)" >> Ditto. >> >> ## Conversion of StatementSet to DataStream API >> >> 5. "StreamStatementSet#attachToStream()" >> I think the potential drawback is that it can't support multi-sink >> optimization, i.e. share pipeline. >> For example, if we have a Table `t1` (a heavy view uses join, >> aggregate), >> and want to sink to "mysql" using SQL and want to continue processing >> using >> DataStream in a job. >> It's a huge waste of resources if we re-compute `t1`. It would be >> nice if >> we can come up with a solution to share the pipeline. >> >> I borrowed Godfrey's idea in FLINK-18840 and added some >> modifications. What >> do you think about the following proposal? >> >> interface StatementSet { >> StatementSet addDataStream(Table table, TableDataStreamTransform >> transform); >> } >> >> interface TableDataStreamTransform { >> void transform(Context); >> >> interface Context { >> Table getTable(); >> DataStream<Row> toInsertStream(Table); >> DataStream<T> toInsertStream(AbstractDataType<?>, Table); >> DataStream<Row> toChangelogStream(Table); >> } >> } >> >> tEnv >> .createStatementSet() >> .addInsert("mysql", table1) >> .addDataStream(table1, ctx -> { >> ctx.toInsertStream(ctx.getTable()) >> .flatmap(..) >> .keyBy(..) >> .process(..) >> .addSink(...); >> }) >> >> >> ## Improve dealing with Row in DataStream API >> >> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)" >> - Maybe `List<String> fieldNames` or `String[] fieldNames` parameter is >> enough and more handy than Map ? >> - Currently, the fieldNames member variable is mutable, is it on >> purpose? >> Can we make it immutable? For example, only accept from the constructor. >> - Why do we accept a nullable `fieldNames`? >> >> 7. "a Row has two modes represented by an internal boolean flag >> `hasFieldOrder`." >> Sorry, I don't fully understand what does the `hasFieldOrder` mean >> and is >> used for. Could you explain a bit more for this? >> >> Best, >> Jark >> >> >> On Wed, 19 Aug 2020 at 17:38, Timo Walther <twal...@apache.org> wrote: >> >>> Hi David, >>> >>> thanks for your feedback. Feedback from someone who interacts with many >>> users is very valuable. I added an explanation for StatementSets to the >>> FLIP. >>> >>> Regarding watermarks and fromInsertStream, actually the >>> >>> `Schema.watermark("ts", system_watermark())` >>> >>> is not really necessary in the `fromChangelogStream`. It is added to >>> satify the Schema interface and be similar to SQL DDL. >>> >>> We could already extract the watermark strategy if we see >>> `system_rowtime()` because in most of the cases we will simply use the >>> DataStream API watermarks. >>> >>> But maybe some users want to generate watermarks after preprocessing in >>> DataStream API. In this cases users what to define a computed watermark >>> expression. >>> >>> So for simplicity in the Simple API we introduce: >>> >>> tEnv >>> .fromInsertStream(DataStream<T>) >>> .select('*, system_rowtime().as("rowtime"), >>> system_proctime().as("proctime")) >>> >>> and just rely on the watermarks that travel through DataStream API >>> already. I added another comment to the FLIP. >>> >>> Regards, >>> Timo >>> >>> >>> On 19.08.20 10:53, David Anderson wrote: >>>> Timo, nice to see this. >>>> >>>> As someone who expects to use these interfaces, but who doesn't fully >>>> understand the existing Table API, I like what I see. 
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 19.08.20 10:53, David Anderson wrote:
>>>> Timo, nice to see this.
>>>>
>>>> As someone who expects to use these interfaces, but who doesn't
>>>> fully understand the existing Table API, I like what I see. Just a
>>>> couple of comments:
>>>>
>>>> The way that watermarks fit into the fromChangelogStream case makes
>>>> sense to me, and I'm wondering why watermarks don't come up in the
>>>> previous section about fromInsertStream.
>>>>
>>>> I wasn't familiar with StatementSets, and I couldn't find an
>>>> explanation in the docs. I eventually found this short paragraph in
>>>> an email from Fabian Hueske, which clarified everything in that
>>>> section for me:
>>>>
>>>>     FLIP-84 [1] added the concept of a "statement set" to group
>>>>     multiple INSERT INTO statements (SQL or Table API) together.
>>>>     The statements in a statement set are jointly optimized and
>>>>     executed as a single Flink job.
>>>>
>>>> Maybe if you add this to the FLIP it will help other readers as
>>>> well.
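>>>>
>>>> For other readers who hadn't seen it either: in code, a statement
>>>> set looks roughly like this (existing FLIP-84 API; the table names
>>>> are invented):
>>>>
>>>>     StatementSet set = tEnv.createStatementSet();
>>>>     set.addInsertSql("INSERT INTO sink1 SELECT * FROM source1");
>>>>     set.addInsertSql("INSERT INTO sink2 SELECT * FROM source1");
>>>>     // both INSERT INTOs are optimized jointly, so common parts of
>>>>     // the pipeline can be shared, and submitted as one Flink job
>>>>     set.execute();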
>>>>
>>>> Best,
>>>> David
>>>>
>>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <twal...@apache.org>
>>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I would like to propose a FLIP that aims to resolve the remaining
>>>>> shortcomings in the Table API:
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>>>
>>>>> The Table API has received many new features over the last year. It
>>>>> supports a new type system (FLIP-37), connectors support changelogs
>>>>> (FLIP-95), we have well-defined internal data structures (FLIP-95),
>>>>> support for result retrieval in an interactive fashion (FLIP-84),
>>>>> and soon new TableDescriptors (FLIP-129).
>>>>>
>>>>> However, the interfaces from and to the DataStream API have not
>>>>> been touched during the introduction of these new features and are
>>>>> kind of outdated. The interfaces lack important functionality that
>>>>> is available in the Table API but not exposed to DataStream API
>>>>> users. The DataStream API is still our most important API, which is
>>>>> why good interoperability is crucial.
>>>>>
>>>>> This FLIP is a mixture of different topics that improve the
>>>>> interoperability between DataStream and Table API in terms of:
>>>>>
>>>>> - DataStream <-> Table conversion
>>>>> - translation of type systems TypeInformation <-> DataType
>>>>> - schema definition (incl. rowtime, watermarks, primary key)
>>>>> - changelog handling
>>>>> - row handling in DataStream API
>>>>>
>>>>> I'm looking forward to your feedback.
>>>>>
>>>>> Regards,
>>>>> Timo