+1 I like the general idea of printing the results as a table.

On the specifics I don't know enough but Fabians suggestions seems to make sense to me.

Aljoscha

On 29.04.20 10:56, Fabian Hueske wrote:
Hi Godfrey,

Thanks for starting this discussion!

In my mind, WATERMARK is a property (or constraint) of a field, just like
PRIMARY KEY.
Take this example from MySQL:

mysql> CREATE TABLE people (id INT NOT NULL, name VARCHAR(128) NOT NULL,
age INT, PRIMARY KEY (id));
Query OK, 0 rows affected (0.06 sec)

mysql> describe people;
+-------+--------------+------+-----+---------+-------+
| Field | Type         | Null | Key | Default | Extra |
+-------+--------------+------+-----+---------+-------+
| id    | int          | NO   | PRI | NULL    |       |
| name  | varchar(128) | NO   |     | NULL    |       |
| age   | int          | YES  |     | NULL    |       |
+-------+--------------+------+-----+---------+-------+
3 rows in set (0.01 sec)

Here, PRIMARY KEY is marked in the Key column of the id field.
We could do the same for watermarks by adding a Watermark column.

Best, Fabian


Am Mi., 29. Apr. 2020 um 10:43 Uhr schrieb godfrey he <godfre...@gmail.com>:

Hi everyone,

I would like to bring up a discussion about the result type of describe
statement,
which is introduced in FLIP-84[1].
In previous version, we define the result type of `describe` statement is a
single column as following

Statement

Result Schema

Result Value

Result Kind

Examples

DESCRIBE xx

field name: result

field type: VARCHAR(n)

(n is the max length of values)

describe the detail of an object

(single row)

SUCCESS_WITH_CONTENT

DESCRIBE table_name

for "describe table_name", the result value is the `toString` value of
`TableSchema`, which is an unstructured data.
It's hard to for user to use this info.

for example:

TableSchema schema = TableSchema.builder()
    .field("f0", DataTypes.BIGINT())
    .field("f1", DataTypes.ROW(
       DataTypes.FIELD("q1", DataTypes.STRING()),
       DataTypes.FIELD("q2", DataTypes.TIMESTAMP(3))))
    .field("f2", DataTypes.STRING())
    .field("f3", DataTypes.BIGINT(), "f0 + 1")
    .watermark("f1.q2", WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
    .build();

its `toString` value is:
root
  |-- f0: BIGINT
  |-- f1: ROW<`q1` STRING, `q2` TIMESTAMP(3)>
  |-- f2: STRING
  |-- f3: BIGINT AS f0 + 1
  |-- WATERMARK FOR f1.q2 AS now()

For hive, MySQL, etc., the describe result is table form including field
names and field types.
which is more familiar with users.
TableSchema[2] has watermark expression and compute column, we should also
put them into the table:
for compute column, it's a column level, we add a new column named `expr`.
  for watermark expression, it's a table level, we add a special row named
`WATERMARK` to represent it.

The result will look like about above example:

name

type

expr

f0

BIGINT

(NULL)

f1

ROW<`q1` STRING, `q2` TIMESTAMP(3)>

(NULL)

f2

STRING

NULL

f3

BIGINT

f0 + 1

WATERMARK

(NULL)

f1.q2 AS now()

now there is a pr FLINK-17112 [3] to implement DESCRIBE statement.

What do you think about this update?
Any feedback are welcome~

Best,
Godfrey

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
[2]

https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
[3] https://github.com/apache/flink/pull/11892


godfrey he <godfre...@gmail.com> 于2020年4月6日周一 下午10:38写道:

Hi Timo,

Sorry for the late reply, and thanks for your correction.
I missed DQL for job submission scenario.
I'll fix the document right away.

Best,
Godfrey

Timo Walther <twal...@apache.org> 于2020年4月3日周五 下午9:53写道:

Hi Godfrey,

I'm sorry to jump in again but I still need to clarify some things
around TableResult.

The FLIP says:
"For DML, this method returns TableResult until the job is submitted.
For other statements, TableResult is returned until the execution is
finished."

I thought we agreed on making every execution async? This also means
returning a TableResult for DQLs even though the execution is not done
yet. People need access to the JobClient also for batch jobs in order to
cancel long lasting queries. If people want to wait for the completion
they can hook into JobClient or collect().

Can we rephrase this part to:

The FLIP says:
"For DML and DQL, this method returns TableResult once the job has been
submitted. For DDL and DCL statements, TableResult is returned once the
operation has finished."

Regards,
Timo


On 02.04.20 05:27, godfrey he wrote:
Hi Aljoscha, Dawid, Timo,

Thanks so much for the detailed explanation.
Agree with you that the multiline story is not completed now, and we
can
keep discussion.
I will add current discussions and conclusions to the FLIP.

Best,
Godfrey



Timo Walther <twal...@apache.org> 于2020年4月1日周三 下午11:27写道:

Hi Godfrey,

first of all, I agree with Dawid. The multiline story is not
completed
by this FLIP. It just verifies the big picture.

1. "control the execution logic through the proposed method if they
know
what the statements are"

This is a good point that also Fabian raised in the linked google
doc.
I
could also imagine to return a more complicated POJO when calling
`executeMultiSql()`.

The POJO would include some `getSqlProperties()` such that a platform
gets insights into the query before executing. We could also trigger
the
execution more explicitly instead of hiding it behind an iterator.

2. "there are some special commands introduced in SQL client"

For platforms and SQL Client specific commands, we could offer a hook
to
the parser or a fallback parser in case the regular table environment
parser cannot deal with the statement.

However, all of that is future work and can be discussed in a
separate
FLIP.

3. +1 for the `Iterator` instead of `Iterable`.

4. "we should convert the checked exception to unchecked exception"

Yes, I meant using a runtime exception instead of a checked
exception.
There was no consensus on putting the exception into the
`TableResult`.

Regards,
Timo

On 01.04.20 15:35, Dawid Wysakowicz wrote:
When considering the multi-line support I think it is helpful to
start
with a use case in mind. In my opinion consumers of this method will
be:

   1. sql-client
   2. third-part sql based platforms

@Godfrey As for the quit/source/... commands. I think those belong
to
the responsibility of aforementioned. I think they should not be
understandable by the TableEnvironment. What would quit on a
TableEnvironment do? Moreover I think such commands should be
prefixed
appropriately. I think it's a common practice to e.g. prefix those
with
! or : to say they are meta commands of the tool rather than a
query.

I also don't necessarily understand why platform users need to know
the
kind of the query to use the proposed method. They should get the
type
from the TableResult#ResultKind. If the ResultKind is SUCCESS, it
was
a
DCL/DDL. If SUCCESS_WITH_CONTENT it was a DML/DQL. If that's not
enough
we can enrich the TableResult with more explicit kind of query, but
so
far I don't see such a need.

@Kurt In those cases I would assume the developers want to present
results of the queries anyway. Moreover I think it is safe to assume
they can adhere to such a contract that the results must be
iterated.

For direct users of TableEnvironment/Table API this method does not
make
much sense anyway, in my opinion. I think we can rather safely
assume
in
this scenario they do not want to submit multiple queries at a
single
time.

Best,

Dawid


On 01/04/2020 15:07, Kurt Young wrote:
One comment to `executeMultilineSql`, I'm afraid sometimes user
might
forget to
iterate the returned iterators, e.g. user submits a bunch of DDLs
and
expect the
framework will execute them one by one. But it didn't.

Best,
Kurt


On Wed, Apr 1, 2020 at 5:10 PM Aljoscha Krettek<
aljos...@apache.org>
wrote:

Agreed to what Dawid and Timo said.

To answer your question about multi line SQL: no, we don't think
we
need
this in Flink 1.11, we only wanted to make sure that the
interfaces
that
we now put in place will potentially allow this in the future.

Best,
Aljoscha

On 01.04.20 09:31, godfrey he wrote:
Hi, Timo & Dawid,

Thanks so much for the effort of `multiline statements
supporting`,
I have a few questions about this method:

1. users can well control the execution logic through the
proposed
method
     if they know what the statements are (a statement is a DDL, a
DML
or
others).
but if a statement is from a file, that means users do not know
what
the
statements are,
the execution behavior is unclear.
As a platform user, I think this method is hard to use, unless
the
platform
defines
a set of rule about the statements order, such as: no select in
the
middle,
dml must be at tail of sql file (which may be the most case in
product
env).
Otherwise the platform must parse the sql first, then know what
the
statements are.
If do like that, the platform can handle all cases through
`executeSql`
and
`StatementSet`.

2. SQL client can't also use `executeMultilineSql` to supports
multiline
statements,
     because there are some special commands introduced in SQL
client,
such as `quit`, `source`, `load jar` (not exist now, but maybe we
need
this
command
     to support dynamic table source and udf).
Does TableEnvironment also supports those commands?

3. btw, we must have this feature in release-1.11? I find there
are
few
user cases
     in the feedback document which behavior is unclear now.

regarding to "change the return value from `Iterable<Row` to
`Iterator<Row`",
I couldn't agree more with this change. Just as Dawid mentioned
"The contract of the Iterable#iterator is that it returns a new
iterator
each time,
     which effectively means we can iterate the results multiple
times.",
we does not provide iterate the results multiple times.
If we want do that, the client must buffer all results. but it's
impossible
for streaming job.

Best,
Godfrey

Dawid Wysakowicz<dwysakow...@apache.org>  于2020年4月1日周三 上午3:14写道:

Thank you Timo for the great summary! It covers (almost) all the
topics.
Even though in the end we are not suggesting much changes to the
current
state of FLIP I think it is important to lay out all possible
use
cases
so that we do not change the execution model every release.

There is one additional thing we discussed. Could we change the
result
type of TableResult#collect to Iterator<Row>? Even though those
interfaces do not differ much. I think Iterator better describes
that
the results might not be materialized on the client side, but
can
be
retrieved on a per record basis. The contract of the
Iterable#iterator
is that it returns a new iterator each time, which effectively
means
we
can iterate the results multiple times. Iterating the results is
not
possible when we don't retrieve all the results from the cluster
at
once.
I think we should also use Iterator for
TableEnvironment#executeMultilineSql(String statements):
Iterator<TableResult>.

Best,

Dawid

On 31/03/2020 19:27, Timo Walther wrote:
Hi Godfrey,

Aljoscha, Dawid, Klou, and I had another discussion around
FLIP-84.
In
particular, we discussed how the current status of the FLIP and
the
future requirements around multiline statements, async/sync,
collect()
fit together.

We also updated the FLIP-84 Feedback Summary document [1] with
some
use cases.

We believe that we found a good solution that also fits to what
is
in
the current FLIP. So no bigger changes necessary, which is
great!

Our findings were:

1. Async vs sync submission of Flink jobs:

Having a blocking `execute()` in DataStream API was rather a
mistake.
Instead all submissions should be async because this allows
supporting
both modes if necessary. Thus, submitting all queries async
sounds
good to us. If users want to run a job sync, they can use the
JobClient and wait for completion (or collect() in case of
batch
jobs).

2. Multi-statement execution:

For the multi-statement execution, we don't see a
contradication
with
the async execution behavior. We imagine a method like:

TableEnvironment#executeMultilineSql(String statements):
Iterable<TableResult>

Where the `Iterator#next()` method would trigger the next
statement
submission. This allows a caller to decide synchronously when
to
submit statements async to the cluster. Thus, a service such as
the
SQL Client can handle the result of each statement individually
and
process statement by statement sequentially.

3. The role of TableResult and result retrieval in general

`TableResult` is similar to `JobClient`. Instead of returning a
`CompletableFuture` of something, it is a concrete util class
where
some methods have the behavior of completable future (e.g.
collect(),
print()) and some are already completed (getTableSchema(),
getResultKind()).

`StatementSet#execute()` returns a single `TableResult` because
the
order is undefined in a set and all statements have the same
schema.
Its `collect()` will return a row for each executed `INSERT
INTO` in
the order of statement definition.

For simple `SELECT * FROM ...`, the query execution might block
until
`collect()` is called to pull buffered rows from the job (from
socket/REST API what ever we will use in the future). We can
say
that
a statement finished successfully, when the
`collect#Iterator#hasNext`
has returned false.

I hope this summarizes our discussion @Dawid/Aljoscha/Klou?

It would be great if we can add these findings to the FLIP
before we
start voting.

One minor thing: some `execute()` methods still throw a checked
exception; can we remove that from the FLIP? Also the above
mentioned
`Iterator#next()` would trigger an execution without throwing a
checked exception.

Thanks,
Timo

[1]




https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#
On 31.03.20 06:28, godfrey he wrote:
Hi, Timo & Jark

Thanks for your explanation.
Agree with you that async execution should always be async,
and sync execution scenario can be covered  by async
execution.
It helps provide an unified entry point for batch and
streaming.
I think we can also use sync execution for some testing.
So, I agree with you that we provide `executeSql` method and
it's
async
method.
If we want sync method in the future, we can add method named
`executeSqlSync`.

I think we've reached an agreement. I will update the
document,
and
start
voting process.

Best,
Godfrey


Jark Wu<imj...@gmail.com>  于2020年3月31日周二 上午12:46写道:

Hi,

I didn't follow the full discussion.
But I share the same concern with Timo that streaming queries
should
always
be async.
Otherwise, I can image it will cause a lot of confusion and
problems
if
users don't deeply keep the "sync" in mind (e.g. client
hangs).
Besides, the streaming mode is still the majority use cases
of
Flink
and
Flink SQL. We should put the usability at a high priority.

Best,
Jark


On Mon, 30 Mar 2020 at 23:27, Timo Walther<
twal...@apache.org>
wrote:
Hi Godfrey,

maybe I wasn't expressing my biggest concern enough in my
last
mail.
Even in a singleline and sync execution, I think that
streaming
queries
should not block the execution. Otherwise it is not possible
to
call
collect() or print() on them afterwards.

"there are too many things need to discuss for multiline":

True, I don't want to solve all of them right now. But what
I
know
is
that our newly introduced methods should fit into a
multiline
execution.
There is no big difference of calling `executeSql(A),
executeSql(B)` and
processing a multiline file `A;\nB;`.

I think the example that you mentioned can simply be
undefined
for
now.
Currently, no catalog is modifying data but just metadata.
This
is a
separate discussion.

"result of the second statement is indeterministic":

Sure this is indeterministic. But this is the implementers
fault
and we
cannot forbid such pipelines.

How about we always execute streaming queries async? It
would
unblock
executeSql() and multiline statements.

Having a `executeSqlAsync()` is useful for batch. However, I
don't
want
`sync/async` be the new batch/stream flag. The execution
behavior
should
come from the query itself.

Regards,
Timo


On 30.03.20 11:12, godfrey he wrote:
Hi Timo,

Agree with you that streaming queries is our top priority,
but I think there are too many things need to discuss for
multiline
statements:
e.g.
1. what's the behaivor of DDL and DML mixing for async
execution:
create table t1 xxx;
create table t2 xxx;
insert into t2 select * from t1 where xxx;
drop table t1; // t1 may be a MySQL table, the data will
also be
deleted.
t1 is dropped when "insert" job is running.

2. what's the behaivor of unified scenario for async
execution:
(as you
mentioned)
INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

The result of the second statement is indeterministic,
because
the
first
statement maybe is running.
I think we need to put a lot of effort to define the
behavior of
logically
related queries.

In this FLIP, I suggest we only handle single statement,
and
we
also
introduce an async execute method
which is more important and more often used for users.

Dor the sync methods (like `TableEnvironment.executeSql`
and
`StatementSet.execute`),
the result will be returned until the job is finished. The
following
methods will be introduced in this FLIP:

       /**
        * Asynchronously execute the given single statement
        */
TableEnvironment.executeSqlAsync(String statement):
TableResult

/**
       * Asynchronously execute the dml statements as a
batch
       */
StatementSet.executeAsync(): TableResult

public interface TableResult {
         /**
          * return JobClient for DQL and DML in async mode,
else
return
Optional.empty
          */
         Optional<JobClient> getJobClient();
}

what do you think?

Best,
Godfrey

Timo Walther<twal...@apache.org>  于2020年3月26日周四 下午9:15写道:

Hi Godfrey,

executing streaming queries must be our top priority
because
this
is
what distinguishes Flink from competitors. If we change
the
execution
behavior, we should think about the other cases as well to
not
break
the
API a third time.

I fear that just having an async execute method will not
be
enough
because users should be able to mix streaming and batch
queries
in a
unified scenario.

If I remember it correctly, we had some discussions in the
past
about
what decides about the execution mode of a query.
Currently, we
would
like to let the query decide, not derive it from the
sources.

So I could image a multiline pipeline as:

USE CATALOG 'mycat';
INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

For executeMultilineSql():

sync because regular SQL
sync because regular Batch SQL
async because Streaming SQL

For executeAsyncMultilineSql():

async because everything should be async
async because everything should be async
async because everything should be async

What we should not start for executeAsyncMultilineSql():

sync because DDL
async because everything should be async
async because everything should be async

What are you thoughts here?

Regards,
Timo


On 26.03.20 12:50, godfrey he wrote:
Hi Timo,

I agree with you that streaming queries mostly need async
execution.
In fact, our original plan is only introducing sync
methods in
this
FLIP,
and async methods (like "executeSqlAsync") will be
introduced
in
the
future
which is mentioned in the appendix.

Maybe the async methods also need to be considered in
this
FLIP.

I think sync methods is also useful for streaming which
can be
used
to
run
bounded source.
Maybe we should check whether all sources are bounded in
sync
execution
mode.

Also, if we block for streaming queries, we could never
support
multiline files. Because the first INSERT INTO would
block
the
further
execution.
agree with you, we need async method to submit multiline
files,
and files should be limited that the DQL and DML should
be
always in
the
end for streaming.

Best,
Godfrey

Timo Walther<twal...@apache.org>  于2020年3月26日周四
下午4:29写道:


Hi Godfrey,

having control over the job after submission is a
requirement
that
was
requested frequently (some examples are [1], [2]). Users
would
like
to
get insights about the running or completed job.
Including
the
jobId,
jobGraph etc., the JobClient summarizes these
properties.

It is good to have a discussion about
synchronous/asynchronous
submission now to have a complete execution picture.

I thought we submit streaming queries mostly async and
just
wait for
the
successful submission. If we block for streaming
queries,
how
can we
collect() or print() results?

Also, if we block for streaming queries, we could never
support
multiline files. Because the first INSERT INTO would
block
the
further
execution.

If we decide to block entirely on streaming queries, we
need
the
async
execution methods in the design already. However, I
would
rather go
for
non-blocking streaming queries. Also with the `EMIT
STREAM`
key
word
in
mind that we might add to SQL statements soon.

Regards,
Timo

[1]https://issues.apache.org/jira/browse/FLINK-16761
[2]https://issues.apache.org/jira/browse/FLINK-12214

On 25.03.20 16:30, godfrey he wrote:
Hi Timo,

Thanks for the updating.

Regarding to "multiline statement support", I'm also
fine
that
`TableEnvironment.executeSql()` only supports single
line
statement,
and
we
can support multiline statement later (needs more
discussion
about
this).
Regarding to "StatementSet.explian()", I don't have
strong
opinions
about
that.

Regarding to "TableResult.getJobClient()", I think it's
unnecessary.
The
reason is: first, many statements (e.g. DDL, show xx,
use
xx)
will
not
submit a Flink job. second,
`TableEnvironment.executeSql()`
and
`StatementSet.execute()` are synchronous method,
`TableResult`
will
be
returned only after the job is finished or failed.

Regarding to "whether StatementSet.execute() needs to
throw
exception", I
think we should choose a unified way to tell whether
the
execution
is
successful. If `TableResult` contains ERROR kind
(non-runtime
exception),
users need to not only check the result but also catch
the
runtime
exception in their code. or `StatementSet.execute()`
does
not
throw
any
exception (including runtime exception), all exception
messages are
in
the
result.  I prefer "StatementSet.execute() needs to
throw
exception".
cc
@Jark
Wu<imj...@gmail.com>

I will update the agreed parts to the document first.

Best,
Godfrey


Timo Walther<twal...@apache.org>  于2020年3月25日周三
下午6:51写道:

Hi Godfrey,

thanks for starting the discussion on the mailing
list.
And
sorry
again
for the late reply to FLIP-84. I have updated the
Google
doc
one
more
time to incorporate the offline discussions.

         From Dawid's and my view, it is fine to
postpone the
multiline
support
to a separate method. This can be future work even
though
we
will
need
it rather soon.

If there are no objections, I suggest to update the
FLIP-84
again
and
have another voting process.

Thanks,
Timo


On 25.03.20 11:17, godfrey he wrote:
Hi community,
Timo, Fabian and Dawid have some feedbacks about
FLIP-84[1].
The
feedbacks
are all about new introduced methods. We had a
discussion
yesterday,
and
most of feedbacks have been agreed upon. Here is the
conclusions:

*1. about proposed methods in `TableEnvironment`:*

the original proposed methods:

TableEnvironment.createDmlBatch(): DmlBatch
TableEnvironment.executeStatement(String statement):
ResultTable

the new proposed methods:

// we should not use abbreviations in the API, and
the
term
"Batch"
is
easily confused with batch/streaming processing
TableEnvironment.createStatementSet(): StatementSet

// every method that takes SQL should have `Sql` in
its
name
// supports multiline statement ???
TableEnvironment.executeSql(String statement):
TableResult

// new methods. supports explaining DQL and DML
TableEnvironment.explainSql(String statement,
ExplainDetail...
details):
String


*2. about proposed related classes:*

the original proposed classes:

interface DmlBatch {
             void addInsert(String insert);
             void addInsert(String targetPath, Table
table);
             ResultTable execute() throws Exception ;
             String explain(boolean extended);
}

public interface ResultTable {
             TableSchema getResultSchema();
             Iterable<Row> getResultRows();
}

the new proposed classes:

interface StatementSet {
             // every method that takes SQL should
have
`Sql` in
its
name
             // return StatementSet instance for
fluent
programming
             addInsertSql(String statement):
StatementSet

             // return StatementSet instance for
fluent
programming
             addInsert(String tablePath, Table table):
StatementSet
             // new method. support overwrite mode
             addInsert(String tablePath, Table table,
boolean
overwrite):
StatementSet

             explain(): String

             // new method. supports adding more
details
for the
result
             explain(ExplainDetail... extraDetails):
String

             // throw exception ???
             execute(): TableResult
}

interface TableResult {
             getTableSchema(): TableSchema

             // avoid custom parsing of an "OK" row in
programming
             getResultKind(): ResultKind

             // instead of `get` make it explicit that
this
is
might
be
triggering
an expensive operation
             collect(): Iterable<Row>

             // for fluent programming
             print(): Unit
}

enum ResultKind {
             SUCCESS, // for DDL, DCL and statements
with a
simple
"OK"
             SUCCESS_WITH_CONTENT, // rows with
important
content are
available
(DML, DQL)
}


*3. new proposed methods in `Table`*

`Table.insertInto()` will be deprecated, and the
following
methods
are
introduced:

Table.executeInsert(String tablePath): TableResult
Table.executeInsert(String tablePath, boolean
overwrite):
TableResult
Table.explain(ExplainDetail... details): String
Table.execute(): TableResult

There are two issues need further discussion, one is
whether
`TableEnvironment.executeSql(String statement):
TableResult`
needs
to
support multiline statement (or whether
`TableEnvironment`
needs
to
support
multiline statement), and another one is whether
`StatementSet.execute()`
needs to throw exception.

please refer to the feedback document [2] for the
details.

Any suggestions are warmly welcomed!

[1]




https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
[2]




https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
Best,
Godfrey









Reply via email to