Hi all,

I really like the ideas of this FLIP. I think it improves user
experience quite a bit. I wanted to add just two comments:

1. As for the StatementSet, I like the approach described in the FLIP for
its simplicity. Moreover, the way I see it, if a user wants to
work with DataStream, then he/she wants to end up in the DataStream API,
or in other words call StreamExecutionEnvironment#execute.

2. @Timo What is the interaction between the Row setters from the
different modes? What happens if the user calls both, in either order? E.g.

row.setField(0, "ABC");

row.setField("f0", "ABC"); // is this a valid call ?

or

row.setField("f0", "ABC");

row.setField(0, "ABC"); // is this a valid call?

or

row.setFieldNames(...);

row.setField(0, "ABC"); // is this a valid call?

Best,

Dawid

On 01/09/2020 11:49, Timo Walther wrote:
> Hi Jark,
>
> thanks for the detailed review. Let me answer your concerns:
>
> ## Conversion of DataStream to Table
>
> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
> leaf of a QueryOperation tree in the validation phase."
> I'm fine with allowing `system_proctime` everywhere in the query. Also
> for SQL, I think we should have done that earlier already to give
> users the chance to have time-based operations also at later stages.
>
> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
> assigned implicitly. "
> Yes, we just use the DataStream API watermark. `system_rowtime()` will
> just introduce a time attribute, the watermark travels to the Table
> API and into DataStream API without further code changes.
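>
> A rough sketch of what I mean (`fromInsertStream` and
> `system_rowtime()` are the proposed names from the FLIP; the `User`
> type and its source are hypothetical):
>
> DataStream<User> stream = env
>     .addSource(new UserSource())
>     .assignTimestampsAndWatermarks(
>         WatermarkStrategy
>             .<User>forBoundedOutOfOrderness(Duration.ofSeconds(5))
>             .withTimestampAssigner((u, ts) -> u.eventTimeMillis));
>
> // the watermarks assigned above simply travel into the Table API;
> // system_rowtime() only exposes the StreamRecord timestamp as a
> // rowtime attribute
> Table table = tEnv
>     .fromInsertStream(stream)
>     .select('*, system_rowtime().as("rowtime"));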
>
> ## Conversion of Table to DataStream
>
> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
> DataStream<Row>"
> 4. "Table.execute(ChangelogMode)"
> Filtering UPDATE_BEFORE is already quite important as it reduces the
> amount of data by a factor of 2. But I also understand your concerns
> regarding confusing users. I also got the request for a
> `Table.getChangelogMode()` a couple of times in the past, because
> users would like to get information about the kind of query that is
> executed. However, in this case `toChangelogStream(Table)` is
> equivalent to calling `toChangelogStream(Table.getChangelogMode(),
> Table)`, so we don't need `Table.getChangelogMode()` in the current
> FLIP design. But this can be future work. Let's start with
> `toChangelogStream(Table)` and wait for more feedback about this new
> feature. What do others think?
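>
> For illustration, a sketch of the two overloads (given some Table
> `table`; ChangelogMode and RowKind already exist, while
> toChangelogStream is what the FLIP proposes):
>
> // emits all changes produced by the query, incl. UPDATE_BEFORE
> DataStream<Row> all = tEnv.toChangelogStream(table);
>
> // explicitly requests an upsert-style stream without UPDATE_BEFORE
> ChangelogMode upsert = ChangelogMode.newBuilder()
>     .addContainedKind(RowKind.INSERT)
>     .addContainedKind(RowKind.UPDATE_AFTER)
>     .addContainedKind(RowKind.DELETE)
>     .build();
> DataStream<Row> filtered = tEnv.toChangelogStream(upsert, table);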
>
> ## Conversion of StatementSet to DataStream API
>
> 5. "StreamStatementSet#attachToStream()"
>
> I think Godfrey's proposal is too complex for regular users. Instead
> of continuing with the fluent programming style, we would force users to
> define a DataStream pipeline in a lambda.
>
> Furthermore, joining or using connect() with a different DataStream
> source would not be possible in this design.
>
> The `execute()` method of `StatementSet` should not execute the
> DataStream API subprogram. It would mix the concepts, because we tell
> users: "If you use `toDataStream`, you need to use
> `StreamExecutionEnvironment.execute()`."
>
> We don't solve every potential use case with the current FLIP design
> but the most important one where a pipeline just uses an INSERT INTO
> but also uses Table API for connectors and preprocessing and does the
> main logic in DataStream API:
>
> T1 -> T2, T3 -> DataStream, T4 -> DataStream
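>
> Sketched with the proposed API (table names and sinks are
> hypothetical placeholders; toInsertStream/toChangelogStream are the
> FLIP's proposals):
>
> // T1 -> T2: stays in Table API, executed via the statement set
> StatementSet set = tEnv.createStatementSet();
> set.addInsert("T2", tEnv.from("T1"));
> set.execute();
>
> // T3, T4 -> DataStream: main logic continues in DataStream API
> DataStream<Row> s3 = tEnv.toInsertStream(tEnv.from("T3"));
> DataStream<Row> s4 = tEnv.toChangelogStream(tEnv.from("T4"));
> s3.union(s4).keyBy(r -> r.getField(0)).print();
> env.execute();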
>
> I would consider `StatementSet.addDataStream(Table, ...)` future work
> for now as it is only an optimization for reusing parts of the
> StreamGraph. We could even perform this optimization when calling
> `toInsertStream` or `toChangelogStream`.
>
> ## Improve dealing with Row in DataStream API
>
> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>
> We need a Map for constant-time mapping of field name to index.
>
> We accept a nullable `fieldNames` because names are not mandatory; one
> can also work with indices as before.
>
> But you are right that the fieldNames member variable can be
> immutable. I just wanted to avoid too many overloaded constructors.
> I'm fine with having one full constructor for RowKind, arity and field
> names (or null).
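>
> I.e. something like the following (a sketch, not a final signature):
>
> public Row(RowKind kind, int arity, @Nullable Map<String, Integer> fieldNames) { ... }
>
> where passing a null `fieldNames` behaves like today's index-only Row.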
>
> 7. "a Row has two modes represented by an internal boolean flag
> `hasFieldOrder`."
> Maybe I leaked too many implementation details there that confuse
> readers rather than help. Internally, we need to distinguish between
> two kinds of rows. A user should not be bothered by this.
>
> a) Row comes from Table API runtime: hasFieldOrder = true
> Map("myAge" -> 0, "myName" -> 1)
>
> row.getField("myName") == row.getField(1)
> row.getField("myAge") == row.getField(0)
>
> b) Row comes from user: hasFieldOrder = false
> Row row = new Row(2);
> row.setField("myName", "Alice");
> row.setField("myAge", 32);
>
> Map("myAge" -> 1, "myName" -> 0)
>
> But the type information will decide about the order of the fields
> later and reorder them accordingly during serialization or RowData
> conversion:
>
> ["myName", "myAge"] vs. ["myAge", "myName"]
>
> The user should not need to care about this, as dealing with the rows
> always feels natural.
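>
> Spelled out with the proposed name-based setters (hypothetical until
> the FLIP is implemented):
>
> Row row = new Row(2);              // hasFieldOrder = false
> row.setField("myName", "Alice");   // gets ad-hoc index 0
> row.setField("myAge", 32);         // gets ad-hoc index 1
>
> // if the declared type is ROW<myAge INT, myName STRING>, the
> // serializer reorders by name, so "myAge" ends up at position 0
> // regardless of the insertion order above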
>
> Regards,
> Timo
>
>
> On 01.09.20 06:19, Jark Wu wrote:
>> Hi Timo,
>>
>> Thanks a lot for the great proposal and sorry for the late reply. This
>> is an important improvement for DataStream and Table API users.
>>
>> I have listed my thoughts and questions below ;-)
>>
>> ## Conversion of DataStream to Table
>>
>> 1. "We limit the usage of `system_rowtime()/system_proctime` to the
>> leaf of
>> a QueryOperation tree in the validation phase."
>> IIUC, that means `system_rowtime()` can only be used in the first
>> `select()` after `fromXxxStream()`, right?
>> However, I think `system_proctime()` shouldn't have this limitation,
>> because it doesn't rely on the underlying timestamp of StreamRecord and
>> can be generated in any stage of the query.
>>
>> 2. "By using `system_rowtime().as("rowtime")` the watermark would be
>> assigned implicitly. "
>> What watermark will be used here? Is it the pre-assigned watermark in the
>> DataStream (the so-called `system_watermark()`)?
>>
>> ## Conversion of Table to DataStream
>>
>> 3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
>> DataStream<Row>"
>> I'm not sure whether this method is useful for users. Currently, the
>> `DynamicTableSink#getChangelogMode(changelogMode)` is only used for
>> filtering UPDATE_BEFORE if possible.
>> However, if we expose this method to users, it may be confusing.
>> Users may
>> try to use this method to convert a changelog stream to an insert-only
>> stream by applying ChangelogMode.insertOnly(). This might be misleading.
>> What's more, it's cumbersome if users don't want UPDATE_BEFORE. They
>> have
>> to know the ChangelogMode of the current Table first, and remove
>> UPDATE_BEFORE from the ChangelogMode.
>> That means we have to support `Table.getChangelogMode()` first? But
>> `ChangelogMode` derivation requires a full optimization path on the
>> Table,
>> which seems impossible now.
>> Therefore, IMHO, we can introduce this interface in the future if users
>> indeed need this. For most users, I think `toChangelogStream(Table)` is
>> enough.
>>
>> 4. "Table.execute(ChangelogMode)"
>> Ditto.
>>
>> ## Conversion of StatementSet to DataStream API
>>
>> 5. "StreamStatementSet#attachToStream()"
>> I think the potential drawback is that it can't support multi-sink
>> optimization, i.e. share pipeline.
>> For example, if we have a Table `t1` (a heavy view using joins and
>> aggregates), and we want to sink it to "mysql" using SQL while also
>> continuing to process it with DataStream in the same job.
>> It's a huge waste of resources if we re-compute `t1`. It would be nice
>> if we could come up with a solution to share the pipeline.
>>
>> I borrowed Godfrey's idea in FLINK-18840 and added some
>> modifications. What
>> do you think about the following proposal?
>>
>> interface StatementSet {
>>     StatementSet addDataStream(Table table, TableDataStreamTransform
>> transform);
>> }
>>
>> interface TableDataStreamTransform {
>>     void transform(Context context);
>>
>>     interface Context {
>>         Table getTable();
>>         DataStream<Row> toInsertStream(Table);
>>         <T> DataStream<T> toInsertStream(AbstractDataType<?>, Table);
>>         DataStream<Row> toChangelogStream(Table);
>>     }
>> }
>>
>> tEnv
>>    .createStatementSet()
>>    .addInsert("mysql", table1)
>>    .addDataStream(table1, ctx -> {
>>        ctx.toInsertStream(ctx.getTable())
>>          .flatMap(..)
>>          .keyBy(..)
>>          .process(..)
>>          .addSink(...);
>>    });
>>
>>
>> ## Improve dealing with Row in DataStream API
>>
>> 6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
>> - Maybe a `List<String> fieldNames` or `String[] fieldNames` parameter is
>> enough and more handy than a Map?
>> - Currently, the fieldNames member variable is mutable; is that on
>> purpose? Can we make it immutable? For example, only accept it from the
>> constructor.
>> - Why do we accept a nullable `fieldNames`?
>>
>> 7. "a Row has two modes represented by an internal boolean flag
>> `hasFieldOrder`."
>> Sorry, I don't fully understand what the `hasFieldOrder` flag means and
>> what it is used for. Could you explain this a bit more?
>>
>> Best,
>> Jark
>>
>>
>> On Wed, 19 Aug 2020 at 17:38, Timo Walther <twal...@apache.org> wrote:
>>
>>> Hi David,
>>>
>>> thanks for your feedback. Feedback from someone who interacts with many
>>> users is very valuable. I added an explanation for StatementSets to the
>>> FLIP.
>>>
>>> Regarding watermarks and fromInsertStream, actually the
>>>
>>> `Schema.watermark("ts", system_watermark())`
>>>
>>> is not really necessary in the `fromChangelogStream`. It is added to
>>> satisfy the Schema interface and be similar to SQL DDL.
>>>
>>> We could already extract the watermark strategy if we see
>>> `system_rowtime()` because in most cases we will simply use the
>>> DataStream API watermarks.
>>>
>>> But maybe some users want to generate watermarks after preprocessing in
>>> DataStream API. In these cases, users want to define a computed watermark
>>> expression.
>>>
>>> So for simplicity in the Simple API we introduce:
>>>
>>> tEnv
>>>     .fromInsertStream(DataStream<T>)
>>>     .select('*, system_rowtime().as("rowtime"),
>>> system_proctime().as("proctime"))
>>>
>>> and just rely on the watermarks that travel through DataStream API
>>> already. I added another comment to the FLIP.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 19.08.20 10:53, David Anderson wrote:
>>>> Timo, nice to see this.
>>>>
>>>> As someone who expects to use these interfaces, but who doesn't fully
>>>> understand the existing Table API, I like what I see. Just a couple of
>>>> comments:
>>>>
>>>> The way that watermarks fit into the fromChangelogStream case makes
>>>> sense
>>>> to me, and I'm wondering why watermarks don't come up in the previous
>>>> section about fromInsertStream.
>>>>
>>>> I wasn't familiar with StatementSets, and I couldn't find an
>>>> explanation
>>> in
>>>> the docs. I eventually found this short paragraph in an email from
>>>> Fabian
>>>> Hueske, which clarified everything in that section for me:
>>>>
>>>>       FLIP-84 [1] added the concept of a "statement set" to group
>>>>       multiple INSERT INTO statements (SQL or Table API) together. The
>>>>       statements in a statement set are jointly optimized and executed
>>>>       as a single Flink job.
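>>>>
>>>> In code, that is roughly (FLIP-84 API, with placeholder SQL):
>>>>
>>>> StatementSet set = tEnv.createStatementSet();
>>>> set.addInsertSql("INSERT INTO sink1 SELECT ...");
>>>> set.addInsertSql("INSERT INTO sink2 SELECT ...");
>>>> set.execute(); // one jointly optimized Flink job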
>>>>
>>>> Maybe if you add this to the FLIP it will help other readers as well.
>>>>
>>>> Best,
>>>> David
>>>>
>>>> On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <twal...@apache.org>
>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I would like to propose a FLIP that aims to resolve the remaining
>>>>> shortcomings in the Table API:
>>>>>
>>>>>
>>>>>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
>>>
>>>>>
>>>>> The Table API has received many new features over the last year. It
>>>>> supports a new type system (FLIP-37), connectors support changelogs
>>>>> (FLIP-95), we have well defined internal data structures (FLIP-95),
>>>>> support for result retrieval in an interactive fashion (FLIP-84), and
>>>>> soon new TableDescriptors (FLIP-129).
>>>>>
>>>>> However, the interfaces from and to DataStream API have not been
>>>>> touched
>>>>> during the introduction of these new features and are kind of
>>>>> outdated.
>>>>> The interfaces lack important functionality that is available in
>>>>> Table
>>>>> API but not exposed to DataStream API users. DataStream API is
>>>>> still our
>>>>> most important API, which is why good interoperability is crucial.
>>>>>
>>>>> This FLIP is a mixture of different topics that improve the
>>>>> interoperability between DataStream and Table API in terms of:
>>>>>
>>>>> - DataStream <-> Table conversion
>>>>> - translation of type systems TypeInformation <-> DataType
>>>>> - schema definition (incl. rowtime, watermarks, primary key)
>>>>> - changelog handling
>>>>> - row handling in DataStream API
>>>>>
>>>>> I'm looking forward to your feedback.
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>
>>>
>>>
>>
>
