Hi Timo,

Thanks for the quick response.

5. "StreamStatementSet#attachToStream()"
Joining or using connect() with a different DataStream is a good case.
cc @Godfrey , what do you think about the `attachToStream()` API?

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> We need a Map for constant time of mapping field name to index.
But we can easily build the Map from the List<String> fieldNames in Row
constructor.
IMO, manually building the Map and mapping names to indices is verbose and
error-prone.
Are you concerned about the per-record performance?
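
To make the suggestion in point 6 concrete, here is a minimal sketch of deriving the name-to-index Map once from a list of field names. `FieldNameIndex` and `fromNames` are hypothetical names used only for illustration; they are not part of Flink or of the FLIP, and the duplicate-name check is an assumption about the desired behavior.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration; not part of Flink or the FLIP.
final class FieldNameIndex {
    private FieldNameIndex() {}

    // Builds the name -> position map once (e.g. inside a Row constructor),
    // so that later lookups by name stay constant time.
    static Map<String, Integer> fromNames(List<String> fieldNames) {
        Map<String, Integer> index = new LinkedHashMap<>();
        for (int pos = 0; pos < fieldNames.size(); pos++) {
            String name = fieldNames.get(pos);
            // Assumption: duplicate names are rejected rather than overwritten.
            if (index.putIfAbsent(name, pos) != null) {
                throw new IllegalArgumentException("Duplicate field name: " + name);
            }
        }
        return Collections.unmodifiableMap(index);
    }
}
```

The map is built once per constructor call in this sketch, so whether that cost matters per record depends on how often rows are constructed with names.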

7. "a Row has two modes represented by an internal boolean flag
`hasFieldOrder`."
Thanks for the explanation.
Regarding case (b), I share Dawid's confusion: what is the result when
index-based setters and name-based setters are mixed
(esp. in foreach loops and if branches)?
TBH, I don't see a strong need for named setters. Using a Row as a UDAF
accumulator is not as good as a POJO in terms of performance and ease of use.
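
The mixed-setter question can be illustrated with a toy class. `SketchRow` below is hypothetical and is NOT Flink's `Row`. It assumes that name-based setters hand out positions in first-use order, as in Timo's case (b); the FLIP may define this differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy stand-in for a name-aware row; NOT Flink's Row and not the FLIP's design.
final class SketchRow {
    private final Object[] fields;
    private final Map<String, Integer> positions = new LinkedHashMap<>();

    SketchRow(int arity) {
        this.fields = new Object[arity];
    }

    // Index-based setter: writes directly to the given position.
    void setField(int pos, Object value) {
        fields[pos] = value;
    }

    // Name-based setter: assumed to assign positions in first-use order.
    void setField(String name, Object value) {
        Integer pos = positions.get(name);
        if (pos == null) {
            pos = positions.size();
            positions.put(name, pos);
        }
        fields[pos] = value;
    }

    Object getField(int pos) {
        return fields[pos];
    }
}
```

Under that assumption, calling `row.setField(0, "by-index")` followed by `row.setField("myName", "Alice")` on a `new SketchRow(2)` leaves "Alice" in position 0, so the index-based write is silently overwritten. This is the kind of ambiguity the question above is about.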

Best,
Jark

On Tue, 1 Sep 2020 at 20:28, Dawid Wysakowicz <dwysakow...@apache.org>
wrote:

> Hi all,
>
> I really like the ideas of this FLIP. I think it improves user
> experience quite a bit. I wanted to add just two comments:
>
> 1. As for the StatementSet I like the approach described in the FLIP for
> its simplicity. Moreover the way I see it is that if a user wants to
> work with DataStream, then he/she wants to end up in the DataStream API,
> or in other words call the StreamExecutionEnvironment#execute.
>
> 2. @Timo What is the interaction between Row setters from the different
> modes? What happens if the user calls both in a different order? E.g.
>
> row.setField(0, "ABC");
>
> row.setField("f0", "ABC"); // is this a valid call ?
>
> or
>
> row.setField("f0", "ABC");
>
> row.setField(0, "ABC"); // is this a valid call ?
>
> or
>
> row.setFieldNames(...);
>
> row.setField(0, "ABC"); // is this a valid call ?
>
> Best,
>
> Dawid
>
> On 01/09/2020 11:49, Timo Walther wrote:
> > Hi Jark,
> >
> > thanks for the detailed review. Let me answer your concerns:
> >
> > ## Conversion of DataStream to Table
> >
> > 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> > leaf of a QueryOperation tree in the validation phase."
> > I'm fine with allowing `system_proctime` everywhere in the query. Also
> > for SQL, I think we should have done that earlier already to give
> > users the chance to have time based operations also at later stages.
> >
> > 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> > assigned implicitly. "
> > Yes, we just use the DataStream API watermark. `system_rowtime()` will
> > just introduce a time attribute, the watermark travels to the Table
> > API and into DataStream API without further code changes.
> >
> > ## Conversion of Table to DataStream
> >
> > 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> > DataStream<Row>"
> > 4. "Table.execute(ChangelogMode)"
> > Filtering UPDATE_BEFORE is already quite important as it reduces the
> > amount of data by factor 2. But I also understand your concerns
> > regarding confusing users. I also got the request for a
> > `Table.getChangelogMode()` a couple of times in the past, because
> > users would like to get information about the kind of query that is
> > executed. However, in this case `toChangelogStream(Table)` is
> > equivalent to calling `toChangelogStream(Table.getChangelogMode(),
> > Table)` so we don't need `Table.getChangelogMode()` in the current
> > FLIP design. But this can be future work. Let's start with
> > `toChangelogStream(Table)` and wait for more feedback about this new
> > feature. What do others think?
> >
> > ## Conversion of StatementSet to DataStream API
> >
> > 5. "StreamStatementSet#attachToStream()"
> >
> > I think Godfrey's proposal is too complex for regular users. Instead
> > of continuing with the fluent programming, we would force users to
> > define a DataStream pipeline in a lambda.
> >
> > Furthermore, joining or using connect() with a different DataStream
> > source would not be possible in this design.
> >
> > The `execute()` method of `StatementSet` should not execute the
> > DataStream API subprogram. It mixes the concepts because we tell
> > users: "If you use `toDataStream`, you need to use
> > `StreamExecutionEnvironment.execute()`."
> >
> > We don't solve every potential use case with the current FLIP design
> > but the most important one where a pipeline just uses an INSERT INTO
> > but also uses Table API for connectors and preprocessing and does the
> > main logic in DataStream API:
> >
> > T1 -> T2, T3 -> DataStream, T4 -> DataStream
> >
> > I would consider `StatementSet.addDataStream(Table, ...)` future work
> > for now as it is only an optimization for reusing parts of the
> > StreamGraph. We could even perform this optimization when calling
> > `toInsertStream` or `toChangelogStream`.
> >
> > ## Improve dealing with Row in DataStream API
> >
> > 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >
> > We need a Map for constant time of mapping field name to index.
> >
> > We accept a nullable `fieldNames` because names are not mandatory, one
> > can also work with indices as before.
> >
> > But you are right that the fieldNames member variable can be
> > immutable. I just wanted to avoid too many overloaded constructors.
> > I'm fine with having one full constructor for RowKind, arity and field
> > names (or null).
> >
> > 7. "a Row has two modes represented by an internal boolean flag
> > `hasFieldOrder`."
> > Maybe I leaked too many implementation details there that rather
> > confuse readers than help. Internally, we need to distinguish between
> > two kinds of rows. A user should not be bothered by this.
> >
> > a) Row comes from Table API runtime: hasFieldOrder = true
> > Map("myAge" -> 0, "myName" -> 1)
> >
> > row.getField("myName") == row.getField(1)
> > row.getField("myAge") == row.getField(0)
> >
> > b) Row comes from user: hasFieldOrder = false
> > Row row = new Row(2);
> > row.setField("myName", "Alice");
> > row.setField("myAge", 32);
> >
> > Map("myAge" -> 1, "myName" -> 0)
> >
> > But the type information will decide about the order of the fields
> > later and reorder them accordingly during serialization or RowData
> > conversion:
> >
> > ["myName", "myAge"] vs. ["myAge", "myName"]
> >
> > The user does not need to care about this, as dealing with the rows
> > always feels natural.
> >
> > Regards,
> > Timo
> >
> >
> > On 01.09.20 06:19, Jark Wu wrote:
> >> Hi Timo,
> >>
> >> Thanks a lot for the great proposal and sorry for the late reply.
> >> This is
> >> an important improvement for DataStream and Table API users.
> >>
> >> I have listed my thoughts and questions below ;-)
> >>
> >> ## Conversion of DataStream to Table
> >>
> >> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> >> leaf of
> >> a QueryOperation tree in the validation phase."
> >> IIUC, that means `system_rowtime()` can only be used in the first
> >> `select()` after `fromXxxStream()`, right?
> >> However, I think `system_proctime()` shouldn't have this limitation,
> >> because it doesn't rely on the underlying timestamp of StreamRecord and
> >>   can be generated in any stage of the query.
> >>
> >> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> >> assigned implicitly. "
> >> What watermark will be used here? Is the pre-assigned watermark in the
> >> DataStream (so called `system_watermark()`)?
> >>
> >> ## Conversion of Table to DataStream
> >>
> >> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> >> DataStream<Row>"
> >> I'm not sure whether this method is useful for users. Currently, the
> >> `DynamicTableSink#getChangelogMode(changelogMode)` is only used for
> >> filtering UPDATE_BEFORE if possible.
> >> However, if we expose this method to users, it may be confusing.
> >> Users may
> >> try to use this method to convert a changelog stream to an insert-only
> >> stream by applying ChangelogMode.insertOnly(). This might be misleading.
> >> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They
> >> have
> >> to know the ChangelogMode of the current Table first, and remove
> >> UPDATE_BEFORE from the ChangelogMode.
> >> That means we have to support `Table.getChangelogMode()` first? But
> >> `ChangelogMode` derivation requires a full optimization path on the
> >> Table,
> >> which seems impossible now.
> >> Therefore, IMHO, we can introduce this interface in the future if users
> >> indeed need this. For most users, I think `toChangelogStream(Table)` is
> >> enough.
> >>
> >> 4. "Table.execute(ChangelogMode)"
> >> Ditto.
> >>
> >> ## Conversion of StatementSet to DataStream API
> >>
> >> 5. "StreamStatementSet#attachToStream()"
> >> I think the potential drawback is that it can't support multi-sink
> >> optimization, i.e. share pipeline.
> >> For example, if we have a Table `t1` (a heavy view uses join,
> >> aggregate),
> >> and want to sink to "mysql" using SQL and want to continue processing
> >> using
> >> DataStream in a job.
> >> It's a huge waste of resources if we re-compute `t1`. It would be
> >> nice if
> >> we can come up with a solution to share the pipeline.
> >>
> >> I borrowed Godfrey's idea in FLINK-18840 and added some
> >> modifications. What
> >> do you think about the following proposal?
> >>
> >> interface StatementSet {
> >>     StatementSet addDataStream(Table table, TableDataStreamTransform transform);
> >> }
> >>
> >> interface TableDataStreamTransform {
> >>     void transform(Context);
> >>
> >>     interface Context {
> >>         Table getTable();
> >>         DataStream<Row> toInsertStream(Table);
> >>         DataStream<T> toInsertStream(AbstractDataType<?>, Table);
> >>         DataStream<Row> toChangelogStream(Table);
> >>     }
> >> }
> >>
> >> tEnv
> >>    .createStatementSet()
> >>    .addInsert("mysql", table1)
> >>    .addDataStream(table1, ctx -> {
> >>        ctx.toInsertStream(ctx.getTable())
> >>          .flatMap(..)
> >>          .keyBy(..)
> >>          .process(..)
> >>          .addSink(...);
> >>    })
> >>
> >>
> >> ## Improve dealing with Row in DataStream API
> >>
> >> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
> >> - Maybe `List<String> fieldNames` or `String[] fieldNames` parameter is
> >> enough and more handy than Map ?
> >> - Currently, the fieldNames member variable is mutable, is it on
> >> purpose?
> >> Can we make it immutable? For example, only accept from the constructor.
> >> - Why do we accept a nullable `fieldNames`?
> >>
> >> 7. "a Row has two modes represented by an internal boolean flag
> >> `hasFieldOrder`."
> >> Sorry, I don't fully understand what `hasFieldOrder` means and what it is
> >> used for. Could you explain this a bit more?
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Wed, 19 Aug 2020 at 17:38, Timo Walther <twal...@apache.org> wrote:
> >>
> >>> Hi David,
> >>>
> >>> thanks for your feedback. Feedback from someone who interacts with many
> >>> users is very valuable. I added an explanation for StatementSets to the
> >>> FLIP.
> >>>
> >>> Regarding watermarks and fromInsertStream, actually the
> >>>
> >>> `Schema.watermark("ts", system_watermark())`
> >>>
> >>> is not really necessary in the `fromChangelogStream`. It is added to
> >>> satisfy the Schema interface and be similar to SQL DDL.
> >>>
> >>> We could already extract the watermark strategy if we see
> >>> `system_rowtime()` because in most of the cases we will simply use the
> >>> DataStream API watermarks.
> >>>
> >>> But maybe some users want to generate watermarks after preprocessing in
> >>> DataStream API. In these cases users want to define a computed watermark
> >>> expression.
> >>>
> >>> So for simplicity in the Simple API we introduce:
> >>>
> >>> tEnv
> >>>     .fromInsertStream(DataStream<T>)
> >>>     .select('*, system_rowtime().as("rowtime"),
> >>> system_proctime().as("proctime"))
> >>>
> >>> and just rely on the watermarks that travel through DataStream API
> >>> already. I added another comment to the FLIP.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 19.08.20 10:53, David Anderson wrote:
> >>>> Timo, nice to see this.
> >>>>
> >>>> As someone who expects to use these interfaces, but who doesn't fully
> >>>> understand the existing Table API, I like what I see. Just a couple of
> >>>> comments:
> >>>>
> >>>> The way that watermarks fit into the fromChangelogStream case makes
> >>>> sense
> >>>> to me, and I'm wondering why watermarks don't come up in the previous
> >>>> section about fromInsertStream.
> >>>>
> >>>> I wasn't familiar with StatementSets, and I couldn't find an explanation in
> >>>> the docs. I eventually found this short paragraph in an email from Fabian
> >>>> Hueske, which clarified everything in that section for me:
> >>>>
> >>>>       FLIP-84 [1] added the concept of a "statement set" to group multiple INSERT
> >>>>       INTO statements (SQL or Table API) together. The statements in a statement
> >>>>       set are jointly optimized and executed as a single Flink job.
> >>>>
> >>>> Maybe if you add this to the FLIP it will help other readers as well.
> >>>>
> >>>> Best,
> >>>> David
> >>>>
> >>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <twal...@apache.org>
> >>> wrote:
> >>>>
> >>>>> Hi everyone,
> >>>>>
> >>>>> I would like to propose a FLIP that aims to resolve the remaining
> >>>>> shortcomings in the Table API:
> >>>>>
> >>>>>
> >>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>>>>
> >>>>> The Table API has received many new features over the last year. It
> >>>>> supports a new type system (FLIP-37), connectors support changelogs
> >>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
> >>>>> support for result retrieval in an interactive fashion (FLIP-84), and
> >>>>> soon new TableDescriptors (FLIP-129).
> >>>>>
> >>>>> However, the interfaces from and to DataStream API have not been
> >>>>> touched
> >>>>> during the introduction of these new features and are kind of
> >>>>> outdated.
> >>>>> The interfaces lack important functionality that is available in
> >>>>> Table
> >>>>> API but not exposed to DataStream API users. DataStream API is
> >>>>> still our
> >>>>> most important API which is why a good interoperability is crucial.
> >>>>>
> >>>>> This FLIP is a mixture of different topics that improve the
> >>>>> interoperability between DataStream and Table API in terms of:
> >>>>>
> >>>>> - DataStream <-> Table conversion
> >>>>> - translation of type systems TypeInformation <-> DataType
> >>>>> - schema definition (incl. rowtime, watermarks, primary key)
> >>>>> - changelog handling
> >>>>> - row handling in DataStream API
> >>>>>
> >>>>> I'm looking forward to your feedback.
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>
