Hi Timo,

Thanks a lot for the great proposal and sorry for the late reply. This is
an important improvement for DataStream and Table API users.

I have listed my thoughts and questions below ;-)

## Conversion of DataStream to Table

1. "We limit the usage of `system_rowtime()/system_proctime` to the leaf of
a QueryOperation tree in the validation phase."
IIUC, that means `system_rowtime()` can only be used in the first
`select()` after `fromXxxStream()`, right?
However, I think `system_proctime()` shouldn't have this limitation,
because it doesn't rely on the underlying timestamp of StreamRecord and
can be generated in any stage of the query.

2. "By using `system_rowtime().as("rowtime")` the watermark would be
assigned implicitly. "
What watermark will be used here? Is it the watermark pre-assigned in the
DataStream (the so-called `system_watermark()`)?

## Conversion of Table to DataStream

3. "StreamTableEnvironment.toChangelogStream(ChangelogMode, Table):
DataStream<Row>"
I'm not sure whether this method is useful for users. Currently,
`DynamicTableSink#getChangelogMode(changelogMode)` is only used for
filtering out UPDATE_BEFORE if possible.
However, if we expose this method to users, it may be confusing. Users may
try to use this method to convert a changelog stream to an insert-only
stream by applying ChangelogMode.insertOnly(). This might be misleading.
What's more, it's cumbersome if users don't want UPDATE_BEFORE. They have
to know the ChangelogMode of the current Table first, and then remove
UPDATE_BEFORE from that ChangelogMode.
That means we have to support `Table.getChangelogMode()` first? But
`ChangelogMode` derivation requires a full optimization path on the Table,
which seems impossible now.
Therefore, IMHO, we can introduce this interface in the future if users
indeed need this. For most users, I think `toChangelogStream(Table)` is
enough.
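
To illustrate why I find this cumbersome, here is a minimal sketch of what a
user would have to do by hand. It does not use the Flink classes: `RowKind`
below is a local stand-in enum and `withoutUpdateBefore` is a hypothetical
helper, so please read it as an illustration only. The input set is exactly
the derived mode that, as far as I can see, we cannot provide today.

```java
import java.util.EnumSet;
import java.util.Set;

public class ChangelogModeSketch {

    // Local stand-in for Flink's RowKind so the sketch compiles without Flink.
    public enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    // Hypothetical helper: copy the Table's derived mode and drop UPDATE_BEFORE.
    // The caller must already know the derived mode of the Table.
    public static Set<RowKind> withoutUpdateBefore(Set<RowKind> derivedMode) {
        EnumSet<RowKind> requested = EnumSet.noneOf(RowKind.class);
        requested.addAll(derivedMode);
        requested.remove(RowKind.UPDATE_BEFORE);
        return requested;
    }

    public static void main(String[] args) {
        // Assume the Table produces a full changelog.
        Set<RowKind> derived = EnumSet.allOf(RowKind.class);
        System.out.println(withoutUpdateBefore(derived));
    }
}
```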

4. "Table.execute(ChangelogMode)"
Ditto.

## Conversion of StatementSet to DataStream API

5. "StreamStatementSet#attachToStream()"
I think the potential drawback is that it can't support multi-sink
optimization, i.e. sharing the pipeline.
For example, suppose we have a Table `t1` (a heavy view that uses joins and
aggregations) and, within one job, want to sink it to "mysql" using SQL and
also continue processing it using DataStream.
It would be a huge waste of resources to re-compute `t1`. It would be nice if
we could come up with a solution to share the pipeline.

I borrowed Godfrey's idea in FLINK-18840 and added some modifications. What
do you think about the following proposal?

interface StatementSet {
   StatementSet addDataStream(Table table, TableDataStreamTransform transform);
}

interface TableDataStreamTransform {
   void transform(Context context);

   interface Context {
       Table getTable();
       DataStream<Row> toInsertStream(Table table);
       <T> DataStream<T> toInsertStream(AbstractDataType<?> dataType, Table table);
       DataStream<Row> toChangelogStream(Table table);
   }
}

tEnv
  .createStatementSet()
  .addInsert("mysql", table1)
  .addDataStream(table1, ctx -> {
      ctx.toInsertStream(ctx.getTable())
        .flatMap(..)
        .keyBy(..)
        .process(..)
        .addSink(...);
  });


## Improve dealing with Row in DataStream API

6. "Row#setFieldNames(@Nullable Map<String, Integer> fieldNames)"
- Maybe a `List<String> fieldNames` or `String[] fieldNames` parameter is
enough and handier than a Map?
- Currently, the fieldNames member variable is mutable. Is that on purpose?
Can we make it immutable, for example by only accepting it in the constructor?
- Why do we accept a nullable `fieldNames`?
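
To make the suggestion above concrete, here is a rough sketch of a row whose
field names are passed once via the constructor and never change afterwards.
`NamedRowSketch` and all of its methods are hypothetical names that I made up
for illustration; this is not the proposed `Row` API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public final class NamedRowSketch {

    // name -> position, fixed at construction time and never replaced
    private final Map<String, Integer> positions;
    private final Object[] fields;

    public NamedRowSketch(List<String> fieldNames) {
        Map<String, Integer> byName = new LinkedHashMap<>();
        for (int i = 0; i < fieldNames.size(); i++) {
            if (byName.put(fieldNames.get(i), i) != null) {
                throw new IllegalArgumentException("Duplicate field name: " + fieldNames.get(i));
            }
        }
        this.positions = Collections.unmodifiableMap(byName);
        this.fields = new Object[fieldNames.size()];
    }

    public void setField(String name, Object value) {
        fields[indexOf(name)] = value;
    }

    public Object getField(String name) {
        return fields[indexOf(name)];
    }

    public Object getField(int pos) {
        return fields[pos];
    }

    private int indexOf(String name) {
        Integer pos = positions.get(name);
        if (pos == null) {
            throw new IllegalArgumentException("Unknown field: " + name);
        }
        return pos;
    }

    public static void main(String[] args) {
        NamedRowSketch row = new NamedRowSketch(Arrays.asList("id", "name"));
        row.setField("id", 1);
        row.setField("name", "Bob");
        // Name-based and position-based access see the same value.
        System.out.println(row.getField("name") + " / " + row.getField(1));
    }
}
```

With a list, the position of each name is implied by its index, so callers
cannot pass inconsistent positions as they could with a Map.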

7. "a Row has two modes represented by an internal boolean flag
`hasFieldOrder`."
Sorry, I don't fully understand what `hasFieldOrder` means and what it is
used for. Could you explain this a bit more?

Best,
Jark


On Wed, 19 Aug 2020 at 17:38, Timo Walther <twal...@apache.org> wrote:

> Hi David,
>
> thanks for your feedback. Feedback from someone who interacts with many
> users is very valuable. I added an explanation for StatementSets to the
> FLIP.
>
> Regarding watermarks and fromInsertStream, actually the
>
> `Schema.watermark("ts", system_watermark())`
>
> is not really necessary in the `fromChangelogStream`. It is added to
> satisfy the Schema interface and be similar to SQL DDL.
>
> We could already extract the watermark strategy if we see
> `system_rowtime()` because in most of the cases we will simply use the
> DataStream API watermarks.
>
> But maybe some users want to generate watermarks after preprocessing in
> DataStream API. In these cases users want to define a computed watermark
> expression.
>
> So for simplicity in the Simple API we introduce:
>
> tEnv
>    .fromInsertStream(DataStream<T>)
>    .select('*, system_rowtime().as("rowtime"),
> system_proctime().as("proctime"))
>
> and just rely on the watermarks that travel through DataStream API
> already. I added another comment to the FLIP.
>
> Regards,
> Timo
>
>
> On 19.08.20 10:53, David Anderson wrote:
> > Timo, nice to see this.
> >
> > As someone who expects to use these interfaces, but who doesn't fully
> > understand the existing Table API, I like what I see. Just a couple of
> > comments:
> >
> > The way that watermarks fit into the fromChangelogStream case makes sense
> > to me, and I'm wondering why watermarks don't come up in the previous
> > section about fromInsertStream.
> >
> > I wasn't familiar with StatementSets, and I couldn't find an explanation
> > in the docs. I eventually found this short paragraph in an email from Fabian
> > Hueske, which clarified everything in that section for me:
> >
> >      FLIP-84 [1] added the concept of a "statement set" to group multiple
> >      INSERT INTO statements (SQL or Table API) together. The statements in a
> >      statement set are jointly optimized and executed as a single Flink job.
> >
> > Maybe if you add this to the FLIP it will help other readers as well.
> >
> > Best,
> > David
> >
> > On Wed, Aug 19, 2020 at 10:22 AM Timo Walther <twal...@apache.org> wrote:
> >
> >> Hi everyone,
> >>
> >> I would like to propose a FLIP that aims to resolve the remaining
> >> shortcomings in the Table API:
> >>
> >>
> >>
> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API
> >>
> >> The Table API has received many new features over the last year. It
> >> supports a new type system (FLIP-37), connectors support changelogs
> >> (FLIP-95), we have well defined internal data structures (FLIP-95),
> >> support for result retrieval in an interactive fashion (FLIP-84), and
> >> soon new TableDescriptors (FLIP-129).
> >>
> >> However, the interfaces from and to DataStream API have not been touched
> >> during the introduction of these new features and are kind of outdated.
> >> The interfaces lack important functionality that is available in Table
> >> API but not exposed to DataStream API users. DataStream API is still our
> >> most important API which is why a good interoperability is crucial.
> >>
> >> This FLIP is a mixture of different topics that improve the
> >> interoperability between DataStream and Table API in terms of:
> >>
> >> - DataStream <-> Table conversion
> >> - translation of type systems TypeInformation <-> DataType
> >> - schema definition (incl. rowtime, watermarks, primary key)
> >> - changelog handling
> >> - row handling in DataStream API
> >>
> >> I'm looking forward to your feedback.
> >>
> >> Regards,
> >> Timo
> >>
> >
>
>
