Re: [DISCUSS] Setup a bui...@flink.apache.org mailing list for travis builds

2019-07-22 Thread Timo Walther

+1 sounds good to inform people about instabilities or other issues

Regards,
Timo


Am 22.07.19 um 09:58 schrieb Haibo Sun:

+1. Sounds good.Letting the PR creators know the build results of the master 
branch can help to determine more quickly whether Travis failures of their PR 
are an exact failure or because of the instability of test case. By the way, if 
the PR creator can abort their own Travis build, I think it can improve the 
efficient use of Travis resources (of course, this discussion does not deal 
with this issue).


Best,
Haibo
At 2019-07-22 12:36:31, "Yun Tang"  wrote:

+1 sounds good, more people could be encouraged to involve in paying attention 
to failure commit.

Best
Yun Tang

From: Becket Qin 
Sent: Monday, July 22, 2019 9:44
To: dev 
Subject: Re: [DISCUSS] Setup a bui...@flink.apache.org mailing list for travis 
builds

+1. Sounds a good idea to me.

On Sat, Jul 20, 2019 at 7:07 PM Dian Fu  wrote:


Thanks Jark for the proposal, sounds reasonable for me. +1. This ML could
be used for all the build notifications including master and CRON jobs.


在 2019年7月20日,下午2:55,Xu Forward  写道:

+1 ,Thanks jark for the proposal.
Best
Forward

Jark Wu  于2019年7月20日周六 下午12:14写道:


Hi all,

As far as I know, currently, email notifications of Travis builds for
master branch are sent to the commit author when a build was just

broken or

still is broken. And there is no email notifications for CRON builds.

Recently, we are suffering from compile errors for scala-2.12 and java-9
which are only ran in CRON jobs. So I'm figuring out a way to get
notifications of CRON builds (or all builds) to quick fix compile errors
and failed cron tests.

After reaching out to @Chesnay Schepler  (thanks

for

the helping), I know that we are using a Slack channel to receive all
failed build notifications. However, IMO, email notification might be a
better way than Slack channel to encourage more people to pay attention

on

the builds.

So I'm here to propose to setup a bui...@flink.apache.org mailing list

for

receiving build notifications. I also find that Beam has such mailing

list

too[1]. After we have such a mailing list, we can integrate it to travis
according to the travis doc[2].

What do you think? Do we need a formal vote for this?

Best and thanks,
Jark

[1]: https://beam.apache.org/community/contact-us/
[2]:



https://docs.travis-ci.com/user/notifications/#configuring-email-notifications

<


https://docs.travis-ci.com/user/notifications/#configuring-email-notifications

<


https://docs.travis-ci.com/user/notifications/#configuring-email-notifications





Re: [DISCUSS] Support temporary tables in SQL API

2019-07-22 Thread Timo Walther
Thanks for summarizing our offline discussion Dawid! Even though I would 
prefer solution 1 instead of releasing half-baked features, I also 
understand that the Table API should not further block the next release. 
Therefore, I would be fine with solution 3 but introduce the new 
user-facing `createTemporaryTable` methods as synonyms of the existing 
ones already. This allows us to deprecate the methods with undefined 
behavior as early as possible.


Thanks,
Timo


Am 22.07.19 um 16:13 schrieb Dawid Wysakowicz:

Hi all,

When working on FLINK-13279[1] we realized we could benefit from a
better temporary objects support in the Catalog API/Table API.
Unfortunately we are already long past the feature freeze that's why I
wanted to get some opinions from the community how should we proceed
with this topic. I tried to prepare a summary of the current state and 3
different suggested approaches that we could take. Please see the
attached document[2]

I will appreciate your thoughts!


[1] https://issues.apache.org/jira/browse/FLINK-13279

[2]
https://docs.google.com/document/d/1RxLj4tDB9GXVjF5qrkM38SKUPkvJt_BSefGYTQ-cVX4/edit?usp=sharing






Re: [DISCUSS] Support computed column for Flink SQL

2019-07-29 Thread Timo Walther

Hi Danny,

thanks for working on this issue and writing down the concept 
suggestion. We are currently still in the progress of finalizing the 1.9 
release. Having proper streaming DDL support will definitely be part of 
Flink 1.10. I will take a look at the whole DDL efforts very soon once 
the 1.9 release is out.


Thanks,
Timo

Am 23.07.19 um 11:00 schrieb Danny Chan:

In umbrella task FLINK-10232[1] we have introduced CREATE TABLE grammar in our 
new module flink-sql-parser. And we proposed to use computed column to describe 
the time attribute of process time in the design doc FLINK SQL DDL[2], so user 
may create a table with process time attribute as following:

create table T1(
a int,
b bigint,
c varchar,
d as PROC_TIME,
) with (
k1 = v1,
k2 = v2
);

The column d would be a process time attribute for table T1. There are also 
many other use cases for computed columns[3].

It may not be a big change here, but may touch the TableSchema, which is a 
public API for user now, so i'm very appreciate for your suggestions(especially 
its relationship with the TableSchema).

I write a simple design doc here[3].

[1] https://issues.apache.org/jira/browse/FLINK-10232
[2] 
https://docs.google.com/document/d/1OmVyuPk9ibGUC-CnPHbXvCg_fdG1TeC3lXSnqcUEYmM
[3] 
https://docs.google.com/document/d/110TseRtTCphxETPY7uhiHpu-dph3NEesh3mYKtJ7QOY/edit?usp=sharing

Best,
Danny Chan





Re: [DISCUSS][CODE STYLE] Usage of Java Optional

2019-08-02 Thread Timo Walther

Hi everyone,

I would vote for using Optional only as method return type for 
non-performance critical code. Nothing more. No fields, no method 
parameters. Method parameters can be overloaded and internally a class 
can work with nulls and @Nullable. Optional is meant for API method 
return types and I think we should not abuse it and spam the code with 
`@SuppressWarnings("OptionalUsedAsFieldOrParameterType")`.


Regards,

Timo



Am 02.08.19 um 11:08 schrieb Biao Liu:

Hi Jark & Zili,

I thought it means "Optional should not be used for class fields". However
now I get a bit confused about the edited version.

Anyway +1 to "Optional should not be used for class fields"

Thanks,
Biao /'bɪ.aʊ/



On Fri, Aug 2, 2019 at 5:00 PM Zili Chen  wrote:


Hi Jark,

Follow your opinion, for class field, we can make
use of @Nullable/@Nonnull annotation or Flink's
SerializableOptional. It would be sufficient.

Best,
tison.


Jark Wu  于2019年8月2日周五 下午4:57写道:


Hi Andrey,

I have some concern on point (3) "even class fields as e.g. optional

config

options with implicit default values".

Regarding to the Oracle's guide (4) "Optional should not be used for

class

fields".
And IntelliJ IDEA also report warnings if a class field is Optional,
because Optional is not serializable.


Do we allow Optional as class field only if the class is not serializable
or forbid this totally?

Thanks,
Jark

On Fri, 2 Aug 2019 at 16:30, Biao Liu  wrote:


Hi Andrey,

Thanks for working on this.

+1 it's clear and acceptable for me.

To Qi,

IMO the most performance critical codes are "per record" code path. We
should definitely avoid Optional there. For your concern, it's "per

buffer"

code path which seems to be acceptable with Optional.

Just one more question, is there any other code paths which are also
critical? I think we'd better note that clearly.

Thanks,
Biao /'bɪ.aʊ/



On Fri, Aug 2, 2019 at 11:08 AM qi luo  wrote:


Agree that using Optional will improve code robustness. However we’re
hesitating to use Optional in data intensive operations.

For example, SingleInputGate is already creating Optional for every
BufferOrEvent in getNextBufferOrEvent(). How much performance gain

would

we

get if it’s replaced by null check?

Regards,
Qi


On Aug 1, 2019, at 11:00 PM, Andrey Zagrebin 
wrote:

Hi all,

This is the next follow up discussion about suggestions for the

recent

thread about code style guide in Flink [1].

In general, one could argue that any variable, which is nullable,

can

be

replaced by wrapping it with Optional to explicitly show that it

can

be

null. Examples are:

   - returned values to force user to check not null
   - optional function arguments, e.g. with implicit default values
   - even class fields as e.g. optional config options with implicit
   default values


At the same time, we also have @Nullable annotation to express this
intention.

Also, when the class Optional was introduced, Oracle posted a

guideline

about its usage [2]. Basically, it suggests to use it mostly in

APIs

for

returned values to inform and force users to check the returned

value

instead of returning null and avoid NullPointerException.

Wrapping with Optional also comes with the performance overhead.

Following the Oracle's guide in general, the suggestion is:

   - Avoid using Optional in any performance critical code
   - Use Optional only to return nullable values in the API/public

methods

   unless it is performance critical then rather use @Nullable
   - Passing an Optional argument to a method can be allowed if it

is

   within a private helper method and simplifies the code, example

is

in

[3]

   - Optional should not be used for class fields


Please, feel free to share you thoughts.

Best,
Andrey

[1]


http://mail-archives.apache.org/mod_mbox/flink-dev/201906.mbox/%3ced91df4b-7cab-4547-a430-85bc710fd...@apache.org%3E

[2]


https://www.oracle.com/technetwork/articles/java/java8-optional-2175753.html

[3]


https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java#L95






Re: [DISCUSS] Merging new features post-feature-freeze

2019-08-08 Thread Timo Walther

Hi Kurt,

I posted my opinion around this particular example in FLINK-13225.

Regarding the definition of "feature freeze": I think it is good to 
write down more of the implicit processes that we had in the past. The 
bylaws, coding guidelines, and a better FLIP process are very good steps 
towards the right direction. However, not everything can be written down 
and formulized. We should also remind ourselves of basic software 
engineering principles. Merging a feature shortly before the actual 
release is always dangerous. A feature needs time to settle down and be 
tested for side-effects etc. Merging a feature with a lot of spaghetti 
code, reflection magic, and a single IT case is not a complete feature 
that is worth merging.


I hope we can improve here for the next release. Thanks for the open 
discussion.


Regards,
Timo


Am 08.08.19 um 11:11 schrieb Kurt Young:

Hi Stephan,

Thanks for bringing this up. I think it's important and a good time to
discuss what
does *feature freeze* really means. At least to me, seems I have some
misunderstandings with this comparing to other community members. But as
you
pointed out in the jira and also in this mail, I think your understanding
makes sense
to me.

Maybe we can have a conclusion in the thread and put this into the project
bylaws
which are under discussion?

Regarding to FLINK-13225, I would like to hear other's opinion since I
merged it. But
I would like to revert it if someone voted for reverting it.

Sorry for the inconvenience I caused.

Best,
Kurt


On Thu, Aug 8, 2019 at 4:46 PM Stephan Ewen  wrote:


Hi all!

I would like to bring this topic up, because we saw quite a few "secret"
post-feature-freeze feature merges.
The latest example was https://issues.apache.org/jira/browse/FLINK-13225

I would like to make sure that we are all on the same page on what a
feature freeze means and how to handle possible additions after the feature
freeze.
My understanding was the following, and I assume that this was also the
understanding of the community when we started establishing the release
practice:

   - Feature freeze is the date until new features can be merged.
   - After the feature freeze, we only merge bug fixes, release relevant
tests (end to end tests), and documentation.
   - Features should already be stable and have tests. It is not okay to
"get a foot in the door" before feature freeze by merging something that is
not ready (as a placeholder) and then fixing it post feature freeze.
   - Extending functionality to new components is not a bug fix, it is a
feature.
   - If someone wants to add a minor feature after the feature freeze, and
there is a good reason for that, it should be explicitly discussed. If
there is no objection, it can be merged.

Please let me know if you have a different understanding of what feature
freeze means.

Regarding the issue of FLINK-13225
?
   - Should we keep it?
   - Should we revert it in the release-1.9 branch and only keep it for
master?

Best,
Stephan





Re: [DISCUSS] Merging new features post-feature-freeze

2019-08-08 Thread Timo Walther

Hi Xuefu,

I disagree with "all those work would be wasted/useless", it would just 
take effect 3 months later.


Regarding "I don't see eye to eye on how and when we had decided a 
feature freeze", there was an official [ANNOUNCE] email that targeted 
June 28 [1]. I think nobody is super strict about such a date and an 
additional day or two but we need to stop merging into a branch to 
ensure build stability.


The Flink community decided on time-based releases some time ago, of 
course we could discuss this policy again. But generally speaking we 
should release quickly because every release contains nice features that 
users are waiting for. The PR in question was not listed in the intial 
feature discussion [2] and mentioned for the first time mid/end of June.


Personally, for the next release, I would prefer to vote on a list of 
FLIP topics that qualify for a release (given that they are finished in 
time with the expected quality).


What do you think?

Thanks,
Timo

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/ANNOUNCE-Feature-freeze-for-Apache-Flink-1-9-0-release-tp29751.html
[2] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Features-for-Apache-Flink-1-9-0-td28701.html


Am 08.08.19 um 15:43 schrieb Xuefu Z:

Hi all,

I understand the merged PR is a feature, but it's something we had planned
and  requested for a long time. In fact, at Hive connector side, we have
done a lot of work (supporting hive udf). Without this PR, all those work
would be wasted and Hive feature itself in 1.9 would also be close to being
useless.

I also agree that feature freeze means something and has its importance. On
the other hand, I don't see eye to eye on how and when we had decided a
feature freeze should be in place. To me,  our feature freeze seems to be
time based. That is, we determine a time by which feature freeze will
happen, irregardless original feature plan. As a result, this practice
incurs a great deal of randomness, leaving many planned feature half baked.
The question is really about how we balance releasing something  in time
vs  releasing something usable. This might be a great chance for us to
meditate on this topic.

The PR in question is requested by me, and its importance to Hive connector
makes me stand by my request. On the other hand, if the PR has anything to
improve, I'm all for it.

Thanks,
Xuefu

On Thu, Aug 8, 2019 at 2:59 AM Timo Walther  wrote:


Hi Kurt,

I posted my opinion around this particular example in FLINK-13225.

Regarding the definition of "feature freeze": I think it is good to
write down more of the implicit processes that we had in the past. The
bylaws, coding guidelines, and a better FLIP process are very good steps
towards the right direction. However, not everything can be written down
and formulized. We should also remind ourselves of basic software
engineering principles. Merging a feature shortly before the actual
release is always dangerous. A feature needs time to settle down and be
tested for side-effects etc. Merging a feature with a lot of spaghetti
code, reflection magic, and a single IT case is not a complete feature
that is worth merging.

I hope we can improve here for the next release. Thanks for the open
discussion.

Regards,
Timo


Am 08.08.19 um 11:11 schrieb Kurt Young:

Hi Stephan,

Thanks for bringing this up. I think it's important and a good time to
discuss what
does *feature freeze* really means. At least to me, seems I have some
misunderstandings with this comparing to other community members. But as
you
pointed out in the jira and also in this mail, I think your understanding
makes sense
to me.

Maybe we can have a conclusion in the thread and put this into the

project

bylaws
which are under discussion?

Regarding to FLINK-13225, I would like to hear other's opinion since I
merged it. But
I would like to revert it if someone voted for reverting it.

Sorry for the inconvenience I caused.

Best,
Kurt


On Thu, Aug 8, 2019 at 4:46 PM Stephan Ewen  wrote:


Hi all!

I would like to bring this topic up, because we saw quite a few "secret"
post-feature-freeze feature merges.
The latest example was

https://issues.apache.org/jira/browse/FLINK-13225

I would like to make sure that we are all on the same page on what a
feature freeze means and how to handle possible additions after the

feature

freeze.
My understanding was the following, and I assume that this was also the
understanding of the community when we started establishing the release
practice:

- Feature freeze is the date until new features can be merged.
- After the feature freeze, we only merge bug fixes, release relevant
tests (end to end tests), and documentation.
- Features should already be stable and have tests. It is not okay to
"get a foot in the door" before feature freeze by merging something

that is

not ready (as a placeholder) and the

Re: [VOTE] Flink Project Bylaws

2019-08-12 Thread Timo Walther

+1

Thanks for all the efforts you put into this for documenting how the 
project operates.


Regards,
Timo

Am 12.08.19 um 10:44 schrieb Aljoscha Krettek:

+1


On 11. Aug 2019, at 10:07, Becket Qin  wrote:

Hi all,

I would like to start a voting thread on the project bylaws of Flink. It
aims to help the community coordinate more smoothly. Please see the bylaws
wiki page below for details.

https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120731026

The discussion thread is following:

http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Flink-project-bylaws-td30409.html

The vote will be open for at least 6 days. PMC members' votes are
considered as binding. The vote requires 2/3 majority of the binding +1s to
pass.

Thanks,

Jiangjie (Becket) Qin





Re: [DISCUSS] FLIP-51: Rework of the Expression Design

2019-08-14 Thread Timo Walther

Hi Jingsong,

thanks for writing down this FLIP. Big +1 from my side to finally get 
rid of PlannerExpressions and have consistent and well-defined behavior 
for Table API and SQL updated to FLIP-37.


We might need to discuss some of the behavior of particular functions 
but this should not affect the actual FLIP-51.


Regards,
Timo


Am 13.08.19 um 12:55 schrieb JingsongLee:

Hi everyone,

We would like to start a discussion thread on "FLIP-51: Rework of the
Expression Design"(Design doc: [1], FLIP: [2]), where we describe how
  to improve the new java Expressions to work with type inference and
  convert expression to the calcite RexNode. This is a follow-up plan
for FLIP-32[3] and FLIP-37[4]. This FLIP is mostly based on FLIP-37.

This FLIP addresses several shortcomings of current:
- New Expressions still use PlannerExpressions to type inference and
  to RexNode. Flnk-planner and blink-planner have a lot of repetitive code
  and logic.
- Let TableApi and Cacite definitions consistent.
- Reduce the complexity of Function development.
- Powerful Function for user.

Key changes can be summarized as follows:
- Improve the interface of FunctionDefinition.
- Introduce type inference for built-in functions.
- Introduce ExpressionConverter to convert Expression to calcite
  RexNode.
- Remove repetitive code and logic in planners.

I also listed type inference and behavior of all built-in functions [5], to
verify that the interface is satisfied. After introduce type inference to
table-common module, planners should have a unified function behavior.
And this gives the community also the chance to quickly discuss types
  and behavior of functions a last time before they are declared stable.

Looking forward to your feedbacks. Thank you.

[1] 
https://docs.google.com/document/d/1yFDyquMo_-VZ59vyhaMshpPtg7p87b9IYdAtMXv5XmM/edit?usp=sharing
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-51%3A+Rework+of+the+Expression+Design
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions
[4] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System
[5] 
https://docs.google.com/document/d/1fyVmdGgbO1XmIyQ1BaoG_h5BcNcF3q9UJ1Bj1euO240/edit?usp=sharing

Best,
Jingsong Lee





Re: [VOTE] Apache Flink Release 1.9.0, release candidate #2

2019-08-15 Thread Timo Walther

Hi Kurt,

I agree that this is a serious bug. However, I would not block the 
release because of this. As you said, there is a workaround and the 
`execute()` works in the most common case of a single execution. We can 
fix this in a minor release shortly after.


What do others think?

Regards,
Timo


Am 15.08.19 um 11:23 schrieb Kurt Young:

HI,

We just find a serious bug around blink planner:
https://issues.apache.org/jira/browse/FLINK-13708
When user reused the table environment instance, and call `execute` method
multiple times for
different sql, the later call will trigger the earlier ones to be
re-executed.

It's a serious bug but seems we also have a work around, which is never
reuse the table environment
object. I'm not sure if we should treat this one as blocker issue of 1.9.0.

What's your opinion?

Best,
Kurt


On Thu, Aug 15, 2019 at 2:01 PM Gary Yao  wrote:


+1 (non-binding)

Jepsen test suite passed 10 times consecutively

On Wed, Aug 14, 2019 at 5:31 PM Aljoscha Krettek 
wrote:


+1

I did some testing on a Google Cloud Dataproc cluster (it gives you a
managed YARN and Google Cloud Storage (GCS)):
   - tried both YARN session mode and YARN per-job mode, also using
bin/flink list/cancel/etc. against a YARN session cluster
   - ran examples that write to GCS, both with the native Hadoop

FileSystem

and a custom “plugin” FileSystem
   - ran stateful streaming jobs that use GCS as a checkpoint backend
   - tried running SQL programs on YARN using the SQL Cli: this worked for
YARN session mode but not for YARN per-job mode. Looking at the code I
don’t think per-job mode would work from seeing how it is implemented.

But

I think it’s an OK restriction to have for now
   - in all the testing I had fine-grained recovery (region failover)
enabled but I didn’t simulate any failures


On 14. Aug 2019, at 15:20, Kurt Young  wrote:

Hi,

Thanks for preparing this release candidate. I have verified the

following:

- verified the checksums and GPG files match the corresponding release

files

- verified that the source archives do not contains any binaries
- build the source release with Scala 2.11 successfully.
- ran `mvn verify` locally, met 2 issuses [FLINK-13687] and

[FLINK-13688],

but
both are not release blockers. Other than that, all tests are passed.
- ran all e2e tests which don't need download external packages (it's

very

unstable
in China and almost impossible to download them), all passed.
- started local cluster, ran some examples. Met a small website display
issue
[FLINK-13591], which is also not a release blocker.

Although we have pushed some fixes around blink planner and hive
integration
after RC2, but consider these are both preview features, I'm lean to be

ok

to release
without these fixes.

+1 from my side. (binding)

Best,
Kurt


On Wed, Aug 14, 2019 at 5:13 PM Jark Wu  wrote:


Hi Gordon,

I have verified the following things:

- build the source release with Scala 2.12 and Scala 2.11 successfully
- checked/verified signatures and hashes
- checked that all POM files point to the same version
- ran some flink table related end-to-end tests locally and succeeded
(except TPC-H e2e failed which is reported in FLINK-13704)
- started cluster for both Scala 2.11 and 2.12, ran examples, verified

web

ui and log output, nothing unexpected
- started cluster, ran a SQL query to temporal join with kafka source

and

mysql jdbc table, and write results to kafka again. Using DDL to

create

the

source and sinks. looks good.
- reviewed the release PR

As FLINK-13704 is not recognized as blocker issue, so +1 from my side
(non-binding).

On Tue, 13 Aug 2019 at 17:07, Till Rohrmann 

wrote:

Hi Richard,

although I can see that it would be handy for users who have PubSub

set

up,

I would rather not include examples which require an external

dependency

into the Flink distribution. I think examples should be

self-contained.

My

concern is that we would bloat the distribution for many users at the
benefit of a few. Instead, I think it would be better to make these
examples available differently, maybe through Flink's ecosystem

website

or

maybe a new examples section in Flink's documentation.

Cheers,
Till

On Tue, Aug 13, 2019 at 9:43 AM Jark Wu  wrote:


Hi Till,

After thinking about we can use VARCHAR as an alternative of
timestamp/time/date.
I'm fine with not recognize it as a blocker issue.
We can fix it into 1.9.1.


Thanks,
Jark


On Tue, 13 Aug 2019 at 15:10, Richard Deurwaarder 

wrote:

Hello all,

I noticed the PubSub example jar is not included in the examples/

dir

of

flink-dist. I've created

https://issues.apache.org/jira/browse/FLINK-13700

+ https://github.com/apache/flink/pull/9424/files to fix this.

I will leave it up to you to decide if we want to add this to

1.9.0.

Regards,

Richard

On Tue, Aug 13, 2019 at 9:04 AM Till Rohrmann <

trohrm...@apache.org>

wrote:


Hi Jark,

thanks for reporting this issue. Could this be a documented

limitation

of

Blink'

Re: [DISCUSS] FLIP-51: Rework of the Expression Design

2019-08-15 Thread Timo Walther

Hi,

regarding the LegacyTypeInformation esp. for decimals. I don't have a 
clear answer yet, but I think it should not limit us. If possible it 
should travel through the type inference and we only need some special 
cases at some locations e.g. when computing the leastRestrictive. E.g. 
the logical type root is set correctly which is required for family 
checking.


I'm wondering when a decimal type with precision can occur. Usually, it 
should come from literals or defined column. But I think it might also 
be ok that the flink-planner just receives a decimal with precision and 
treats it with Java semantics.


Doing it on the planner side is the easiest option but also the one that 
could cause side-effects if the back-and-forth conversion of 
LegacyTypeConverter is not a 1:1 mapping anymore. I guess we will see 
the implications during implementation. All old Flink tests should pass 
still.


Regards,
Timo

Am 15.08.19 um 10:43 schrieb JingsongLee:

Hi @Timo Walther @Dawid Wysakowicz:

Now, flink-planner have some legacy DataTypes:
like: legacy decimal, legacy basic array type info...
And If the new type inference infer a Decimal/VarChar with precision, there 
should will fail in TypeConversions.

The better we do on DataType, the more problems we will have with 
TypeInformation conversion, and the new TypeInference is a lot of precision 
related.
What do you think?
1. Should TypeConversions support all data types and flink-planner support new 
types?2. Or do a special conversion between flink-planner and type inference.

(There are many problems with the conversion between TypeInformation and 
DataType, and I think we should solve them completely in 1.10.)

Best,
Jingsong Lee


--
From:JingsongLee 
Send Time:2019年8月15日(星期四) 10:31
To:dev 
Subject:Re: [DISCUSS] FLIP-51: Rework of the Expression Design

Hi jark:

I'll add a chapter to list blink planner extended functions.

Best,
  Jingsong Lee


--
From:Jark Wu 
Send Time:2019年8月15日(星期四) 05:12
To:dev 
Subject:Re: [DISCUSS] FLIP-51: Rework of the Expression Design

Thanks Jingsong for starting the discussion.

The general design of the FLIP looks good to me. +1 for the FLIP. It's time
to get rid of the old Expression!

Regarding to the function behavior, shall we also include new functions
from blink planner (e.g. LISTAGG, REGEXP, TO_DATE, etc..) ?


Best,
Jark





On Wed, 14 Aug 2019 at 23:34, Timo Walther  wrote:


Hi Jingsong,

thanks for writing down this FLIP. Big +1 from my side to finally get
rid of PlannerExpressions and have consistent and well-defined behavior
for Table API and SQL updated to FLIP-37.

We might need to discuss some of the behavior of particular functions
but this should not affect the actual FLIP-51.

Regards,
Timo


Am 13.08.19 um 12:55 schrieb JingsongLee:

Hi everyone,

We would like to start a discussion thread on "FLIP-51: Rework of the
Expression Design"(Design doc: [1], FLIP: [2]), where we describe how
   to improve the new java Expressions to work with type inference and
   convert expression to the calcite RexNode. This is a follow-up plan
for FLIP-32[3] and FLIP-37[4]. This FLIP is mostly based on FLIP-37.

This FLIP addresses several shortcomings of current:
 - New Expressions still use PlannerExpressions to type inference and
   to RexNode. Flnk-planner and blink-planner have a lot of repetitive

code

   and logic.
 - Let TableApi and Cacite definitions consistent.
 - Reduce the complexity of Function development.
 - Powerful Function for user.

Key changes can be summarized as follows:
 - Improve the interface of FunctionDefinition.
 - Introduce type inference for built-in functions.
 - Introduce ExpressionConverter to convert Expression to calcite
   RexNode.
 - Remove repetitive code and logic in planners.

I also listed type inference and behavior of all built-in functions [5],

to

verify that the interface is satisfied. After introduce type inference to
table-common module, planners should have a unified function behavior.
And this gives the community also the chance to quickly discuss types
   and behavior of functions a last time before they are declared stable.

Looking forward to your feedbacks. Thank you.

[1]

https://docs.google.com/document/d/1yFDyquMo_-VZ59vyhaMshpPtg7p87b9IYdAtMXv5XmM/edit?usp=sharing

[2]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-51%3A+Rework+of+the+Expression+Design

[3]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-32%3A+Restructure+flink-table+for+future+contributions

[4]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-37%3A+Rework+of+the+Table+API+Type+System

[5]

https://docs.google.com/document/d/1fyVmdGgbO1XmIyQ1BaoG_h5BcNcF3q9UJ1Bj1euO240/edit?usp=sharing

Best,
Jingsong Lee







Re: [VOTE] FLIP-51: Rework of the Expression Design

2019-08-15 Thread Timo Walther

+1 for this.

Thanks,
Timo

Am 15.08.19 um 15:57 schrieb JingsongLee:

Hi Flink devs,

I would like to start the voting for FLIP-51 Rework of the Expression
  Design.

FLIP wiki:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-51%3A+Rework+of+the+Expression+Design

Discussion thread:
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-51-Rework-of-the-Expression-Design-td31653.html

Google Doc:
https://docs.google.com/document/d/1yFDyquMo_-VZ59vyhaMshpPtg7p87b9IYdAtMXv5XmM/edit?usp=sharing

Thanks,

Best,
Jingsong Lee





[DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-16 Thread Timo Walther

Hi everyone,

Dawid and I are working on making parts of ExecutionConfig and 
TableConfig configurable via config options. This is necessary to make 
all properties also available in SQL. Additionally, with the new SQL DDL 
based on properties as well as more connectors and formats coming up, 
unified configuration becomes more important.


We need more features around string-based configuration in the future, 
which is why Dawid and I would like to propose FLIP-54 for evolving the 
ConfigOption and Configuration classes:


https://docs.google.com/document/d/1IQ7nwXqmhCy900t2vQLEL3N2HIdMg-JO8vTzo1BtyKU/edit

In summary it adds:
- documented types and validation
- more common types such as memory size, duration, list
- simple non-nested object types

Looking forward to your feedback,
Timo



Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-19 Thread Timo Walther
ou elaborate a bit
more what did you mean?

Best,

Dawid

On 18/08/2019 16:42, Stephan Ewen wrote:

I like the idea of enhancing the configuration and to do early

validation.

I feel that some of the ideas in the FLIP seem a bit ad hoc, though. For
example, having a boolean "isList" is a clear indication of not having
thought through the type/category system.
Also, having a more clear category system makes validation simpler.

For example, I have seen systems distinguishing between numeric

parameters

(valid ranges), category parameters (set of possible values), quantities
like duration and memory size (need measure and unit), which results in

an

elegant system for validation.


On Fri, Aug 16, 2019 at 5:22 PM JingsongLee 
.invalid>

wrote:


+1 to this, thanks Timo and Dawid for the design.
This allows the currently cluttered configuration of various
  modules to be unified.
This is also first step of one of the keys to making new unified
TableEnvironment available for production.

Previously, we did encounter complex configurations, such as
specifying the skewed values of column in DDL. The skew may
  be a single field or a combination of multiple fields. So the
  configuration is very troublesome. We used JSON string to
  configure it.

Best,
Jingsong Lee



--
From:Jark Wu 
Send Time:2019年8月16日(星期五) 16:44
To:dev 
Subject:Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

Thanks for starting this design Timo and Dawid,

Improving ConfigOption has been hovering in my mind for a long time.
We have seen the benefit when developing blink configurations and

connector

properties in 1.9 release.
Thanks for bringing it up and make such a detailed design.
I will leave my thoughts and comments there.

Cheers,
Jark


On Fri, 16 Aug 2019 at 22:30, Zili Chen  wrote:


Hi Timo,

It looks interesting. Thanks for preparing this FLIP!

Client API enhancement benefit from this evolution which
hopefully provides a better view of configuration of Flink.
In client API enhancement, we likely make the deployment
of cluster and submission of job totally defined by configuration.

Will take a look at the document in days.

Best,
tison.


Timo Walther  于2019年8月16日周五 下午10:12写道:


Hi everyone,

Dawid and I are working on making parts of ExecutionConfig and
TableConfig configurable via config options. This is necessary to make
all properties also available in SQL. Additionally, with the new SQL

DDL

based on properties as well as more connectors and formats coming up,
unified configuration becomes more important.

We need more features around string-based configuration in the future,
which is why Dawid and I would like to propose FLIP-54 for evolving

the

ConfigOption and Configuration classes:




https://docs.google.com/document/d/1IQ7nwXqmhCy900t2vQLEL3N2HIdMg-JO8vTzo1BtyKU/edit

In summary it adds:
- documented types and validation
- more common types such as memory size, duration, list
- simple non-nested object types

Looking forward to your feedback,
Timo








Re: [VOTE] Apache Flink Release 1.9.0, release candidate #2

2019-08-19 Thread Timo Walther
On Fri, 16 Aug 2019 at 06:06, Bowen Li <

bowenl...@gmail.com>

wrote:

Hi Jark,

Thanks for letting me know that it's been like this in

previous

releases.

Though I don't think that's the right behavior, it can be

discussed

for

later release. Thus I retract my -1 for RC2.

Bowen


On Thu, Aug 15, 2019 at 7:49 PM Jark Wu <

imj...@gmail.com>

wrote:

Hi Bowen,

Thanks for reporting this.
However, I don't think this is an issue. IMO, it is by

design.

The `tEnv.listUserDefinedFunctions()` in Table API and

`show

functions;`

in

SQL CLI are intended to return only the registered

UDFs,

not

including

built-in functions.
This is also the behavior in previous versions.

Best,
Jark

On Fri, 16 Aug 2019 at 06:52, Bowen Li <

bowenl...@gmail.com>

wrote:

-1 for RC2.

I found a bug

https://issues.apache.org/jira/browse/FLINK-13741,

and I

think it's a blocker.  The bug means currently if

users

call

`tEnv.listUserDefinedFunctions()` in Table API or

`show

functions;`

thru

SQL would not be able to see Flink's built-in

functions.

I'm preparing a fix right now.

Bowen


On Thu, Aug 15, 2019 at 8:55 AM Tzu-Li (Gordon) Tai <

tzuli...@apache.org

wrote:


Thanks for all the test efforts, verifications and

votes

so

far.

So far, things are looking good, but we still

require

one

more

PMC

binding

vote for this RC to be the official release, so I

would

like

to

extend

the

vote time for 1 more day, until *Aug. 16th 17:00

CET*.

In the meantime, the release notes for 1.9.0 had

only

just

been

finalized

[1], and could use a few more eyes before closing

the

vote.

Any help with checking if anything else should be

mentioned

there

regarding

breaking changes / known shortcomings would be

appreciated.

Cheers,
Gordon

[1] https://github.com/apache/flink/pull/9438

On Thu, Aug 15, 2019 at 3:58 PM Kurt Young <

ykt...@gmail.com

wrote:

Great, then I have no other comments on legal

check.

Best,
Kurt


On Thu, Aug 15, 2019 at 9:56 PM Chesnay Schepler

<

ches...@apache.org

wrote:


The licensing items aren't a problem; we don't

care

about

Flink

modules

in NOTICE files, and we don't have to update

the

source-release

licensing since we don't have a pre-built

version

of

the

WebUI

in

the

source.

On 15/08/2019 15:22, Kurt Young wrote:

After going through the licenses, I found 2

suspicions

but

not

sure

if

they

are
valid or not.

1. flink-state-processing-api is packaged in

to

flink-dist

jar,

but

not

included in
NOTICE-binary file (the one under the root

directory)

like

other

modules.

2. flink-runtime-web distributed some

JavaScript

dependencies

through

source

codes, the licenses and NOTICE file were only

updated

inside

the

module

of

flink-runtime-web, but not the NOTICE file

and

licenses

directory

which

under
the  root directory.

Another minor issue I just found is:
FLINK-13558 tries to include table examples

to

flink-dist,

but

I

cannot

find it in
the binary distribution of RC2.

Best,
Kurt


On Thu, Aug 15, 2019 at 6:19 PM Kurt Young <

ykt...@gmail.com

wrote:

Hi Gordon & Timo,

Thanks for the feedback, and I agree with

it.

I

will

document

this

in

the

release notes.

Best,
Kurt


On Thu, Aug 15, 2019 at 6:14 PM Tzu-Li

(Gordon)

Tai

<

tzuli...@apache.org>

wrote:


Hi Kurt,

With the same argument as before, given

that

it

is

mentioned

in

the

release
announcement that it is a preview feature,

I

would

not

block

this

release

because of it.
Nevertheless, it would be important to

mention

this

explicitly

in

the

release notes [1].

Regards,
Gordon

[1]

https://github.com/apache/flink/pull/9438

On Thu, Aug 15, 2019 at 11:29 AM Timo

Walther <

twal...@apache.org>

wrote:

Hi Kurt,

I agree that this is a serious bug.

However, I

would

not

block

the

release because of this. As you said,

there

is a

workaround

and

the

`execute()` works in the most common case

of a

single

execution.

We

can

fix this in a minor release shortly after.

What do others think?

Regards,
Timo


Am 15.08.19 um 11:23 schrieb Kurt Young:

HI,

We just find a serious bug around blink

planner:

https://issues.apache.org/jira/browse/FLINK-13708

When user reused the table environment

instance,

and

call

`execute`

method

multiple times for
different sql, the later call will

trigger

the

earlier

ones

to

be

re-executed.

It's a serious bug but seems we also

have a

work

around,

which

is

never

reuse the table environment
object. I'm not sure if we should treat

this

one

as

blocker

issue

of

1.9.0.

What's your opinion?

Best,
Kurt


On Thu, Aug 15, 2019 at 2:01 PM Gary Yao

<

g...@ververica.com

wrote:

+1 (non-binding)

Jepsen test suite passed 10 times

consecutively

On Wed, Aug 14, 2019 at 5:31 PM Aljoscha

Krettek <

aljos...@apache.org>

wrote:


+1

I did some testing on a Googl

Re: [DISCUSS][CODE STYLE] Usage of Java Optional

2019-08-21 Thread Timo Walther

Thanks for summarizing the discussion Andrey, +1 to this style.

Regards,
Timo


Am 21.08.19 um 11:57 schrieb Andrey Zagrebin:

Hi All,

It looks like we have reached a consensus regarding the last left question.

I suggest the following final summary:

- Use @Nullable annotation where you do not use Optional for the
nullable values
- If you can prove that Optional usage would lead to a performance
degradation in critical code then fallback to @Nullable
- Always use Optional to return nullable values in the API/public
methods except the case of a proven performance concern
- Do not use Optional as a function argument, instead either overload
the method or use the Builder pattern for the set of function arguments
   - Note: an Optional argument can be allowed in a *private* helper
   method if you believe that it simplifies the code, example is in [1]
   - Do not use Optional for class fields

If there are no more comments/concerns/objections I will open a PR to
reflect this in the code style guide.

Bets,
Andrey

[1]
https://github.com/apache/flink/blob/master/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/typeutils/AvroFactory.java#L95

On Tue, Aug 20, 2019 at 10:35 AM Yu Li  wrote:


Thanks for the summarize Andrey!

I'd also like to adjust my -1 to +0 on using Optional as parameter for
private methods due to the existence of the very first rule - "Avoid using
Optional in any performance critical code". I'd regard the "possible GC
burden while using Optional as parameter" also one performance related
factor.

And besides the code convention itself, I believe it's even more important
to make us contributors know the reason behind.

Thanks.

Best Regards,
Yu


On Tue, 20 Aug 2019 at 10:14, Stephan Ewen  wrote:


I think Dawid raised a very good point here.
One of the outcomes should be that we are consistent in our

recommendations

and requests during PR reviews. Otherwise we'll just confuse

contributors.

So I would be
   +1 for someone to use Optional in a private method if they believe it

is

helpful
   -1 to push contributors during reviews to do that


On Tue, Aug 20, 2019 at 9:42 AM Dawid Wysakowicz 
Hi Andrey,

Just wanted to quickly elaborate on my opinion. I wouldn't say I am -1,
just -0 for the Optionals in private methods. I am ok with not
forbidding them there. I just think in all cases there is a better
solution than passing the Optionals around, even in private methods. I
just hope the outcome of the discussion won't be that it is no longer
allowed to suggest those during review.

Best,

Dawid

On 19/08/2019 17:53, Andrey Zagrebin wrote:

Hi all,

Sorry for not getting back to this discussion for some time.
It looks like in general we agree on the initially suggested points:

- Always use Optional only to return nullable values in the

API/public

methods
   - Only if you can prove that Optional usage would lead to a
   performance degradation in critical code then return nullable

value

   directly and annotate it with @Nullable
- Passing an Optional argument to a method can be allowed if it is
within a private helper method and simplifies the code
- Optional should not be used for class fields

The first point can be also elaborated by explicitly forbiding
Optional/Nullable parameters in public methods.
In general we can always avoid Optional parameters by either

overloading

the method or using a pojo with a builder to pass a set of

parameters.

The third point does not prevent from using @Nullable/@Nonnull for

class

fields.
If we agree to not use Optional for fields then not sure I see any

use

case

for SerializableOptional (please comment on that if you have more

details).

@Jingsong Lee
Using Optional in Maps.
I can see this as a possible use case.
I would leave this decision to the developer/reviewer to reason about

it

and keep the scope of this discussion to the variables/parameters as

it

seems to be the biggest point of friction atm.

Finally, I see a split regarding the second point: 
Optional

argument to a method can be allowed if it is within a private helper

method

and simplifies the code>.
There are people who have a strong opinion against allowing it.
Let's vote then for whether to allow it or not.
So far as I see we have the following votes (correct me if wrong and

add

more if you want):
Piotr+1
Biao+1
Timo   -1
Yu   -1
Aljoscha -1
Till  +1
Igal+1
Dawid-1
Me +1

So far: +5 / -4 (Optional arguments in private methods)

Best,
Andrey


On Tue, Aug 6, 2019 at 8:53 AM Piotr Nowojski 

wrote:

Hi Qi,


For example, SingleInputGate is already creating Optional for every

BufferOrEvent in getNextBufferOrEvent(). How much performance gain

would we

get if it’s replaced by null check?

When I was introducing it there, I have micro-benchmarked this

change,

and

there was no visible throughput change in a pure net

Re: [VOTE] FLIP-52: Remove legacy Program interface.

2019-08-21 Thread Timo Walther

+1

Am 21.08.19 um 13:21 schrieb Stephan Ewen:

+1

On Wed, Aug 21, 2019 at 1:07 PM Kostas Kloudas  wrote:


Hi all,

Following the FLIP process, this is a voting thread dedicated to the
FLIP-52.
As shown from the corresponding discussion thread [1], we seem to agree
that
the Program interface can be removed, so let's make it also official
with a vote.

Cheers,
Kostas


[1]
https://lists.apache.org/thread.html/0dbd0a4adf9ad00d6ad869dffc8820f6ce4c1969e1ea4aafb1dd0aa4@%3Cdev.flink.apache.org%3E





[DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-08-21 Thread Timo Walther

Hi everyone,

some of you might remember the discussion I started end of March [1] 
about introducing a new Java DSL for Table API that is not embedded in a 
string.


In particular, it solves the following issues:

- No possibility of deprecating functions

- Missing documentation for users

- Missing auto-completion for users

- Need to port the ExpressionParser from Scala to Java

- Scala symbols are deprecated! A Java DSL can also enable the Scala DSL 
one.


Due to shift of priorities, we could not work on it in Flink 1.9 but the 
feedback at that time was positive and we should aim for 1.10 to 
simplify the API with this change.


We propose the following FLIP-55:

https://docs.google.com/document/d/1CfaaD3j8APJDKwzIT4YsX7QD2huKTB4xlA3vnMUFJmA/edit?usp=sharing 



Thanks for any feedback,

Timo

[1] 
https://lists.apache.org/thread.html/e6f31d7fa53890b91be0991c2da64556a91ef0fc9ab3ffa889dacc23@%3Cdev.flink.apache.org%3E




Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-22 Thread Timo Walther

Hi everyone,

thanks for all the feedback we have received online and offline. It 
showed that many people support the idea of evolving the Flink 
configuration functionality. I'm almost sure that this FLIP will not 
solve all issues but at least will improve the current status.


We've updated the document and replaced the Correlation part with the 
concept of a ConfigOptionGroup that can provide all available options of 
a group plus custom group validators for eager validation. For now, this 
eager group validation will only be used at certain locations in the 
Flink code but it prepares for maybe validating the entire global 
configuration before submitting a job in the future.


Please take another look if you find time. I hope we can proceed with 
the voting process if there are no objections.


Regards,
Timo

Am 19.08.19 um 12:54 schrieb Timo Walther:

Hi Stephan,

thanks for your suggestions. Let me give you some background about the 
decisions made in this FLIP:


1. Goal: The FLIP is labelled "evolve" not "rework" because we did not 
want to change the entire configuration infrastructure. Both for 
backwards-compatibility reasons and the amount of work that would be 
required to update all options. If our goal is to rework the 
configuration option entirely, I might suggest to switch to JSON 
format with JSON schema and JSON validator. However, setting 
properties in a CLI or web interface becomes more tricky the more 
nested structures are allowed.


2. Class-based Options: The current ConfigOption class is centered 
around Java classes where T is determined by the default value. The 
FLIP just makes this more explicit by offering an explicit `intType()` 
method etc. The current design of validators centered around Java 
classes makes it possible to have typical domain validators baked by 
generics as you suggested. If we introduce types such as "quantity 
with measure and unit" we still need to get a class out of this option 
at the end, so why changing a proven concept?


3. List Options: The `isList` prevents having arbitrary nesting. As 
Dawid mentioned, we kept human readability in mind. For every atomic 
option like "key=12" can be represented by a list "keys=12;13". But we 
don't want to go further; esp. no nesting. A dedicated list option 
would start making this more complicated such as 
"ListOption(ObjectOption(ListOption(IntOption, ...), 
StringOption(...)))", do we want that?


4. Correlation: The correlation part is one of the suggestions that I 
like least in the document. We can also discuss removing it entirely, 
but I think it solves the use case of relating options with each other 
in a flexible way right next to the actual option. Instead of being 
hidden in some component initialization, we should put it close to the 
option to also perform validation eagerly instead of failing at 
runtime when the option is accessed the first time.


Regards,
Timo


Am 18.08.19 um 23:32 schrieb Stephan Ewen:

A "List Type" sounds like a good direction to me.

The comment on the type system was a bit brief, I agree. The idea is 
to see

if something like that can ease validation. Especially the correlation
system seems quite complex (proxies to work around order of 
initialization).


For example, let's assume we don't think primarily about "java types" 
but
would define types as one of the following (just examples, haven't 
thought

all the details through):

   (a) category type: implies string, and a fix set of possible values.
Those would be passes and naturally make it into the docs and 
validation.

Maps to a String or Enum in Java.

   (b) numeric integer type: implies long (or optionally integer, if 
we want

to automatically check overflow / underflow). would take typical domain
validators, like non-negative, etc.

   (c) numeric real type: same as above (double or float)

   (d) numeric interval type: either defined as an interval, or 
references

other parameter by key. validation by valid interval.

   (e) quantity: a measure and a unit. separately parsable. The 
measure's
type could be any of the numeric types above, with same validation 
rules.


With a system like the above, would we still correlation validators? Are
there still cases that we need to catch early (config loading) or are 
the
remaining cases sufficiently rare and runtime or setup specific, that 
it is

fine to handle them in component initialization?


On Sun, Aug 18, 2019 at 6:36 PM Dawid Wysakowicz 


wrote:


Hi Stephan,

Thank you for your opinion.

Actually list/composite types are the topics we spent the most of the
time. I understand that from a perspective of a full blown type system,
a field like isList may look weird. Please let me elaborate a bit more
on the reason behind it though. Maybe we weren't clear enough about it
in the FLIP. The key feature of all the conifg options is

Re: [ANNOUNCE] Apache Flink 1.9.0 released

2019-08-22 Thread Timo Walther

Thanks to everyone who contributed to this release. Great team work!

Regards,
Timo

Am 22.08.19 um 14:16 schrieb JingsongLee:

Congratulations~~~ Thanks gordon and everyone~

Best,
Jingsong Lee


--
From:Oytun Tez 
Send Time:2019年8月22日(星期四) 14:06
To:Tzu-Li (Gordon) Tai 
Cc:dev ; user ; announce 

Subject:Re: [ANNOUNCE] Apache Flink 1.9.0 released

Congratulations team; thanks for the update, Gordon.

---
Oytun Tez

M O T A W O R D
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com

On Thu, Aug 22, 2019 at 8:03 AM Tzu-Li (Gordon) Tai  wrote:

The Apache Flink community is very happy to announce the release of Apache 
Flink 1.9.0, which is the latest major release.

Apache Flink(r) is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this new major release:
https://flink.apache.org/news/2019/08/22/release-1.9.0.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Cheers,
Gordon





[VOTE] FLIP-54: Evolve ConfigOption and Configuration

2019-08-27 Thread Timo Walther

Hi everyone,

thanks for the great feedback we have received for the draft of FLIP-54. 
The discussion seems to have reached an agreement. Of course this 
doesn't mean that we can't propose further improvements on 
ConfigOption's and Flink configuration in general in the future. It is 
just one step towards having a better unified configuration for the project.


Please vote for the following design document:

https://docs.google.com/document/d/1IQ7nwXqmhCy900t2vQLEL3N2HIdMg-JO8vTzo1BtyKU/edit#

I will convert it to a Wiki page afterwards.

Thanks,
Timo



Re: [CODE-STYLE] Builder pattern

2019-08-27 Thread Timo Walther

Hi all,

great to put this code style discussion on the mailing list because I 
also have found this style inconsistent in the past.


Regarding Gyula's suggestions:
1. a static method `builder()` I think IDEs are also hightlight methods 
with this name
2. I would vote for a more declarative `propertyA(...).propertyB(...)` 
approach instead of setters because most methods should be setters. 
However, if implementers want to add methods such as `addField(..)`, 
`useProcessingTime()` this sounds also fine to me.

3. mutable
Regarding Dawid's suggestions:
4. regarding required and optional parameters, I would allow both options
5. always end with `build()`

Thanks,
Timo

On 27.08.19 10:04, Kostas Kloudas wrote:

Hi all,

I agree with Arvid, although for point 2 I would be less strict.

@Piotr, for the side note you mentioned, and from the description you
mention in the mail for example I,
it seems that the need to pass parameters in the build() is not an
inherent need of the build pattern but it
can be mitigated by just creating sth like a StreamOperatorConfig (and
not the operator builder itself) on the
client, serialize it, and then at the TM, use the actual
StreamOperator builder with that config to create the
operator. There you can have all the needed parameters and also
perform the validation that
Dawid mention.

Again, this is just from the summary you provided, not from looking at
the code, so I may be missing something.

On the validation, I think that it should happen at the build(), as
this is the only place where
we know all the parameters.

Cheers,
Kostas

On Tue, Aug 27, 2019 at 9:47 AM Arvid Heise  wrote:

Hi all,

I'd like to differentiate between API level builder usage and "internal"
builder usage (for example, test harness).

For API level builder, in general everything goes, as long as it aligns
with user expectations. API level usages are also much more discussed, such
that I'd expect them to be consistent within one API. This freedom is
especially required when considering APIs for non-java languages.

Now for "internal" usages (which may or may not align with Java Datastream
etc. usage). I'd like to get a style that is well supported by the
primarily used IDEs. I don't want to write a new builder from scratch and
by the looks of it, we will get many more builders.

Furthermore, I'd like to emphasize that the primary use case for using
builders for me is to mitigate the lack of named arguments in Java, which
is especially painful for immutable types. In an ideal world, I'd like to
have all classes immutable and use builders to create new instances if we
have
a) too many parameters to be passed (which should become many more once we
commit even more to DI),
b) we have meaningful default values, such that we can omit a significant
amount parameters by using a buidler, or
c) we have a good amount of optional (=nullable) parameters.
Obviously, we deviate from that whenever performance considerations demand
it.

With that my votes for the questions:
1. static method only, hide builder ctor
2. Intellij and avro use setX() for property X, so I'd go with that. Lombok
just uses X(), so I wouldn't mind it.
3. Mutable approach. Immutable doesn't make much sense to me. Then I can
directly go with a Wither pattern on the immutable class without builder.
4. Private ctor. If it has a builder, it should be used. Exception:
migration support for some intermediate versions (if we added a builder to
a class, keep the ctor deprecated public for 1,2 versions).
5. no setX in general, we want to have an immutable class. Exceptions where
due (should be mostly for performance reasons).

Best,

Arvid

On Mon, Aug 26, 2019 at 4:40 PM Piotr Nowojski  wrote:


Hi,

I agree with Dawid, modulo that I don’t have any preference about point 2
- I’m ok even with not enforcing this.

One side note about point 4. There are use cases where passing obligatory
parameters in the build method itself might make sense:

I. - when those parameters can not be or can not be easily passed via the
constructor. Good example of that is “builder” pattern for the
StreamOperators (StreamOperatorFactory), where factory is constructed on
the API level in the client, then it’s being serialised and sent over the
network and reconstructed on the TaskManager, where StreamOperator is
finally constructed. The issue is that some of the obligatory parameters
are only available on the TaskManager, so they can not be passed on a
DataStream level in the client.
II. - when builder might be used to create multiple instances of the
object with different values.

Piotrek


On 26 Aug 2019, at 15:12, Jark Wu  wrote:

Hi Gyula,

Thanks for bringing this. I think it would be nice if we have a common
approach to create builder pattern.
Currently, we have a lot of builders but with different tastes.


1. Creating the builder objects:

I prefer option a) too. It would be easier for users to get the builder
instance.


2. Setting properties on the builder:

I don'

Re: [VOTE] FLIP-54: Evolve ConfigOption and Configuration

2019-08-27 Thread Timo Walther

Hi Dawid,

I'm a big fan of immutability and your suggestion makes sense to me. 
Furthermore, I thought about the class name clashes of ConfigOptionGroup 
and the existing ConfigGroup annotation. Maybe we should come up with a 
better name.


I will make some additional adjustments to the FLIP. I hereby cancel the 
vote. Further discussions should go into the [DISCUSS] thread until a 
new vote is started.


Thanks,
Timo


On 27.08.19 16:12, Dawid Wysakowicz wrote:


Actually I wanted to propose a slight change to the proposal. 
Therefore I want to change my vote to -1 for now.


I suggest to change the Configurable interface to ConfigurableFactory:

public interface ConfigurableFactory {


/**

* Creates an instance from the given configuration.

*/

    TfromConfiguration(ConfigurationReader configuration);


/**

* Writes this instance to the given configuration.

*/

void toConfiguration(T value, ConfigurationWriter configuration);

}

And the corresponding method in the builder to:

 TypedConfigOptionBuilder configurableType(ClassConfigurableFactory> clazz) {


return new TypedConfigOptionBuilder<>(key, clazz);

}
This way we can keep the "configurable" objects immutable.

Best,

Dawid

On 27/08/2019 13:28, Timo Walther wrote:

Hi everyone,

thanks for the great feedback we have received for the draft of 
FLIP-54. The discussion seems to have reached an agreement. Of course 
this doesn't mean that we can't propose further improvements on 
ConfigOption's and Flink configuration in general in the future. It 
is just one step towards having a better unified configuration for 
the project.


Please vote for the following design document:

https://docs.google.com/document/d/1IQ7nwXqmhCy900t2vQLEL3N2HIdMg-JO8vTzo1BtyKU/edit# 



I will convert it to a Wiki page afterwards.

Thanks,
Timo






Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-27 Thread Timo Walther

Hi everyone,

I updated the FLIP proposal one more time as mentioned in the voting 
thread. If there are no objections, I will start a new voting thread 
tomorrow at 9am Berlin time.


Thanks,
Timo


On 22.08.19 14:19, Timo Walther wrote:

Hi everyone,

thanks for all the feedback we have received online and offline. It 
showed that many people support the idea of evolving the Flink 
configuration functionality. I'm almost sure that this FLIP will not 
solve all issues but at least will improve the current status.


We've updated the document and replaced the Correlation part with the 
concept of a ConfigOptionGroup that can provide all available options 
of a group plus custom group validators for eager validation. For now, 
this eager group validation will only be used at certain locations in 
the Flink code but it prepares for maybe validating the entire global 
configuration before submitting a job in the future.


Please take another look if you find time. I hope we can proceed with 
the voting process if there are no objections.


Regards,
Timo

Am 19.08.19 um 12:54 schrieb Timo Walther:

Hi Stephan,

thanks for your suggestions. Let me give you some background about 
the decisions made in this FLIP:


1. Goal: The FLIP is labelled "evolve" not "rework" because we did 
not want to change the entire configuration infrastructure. Both for 
backwards-compatibility reasons and the amount of work that would be 
required to update all options. If our goal is to rework the 
configuration option entirely, I might suggest to switch to JSON 
format with JSON schema and JSON validator. However, setting 
properties in a CLI or web interface becomes more tricky the more 
nested structures are allowed.


2. Class-based Options: The current ConfigOption class is centered 
around Java classes where T is determined by the default value. The 
FLIP just makes this more explicit by offering an explicit 
`intType()` method etc. The current design of validators centered 
around Java classes makes it possible to have typical domain 
validators baked by generics as you suggested. If we introduce types 
such as "quantity with measure and unit" we still need to get a class 
out of this option at the end, so why changing a proven concept?


3. List Options: The `isList` prevents having arbitrary nesting. As 
Dawid mentioned, we kept human readability in mind. For every atomic 
option like "key=12" can be represented by a list "keys=12;13". But 
we don't want to go further; esp. no nesting. A dedicated list option 
would start making this more complicated such as 
"ListOption(ObjectOption(ListOption(IntOption, ...), 
StringOption(...)))", do we want that?


4. Correlation: The correlation part is one of the suggestions that I 
like least in the document. We can also discuss removing it entirely, 
but I think it solves the use case of relating options with each 
other in a flexible way right next to the actual option. Instead of 
being hidden in some component initialization, we should put it close 
to the option to also perform validation eagerly instead of failing 
at runtime when the option is accessed the first time.


Regards,
Timo


Am 18.08.19 um 23:32 schrieb Stephan Ewen:

A "List Type" sounds like a good direction to me.

The comment on the type system was a bit brief, I agree. The idea is 
to see

if something like that can ease validation. Especially the correlation
system seems quite complex (proxies to work around order of 
initialization).


For example, let's assume we don't think primarily about "java 
types" but
would define types as one of the following (just examples, haven't 
thought

all the details through):

   (a) category type: implies string, and a fix set of possible values.
Those would be passes and naturally make it into the docs and 
validation.

Maps to a String or Enum in Java.

   (b) numeric integer type: implies long (or optionally integer, if 
we want

to automatically check overflow / underflow). would take typical domain
validators, like non-negative, etc.

   (c) numeric real type: same as above (double or float)

   (d) numeric interval type: either defined as an interval, or 
references

other parameter by key. validation by valid interval.

   (e) quantity: a measure and a unit. separately parsable. The 
measure's
type could be any of the numeric types above, with same validation 
rules.


With a system like the above, would we still correlation validators? 
Are
there still cases that we need to catch early (config loading) or 
are the
remaining cases sufficiently rare and runtime or setup specific, 
that it is

fine to handle them in component initialization?


On Sun, Aug 18, 2019 at 6:36 PM Dawid Wysakowicz 


wrote:


Hi Stephan,

Thank you for your opinion.

Actually list/composite types are the topics we spent the most of the
time. I understand that from a persp

Re: [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-08-27 Thread Timo Walther

Hi David,

thanks for your feedback. With the current design, the DSL would be free 
of any ambiguity but it is definitely more verbose esp. around defining 
values.


I would be happy about further suggestions that make the DSL more 
readable. I'm also not sure if we go for `$()` and `v()` instead of more 
readable `ref()` and `val()`. This could maybe make it look less 
"alien", what do you think?


Some people mentioned to overload certain methods for accepting values 
or column names. E.g. `$("field").isEqual("str")` but then string values 
could be confused with column names.


Thanks,
Timo

On 27.08.19 17:34, David Anderson wrote:

In general I'm in favor of anything that is going to make the Table
API easier to learn and more predictable in its behavior. This
proposal kind of falls in the middle. As someone who has spent hours
in the crevices between the various flavors of the current
implementations, I certainly view keeping the various APIs and DSLs
more in sync, and making them less buggy, as highly desirable.

On the other hand, some of the details in the proposal do make the
resulting user code less pretty and less approachable than the current
Java DSL. In a training context it will be easy to teach, but I wonder
if we can find a way to make it look less alien at first glance.

David

On Wed, Aug 21, 2019 at 1:33 PM Timo Walther  wrote:

Hi everyone,

some of you might remember the discussion I started end of March [1]
about introducing a new Java DSL for Table API that is not embedded in a
string.

In particular, it solves the following issues:

- No possibility of deprecating functions

- Missing documentation for users

- Missing auto-completion for users

- Need to port the ExpressionParser from Scala to Java

- Scala symbols are deprecated! A Java DSL can also enable the Scala DSL
one.

Due to shift of priorities, we could not work on it in Flink 1.9 but the
feedback at that time was positive and we should aim for 1.10 to
simplify the API with this change.

We propose the following FLIP-55:

https://docs.google.com/document/d/1CfaaD3j8APJDKwzIT4YsX7QD2huKTB4xlA3vnMUFJmA/edit?usp=sharing
<https://docs.google.com/document/d/1CfaaD3j8APJDKwzIT4YsX7QD2huKTB4xlA3vnMUFJmA/edit#heading=h.jn04bfolpim0>

Thanks for any feedback,

Timo

[1]
https://lists.apache.org/thread.html/e6f31d7fa53890b91be0991c2da64556a91ef0fc9ab3ffa889dacc23@%3Cdev.flink.apache.org%3E





[VOTE] FLIP-54: Evolve ConfigOption and Configuration

2019-08-28 Thread Timo Walther

Hi everyone,

after some last minute changes yesterday, I would like to start a new 
vote on FLIP-54. The discussion seems to have reached an agreement. Of 
course this doesn't mean that we can't propose further improvements on 
ConfigOption's and Flink configuration in general in the future. It is 
just one step towards having a better unified configuration for the project.


Please vote for the following design document:

https://docs.google.com/document/d/1IQ7nwXqmhCy900t2vQLEL3N2HIdMg-JO8vTzo1BtyKU/edit#

The discussion can be found at:

https://lists.apache.org/thread.html/a56c6b52e5f828d4a737602b031e36b5dd6eaa97557306696a8063a9@%3Cdev.flink.apache.org%3E

This voting will be open for at least 72 hours. I'll try to close it on 
2019-09-02 8:00 UTC, unless there is an objection or not enough votes.


I will convert it to a Wiki page afterwards.

Thanks,

Timo



Re: [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-08-28 Thread Timo Walther

Hi David,

thanks for your feedback. I was also skeptical about 1 char method 
names, I restored the `val()` method for now. If you read literature 
such as Wikipedia [1]: "literal is a notation for representing a fixed 
value in source code. Almost all programming languages have notations 
for atomic values". So they are also talking about "values".


Alteratively we could use `lit(12)` or `l(12)` but I'm not convinced 
that this is better.


Regards,
Timo

[1] https://en.wikipedia.org/wiki/Literal_(computer_programming)

On 27.08.19 22:10, David Anderson wrote:

TImo,

While it's not exactly pretty, I don't mind the $("field") construct.
It's not particularly surprising. The v() method troubles me more; it
looks mysterious. I think we would do better to have something more
explicit. val() isn't much better -- val("foo") could be interpreted
to mean the value of the "foo" column, or a literal string.

David

On Tue, Aug 27, 2019 at 5:45 PM Timo Walther  wrote:

Hi David,

thanks for your feedback. With the current design, the DSL would be free
of any ambiguity but it is definitely more verbose esp. around defining
values.

I would be happy about further suggestions that make the DSL more
readable. I'm also not sure if we go for `$()` and `v()` instead of more
readable `ref()` and `val()`. This could maybe make it look less
"alien", what do you think?

Some people mentioned to overload certain methods for accepting values
or column names. E.g. `$("field").isEqual("str")` but then string values
could be confused with column names.

Thanks,
Timo

On 27.08.19 17:34, David Anderson wrote:

In general I'm in favor of anything that is going to make the Table
API easier to learn and more predictable in its behavior. This
proposal kind of falls in the middle. As someone who has spent hours
in the crevices between the various flavors of the current
implementations, I certainly view keeping the various APIs and DSLs
more in sync, and making them less buggy, as highly desirable.

On the other hand, some of the details in the proposal do make the
resulting user code less pretty and less approachable than the current
Java DSL. In a training context it will be easy to teach, but I wonder
if we can find a way to make it look less alien at first glance.

David

On Wed, Aug 21, 2019 at 1:33 PM Timo Walther  wrote:

Hi everyone,

some of you might remember the discussion I started end of March [1]
about introducing a new Java DSL for Table API that is not embedded in a
string.

In particular, it solves the following issues:

- No possibility of deprecating functions

- Missing documentation for users

- Missing auto-completion for users

- Need to port the ExpressionParser from Scala to Java

- Scala symbols are deprecated! A Java DSL can also enable the Scala DSL
one.

Due to shift of priorities, we could not work on it in Flink 1.9 but the
feedback at that time was positive and we should aim for 1.10 to
simplify the API with this change.

We propose the following FLIP-55:

https://docs.google.com/document/d/1CfaaD3j8APJDKwzIT4YsX7QD2huKTB4xlA3vnMUFJmA/edit?usp=sharing
<https://docs.google.com/document/d/1CfaaD3j8APJDKwzIT4YsX7QD2huKTB4xlA3vnMUFJmA/edit#heading=h.jn04bfolpim0>

Thanks for any feedback,

Timo

[1]
https://lists.apache.org/thread.html/e6f31d7fa53890b91be0991c2da64556a91ef0fc9ab3ffa889dacc23@%3Cdev.flink.apache.org%3E





Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-29 Thread Timo Walther

Hi Becket,

let me try to clarify some of your questions:

1. For every option, we also needed to think about how to represent it 
in a human readable format. We do not want to allow arbitrary nesting 
because that would easily allow to bypass the flattened hierarchy of 
config options (`session.memory.min`). The current design allows to 
represent every option type as a list. E.g.:


`myIntOption: 12` can be `myIntListOption: 12;12`
`myObjectOption: field=12,other=true` can be `myObjectListOption: 
field=12,other=true; field=12,other=true`
`myPropertyOption: key=str0,other=str1` can be `myPropertyListOption: 
key=str0,other=str1;key=str0,other=str1`


We need the atomic class for serialization/deserialization both in 
binary and string format.


ConfigOption is not present in the code base yet, but this FLIP is 
a preparation of making ExecutionConfig configurable. If you look into 
this class or also in existing table connectors/formats, you will see 
that each proposed option type has its requirements.


2. Regarding extending the description of ConfigOptions, the semantic of 
one option should be a super set of the other option. E.g. in Table API 
we might use general ExecutionConfig properties. But we would like to a) 
make external options more prominent in the Table API config docs to 
link people to properties they should pay attention b) notice about side 
effects. The core semantic of a property should not change.


3. The factory will not receive the entire configuration but works in a 
separate key space. For `myObjectOption` above, it would receive a 
configuration that consists of `field: 12` and `other: true`.


I agree. I will convert the document into a Wiki page today.

Thanks,
Timo

On 29.08.19 09:00, Stephan Ewen wrote:

@Becket One thing that may be non-obvious is that the Configuration class
also defines serialization / persistence logic at the moment. So it needs
to know the set of types it supports. That stands in the way of an
arbitrary generic map type.

@Timo I agree though that it seems a bit inconsistent to have one
collection orthogonal to the type (List) and another one bound to specific
types (Map).

On Thu, Aug 29, 2019 at 8:20 AM Becket Qin  wrote:


Hi Timo,

Thanks for the proposal. Sorry for the late comments, but I have a few
questions / comments.

1. Is a new field of isList necessary in the ConfigOption?
Would it be enough to just check the atomicClass to see if it is a List or
not?
Also, in the ConfigOption class case, are we always assume both key
and value types are String? Can we just apply the same to the
ConfigOption?
BTW, I did a quick search in the codebase but did not find any usage of
ConfigOption.

2. The same config name, but with two ConfigOption with different semantic
in different component seems super confusing. For example, when users set
both configs, they may have no idea one is overriding the other. There
might be two cases:
  - If it is just the same config used by different components to act
accordingly, it might be better to just have one config, but describe
clearly on how that config will be used.
  - If it is actually two configurations that can be set differently, I
think the config names should just be different.

3. Regarding the ConfigurableFactory, is the toConfiguration() method
pretty much means getConfiguration()? The toConfiguration() method sounds
like converting an object to a configuration, which only works if the
object does not contain any state / value. I am also wondering if there is
a real use case of this method. Because supposedly the configurations could
just be passed around to caller of this method.

Also, can you put the proposal into the FLIP wiki instead of in the Google
doc before voting? The FLIP wiki allows track the modification history and
has a more established structure to ensure nothing is missed.

Thanks,

Jiangjie (Becket) Qin

On Tue, Aug 27, 2019 at 11:34 PM Timo Walther  wrote:


Hi everyone,

I updated the FLIP proposal one more time as mentioned in the voting
thread. If there are no objections, I will start a new voting thread
tomorrow at 9am Berlin time.

Thanks,
Timo


On 22.08.19 14:19, Timo Walther wrote:

Hi everyone,

thanks for all the feedback we have received online and offline. It
showed that many people support the idea of evolving the Flink
configuration functionality. I'm almost sure that this FLIP will not
solve all issues but at least will improve the current status.

We've updated the document and replaced the Correlation part with the
concept of a ConfigOptionGroup that can provide all available options
of a group plus custom group validators for eager validation. For now,
this eager group validation will only be used at certain locations in
the Flink code but it prepares for maybe validating the entire global
configuration before submitting a job in the future.

Please take another look if you find time. I hope we can proceed with
the voting process if there are no 

Re: [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-08-29 Thread Timo Walther
I'm fine with `lit()`. Regarding `col()`, I initially suggested `ref()` 
but I think Fabian and Dawid liked single char methods for the most 
commonly used expressions.


Btw, what is your opinion on the names of commonly used methods such as 
`isEqual`, `isGreaterOrEqual`? Are we fine with the current naming.
In theory we could make them shorter like `equals(), greaterOrEqual()` 
or even shorter to `eq`, `gt`, `gte`?


Thanks,
Timo


On 29.08.19 11:51, Aljoscha Krettek wrote:

Overall, this is a very nice development that should also simplify the code 
base once we deprecate the expression parser!

Regarding method names, I agree with Seth that values/literals should use 
something like “lit()”. I also think that for column references we could use 
“col()” to make it clear that it is a column reference. What do you think?

Aljoscha


On 28. Aug 2019, at 15:59, Seth Wiesman  wrote:

I would prefer ‘lit()’ over  ‘val()’ since val is a keyword in Scala. Assuming 
the intention is to make the dsl ergonomic for Scala developers.

Seth


On Aug 28, 2019, at 7:58 AM, Timo Walther  wrote:

Hi David,

thanks for your feedback. I was also skeptical about 1 char method names, I restored the `val()` 
method for now. If you read literature such as Wikipedia [1]: "literal is a notation for 
representing a fixed value in source code. Almost all programming languages have notations for 
atomic values". So they are also talking about "values".

Alteratively we could use `lit(12)` or `l(12)` but I'm not convinced that this 
is better.

Regards,
Timo

[1] https://en.wikipedia.org/wiki/Literal_(computer_programming)


On 27.08.19 22:10, David Anderson wrote:
TImo,

While it's not exactly pretty, I don't mind the $("field") construct.
It's not particularly surprising. The v() method troubles me more; it
looks mysterious. I think we would do better to have something more
explicit. val() isn't much better -- val("foo") could be interpreted
to mean the value of the "foo" column, or a literal string.

David


On Tue, Aug 27, 2019 at 5:45 PM Timo Walther  wrote:
Hi David,

thanks for your feedback. With the current design, the DSL would be free
of any ambiguity but it is definitely more verbose esp. around defining
values.

I would be happy about further suggestions that make the DSL more
readable. I'm also not sure if we go for `$()` and `v()` instead of more
readable `ref()` and `val()`. This could maybe make it look less
"alien", what do you think?

Some people mentioned to overload certain methods for accepting values
or column names. E.g. `$("field").isEqual("str")` but then string values
could be confused with column names.

Thanks,
Timo


On 27.08.19 17:34, David Anderson wrote:
In general I'm in favor of anything that is going to make the Table
API easier to learn and more predictable in its behavior. This
proposal kind of falls in the middle. As someone who has spent hours
in the crevices between the various flavors of the current
implementations, I certainly view keeping the various APIs and DSLs
more in sync, and making them less buggy, as highly desirable.

On the other hand, some of the details in the proposal do make the
resulting user code less pretty and less approachable than the current
Java DSL. In a training context it will be easy to teach, but I wonder
if we can find a way to make it look less alien at first glance.

David


On Wed, Aug 21, 2019 at 1:33 PM Timo Walther  wrote:
Hi everyone,

some of you might remember the discussion I started end of March [1]
about introducing a new Java DSL for Table API that is not embedded in a
string.

In particular, it solves the following issues:

- No possibility of deprecating functions

- Missing documentation for users

- Missing auto-completion for users

- Need to port the ExpressionParser from Scala to Java

- Scala symbols are deprecated! A Java DSL can also enable the Scala DSL
one.

Due to shift of priorities, we could not work on it in Flink 1.9 but the
feedback at that time was positive and we should aim for 1.10 to
simplify the API with this change.

We propose the following FLIP-55:

https://docs.google.com/document/d/1CfaaD3j8APJDKwzIT4YsX7QD2huKTB4xlA3vnMUFJmA/edit?usp=sharing
<https://docs.google.com/document/d/1CfaaD3j8APJDKwzIT4YsX7QD2huKTB4xlA3vnMUFJmA/edit#heading=h.jn04bfolpim0>

Thanks for any feedback,

Timo

[1]
https://lists.apache.org/thread.html/e6f31d7fa53890b91be0991c2da64556a91ef0fc9ab3ffa889dacc23@%3Cdev.flink.apache.org%3E





Re: [VOTE] FLIP-54: Evolve ConfigOption and Configuration

2019-08-29 Thread Timo Walther

I converted the mentioned Google doc into a wiki page:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-54%3A+Evolve+ConfigOption+and+Configuration

The core semantics have not changed.

Happy voting,
Timo

On 29.08.19 04:30, Zili Chen wrote:

The design looks good to me.

+1 go ahead!

Best,
tison.


Jark Wu  于2019年8月28日周三 下午6:08写道:


Hi Timo,

The new changes looks good to me.

+1 to the FLIP.


Cheers,
Jark

On Wed, 28 Aug 2019 at 16:02, Timo Walther  wrote:


Hi everyone,

after some last minute changes yesterday, I would like to start a new
vote on FLIP-54. The discussion seems to have reached an agreement. Of
course this doesn't mean that we can't propose further improvements on
ConfigOption's and Flink configuration in general in the future. It is
just one step towards having a better unified configuration for the
project.

Please vote for the following design document:




https://docs.google.com/document/d/1IQ7nwXqmhCy900t2vQLEL3N2HIdMg-JO8vTzo1BtyKU/edit#

The discussion can be found at:




https://lists.apache.org/thread.html/a56c6b52e5f828d4a737602b031e36b5dd6eaa97557306696a8063a9@%3Cdev.flink.apache.org%3E

This voting will be open for at least 72 hours. I'll try to close it on
2019-09-02 8:00 UTC, unless there is an objection or not enough votes.

I will convert it to a Wiki page afterwards.

Thanks,

Timo






Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-30 Thread Timo Walther

Hi Becket,

thanks for the discussion.

1. ConfigOptions in their current design are bound to classes. 
Regarding, the option is "creating some Configurable objects instead of 
defining the config to create
those Configurable"? We just moved this logic to a factory, this factory 
can then also be used for other purposes. However, how the option and 
objects are serialized to Configuration is still not part of the option. 
The option is just pure declaration.


If we would allow only List, implementers would need to start 
implementing own parsing and validation logic all the time. We would 
like to avoid that.


Please also keep in mind that Configuration must not consist of only 
strings, it manages a Map for efficient access. Every 
map entry can have a string representation for persistence, but in most 
cases consists of unserialized objects.


2. MAX_PARALLELISM is still defined just once. We don't overwrite keys, 
types or default values. But different layers might want to add helpful 
information. In our concrete use case for FLIP-59, ExecutionConfig has 
50 properties and many of them are not relevant for the Table layer or 
have no effect at all. We would like to list and mention the most 
important config options again in the Table Configuration section, so 
that users are not confused, but with a strong link to the core option. 
E.g.: registered kryo serializers are also important also for Table 
users, we would like to add the comment "This option allows to modify 
the serialization of the ANY SQL data type.". I think we should not spam 
the core configuration page with comments from all layers, connectors, 
or libraries but keep this in the corresponding component documentation.


3. But it is something like fromBytes() and toBytes()? It serializes and 
deserializes T from a configuration?


Regards,
Timo

On 29.08.19 19:14, Becket Qin wrote:

Hi Timo and Stephan,

Thanks for the detail explanation.

1. I agree that each config should be in a human readable format. My
concern is that the current List looks going a little too far
from what the configuration is supposed to do. They are essentially
creating some Configurable objects instead of defining the config to create
those Configurable. This mixes ConfigOption and the usage of it. API wise
it would be good to keep the configs and their usages (such as how to
create objects using the ConfigOption) apart from each other.
I am wondering if we can just make List also only take string. For example,
is the following definition of map and list sufficient?

A MapConfigOption is ConfigOption>. It can be defined
as:
map_config_name: k1=v1, k2=v2, k3=v3, ...

A ListConfigOption is ConfigOption>. It can be defined as:
list_config_name: v1, v2, v3, ...

A ListOfMapConfigOption is ConfigOption>. It can
be defined as:
list_of_map_config_name: k1=v1, k2=v2; k3=v3, k4=v4;

All the key and values in the configuration are String. This also
guarantees that the configuration is always serializable.
If we want to do one more step, we can allow the ConfigOption to set all
the primitive types and parse that for the users. So something like
List, List> seems fine.

The configuration class could also have util methods to create a list of
configurable such as:
 List  clazz).
But the configuration class will not take arbitrary Configurable as the
value of its config.

2. I might have misunderstood this. But my concern on the description
extension is in the following example.

public static final ConfigOption MAX_PARALLELISM =

CoreOptions.MAX_PARALLELISM.withExtendedDescription(
"Note: That this property means that a table program has a side-effect
XYZ.");

In this case, we will have two MAX_PARALLELISM configs now. One is
CoreOptions.MAX_PARALLELISM. The other one is defined here. I suppose users
will see both configurations. One with an extended description and one
without. Let's say there is a third component which also users
MAX_PARALLELISM, will there be yet another MAX_PARALLELISM ConfigOption? If
so, what would that ConfigOption's description look like?

Ideally, we would want to have just one CoreOptions.MAX_PARALLELISM and the
description should clearly state all the usage of this ConfigOption.

3. I see, in that case, how about we name it something like
extractConfiguration()? I am just trying to see if we can make it clear
this is not something like fromBytes() and toBytes().

Thanks,

Jiangjie (Becket) Qin

On Thu, Aug 29, 2019 at 6:09 PM Timo Walther  wrote:


Hi Becket,

let me try to clarify some of your questions:

1. For every option, we also needed to think about how to represent it
in a human readable format. We do not want to allow arbitrary nesting
because that would easily allow to bypass the flattened hierarchy of
config options (`session.memory.min`). The current design allows to
represent every option type as a list. E.g.:

`myIntOption: 12` can be `myIntListOption: 12;12`
`myObje

[DISCUSS] FLIP-60: Restructure the Table API & SQL documentation

2019-08-30 Thread Timo Walther

Hi everyone,

the Table API & SQL documentation was already in a very good shape in 
Flink 1.8. However, in the past it was mostly presented as an addition 
to DataStream API. As the Table and SQL world is growing quickly, 
stabilizes in its concepts, and is considered as another top-level API 
and closed ecosystem, it is time to restructure the docs a little bit to 
represent the vision of FLIP-32.


Current state:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/

We would like to propose the following FLIP-60 for a new structure:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=127405685

Looking forward to feedback.

Thanks,

Timo




Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-08-30 Thread Timo Walther
attered everywhere in the code base as it is now.
The string representation in our proposal is exactly the same as you
explained for those three options. The only difference is that you don't
have to parse the elements of a List, Map etc. afterwards.

Ad. 2

I think about the withExtendedDescription as a helper getter in a
different place, so that the option is easier to find in a different
module from it was defined.

The MAX_PARALLELISM option in TableOptions would conceptually be equal to:

public ConfigOption getMaxParallelismOption() {

 return CoreOptions.MAX_PARALLELISM;

}

This allows to further clarify the description of the option in the
context of a different module and end up in a seperate page in
documentation (but with a link to the original one). In the end it is
exactly the same option. It has exactly same key, type, parsing logic,
it is in the end forwarded to the same place.

Ad. 3

Not sure if I understand your concerns here. As Timo said it is in the
end sth similar to toBytes/fromBytes, but it puts itself to a
Configuration. Also just wanted to make sure we adjusted this part
slightly and now the ConfigOption takes ConfigurableFactory.

Best,

Dawid


On 30/08/2019 09:39, Timo Walther wrote:

Hi Becket,

thanks for the discussion.

1. ConfigOptions in their current design are bound to classes.
Regarding, the option is "creating some Configurable objects instead
of defining the config to create
those Configurable"? We just moved this logic to a factory, this
factory can then also be used for other purposes. However, how the
option and objects are serialized to Configuration is still not part
of the option. The option is just pure declaration.

If we would allow only List, implementers would need to start
implementing own parsing and validation logic all the time. We would
like to avoid that.

Please also keep in mind that Configuration must not consist of only
strings, it manages a Map for efficient access. Every
map entry can have a string representation for persistence, but in
most cases consists of unserialized objects.

2. MAX_PARALLELISM is still defined just once. We don't overwrite
keys, types or default values. But different layers might want to add
helpful information. In our concrete use case for FLIP-59,
ExecutionConfig has 50 properties and many of them are not relevant
for the Table layer or have no effect at all. We would like to list
and mention the most important config options again in the Table
Configuration section, so that users are not confused, but with a
strong link to the core option. E.g.: registered kryo serializers are
also important also for Table users, we would like to add the comment
"This option allows to modify the serialization of the ANY SQL data
type.". I think we should not spam the core configuration page with
comments from all layers, connectors, or libraries but keep this in
the corresponding component documentation.

3. But it is something like fromBytes() and toBytes()? It serializes
and deserializes T from a configuration?

Regards,
Timo

On 29.08.19 19:14, Becket Qin wrote:

Hi Timo and Stephan,

Thanks for the detail explanation.

1. I agree that each config should be in a human readable format. My
concern is that the current List looks going a little
too far
from what the configuration is supposed to do. They are essentially
creating some Configurable objects instead of defining the config to
create
those Configurable. This mixes ConfigOption and the usage of it. API
wise
it would be good to keep the configs and their usages (such as how to
create objects using the ConfigOption) apart from each other.
I am wondering if we can just make List also only take string. For
example,
is the following definition of map and list sufficient?

A MapConfigOption is ConfigOption>. It can be
defined
as:
map_config_name: k1=v1, k2=v2, k3=v3, ...

A ListConfigOption is ConfigOption>. It can be defined as:
list_config_name: v1, v2, v3, ...

A ListOfMapConfigOption is ConfigOption>. It
can
be defined as:
list_of_map_config_name: k1=v1, k2=v2; k3=v3, k4=v4;

All the key and values in the configuration are String. This also
guarantees that the configuration is always serializable.
If we want to do one more step, we can allow the ConfigOption to set all
the primitive types and parse that for the users. So something like
List, List> seems fine.

The configuration class could also have util methods to create a list of
configurable such as:
 List  clazz).
But the configuration class will not take arbitrary Configurable as the
value of its config.

2. I might have misunderstood this. But my concern on the description
extension is in the following example.

public static final ConfigOption MAX_PARALLELISM =

CoreOptions.MAX_PARALLELISM.withExtendedDescription(
"Note: That this property means that a table program has a side-effect
XYZ.");

In this case, we will have two MAX_PARALLELISM configs now. One is
Cor

Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-02 Thread Timo Walther

Hi Becket,

Re 1 & 3: "values in configurations should actually be immutable"

I would also prefer immutability but most of our configuration is 
mutable due to serialization/deserialization. Also maps and list could 
be mutable in theory. It is difficult to really enforce that for nested 
structures. I see two options:


a) For the original design: How about we force implementers to add a 
duplicate() method in a Configurable object? This would mean that the 
object is still mutable but by duplicating the object both during 
reading and writing we would avoid the problem you described.


b) For the current design: We still use the factory approach but let a 
Configurable object implement a getFactory() method such that we know 
how to serialize the object. With the help of a factory we can also 
duplicate the object easily during reading and writing and ensure 
immutability.


I would personally go for approach a) to not over-engineer this topic. 
But I'm open for option b).


Regards,
Timo


On 31.08.19 04:09, Becket Qin wrote:

Hi Timo,

Thanks for the reply. I am still a little concerned over the mutability of
the Configurable which could be the value in Configuration.

Re: 1


But in general, people should not use any internal fields.
Configurable objects are meant for simple little helper POJOs, not
complex arbitrary nested data structures.

This seems difficult to enforce... Ideally the values in configurations
should actually be immutable. The value can only be changed by explicitly
calling setters in Configuration. Otherwise we may have weird situation
where the Configurable in the same configuration are different in two
places because the configurable is modified in one places and not modified
in another place. So I am a little concerned on putting a Configurable type
in the Configuration map, because the value could be modified without
explicitly setting the configuration. For example, can users do the
following?

Configurable configurable =
configuration.getConfigurable(myConfigurableOption);
configurable.setConfigA(123); // this already changes the configurable
object in the configuration.

Re: 2
Thanks for confirming. As long as users will not have a situation where
they need to set two configurations with the same key but different
descriptions, I think it is OK.

Re: 3
This is actually kind of related to 1. i.e. Whether toConfiguration()
guarantees the exact same object can be rebuilt from the configuration or
not. I am still not quite sure about the use case of toConfiguration()
though. It seems indicating the Configurable is mutable, which might be
dangerous.

Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 30, 2019 at 10:04 PM Timo Walther  wrote:


Hi Becket,

1. First of all, you are totally right. The FLIP contains a bug due to
the last minute changes that Dawid suggested: by having immutable
objects created by a factory we loose the serializability of the
Configuration because the factory itself is not stored in the
Configuration. I would propose to revert the last change and stick to
the original design, which means that a object must implement the
Configurable interface and also implements serialization/deserialization
methods such that also internal fields can be persisted as you
suggested. But in general, people should not use any internal fields.
Configurable objects are meant for simple little helper POJOs, not
complex arbitrary nested data structures.

It is Map because Configuration stores the raw objects.
If you put a Boolean option into it, it remains Boolean. This makes the
map very efficient for shipping to the cluster and accessing options
multiple times. The same for configurable objects. We put the pure
objects into the map without any serialization/deserialization. The
provided factory allows to convert the Object into a Configuration and
we know how to serialize/deserializise a configuration because it is
just a key/value map.

2. Yes, this is what we had in mind. It should still be the same
configuration option. We would like to avoid specialized option keys
across components (exec.max-para and table.exec.max-para) if they are
describing basically the same thing. But adding some more description
like "TableOptions.MAX_PARALLELISM with description_1 + description_2"
does not hurt.

3. They should restore the original object given that the
toConfiguration/fromConfiguration methods have been implemented
correctly. I will extend the example to make the logic clearer while
fixing the bug.

Thanks for the healthy discussion,
Timo


On 30.08.19 15:29, Becket Qin wrote:

Hi Timo,

Thanks again for the clarification. Please see a few more questions

below.

Re: 1


Please also keep in mind that Configuration must not consist of only
strings, it manages a Map for efficient access. Every
map entry can have a string representation for persistence, but in most
cases consists of unserialized objects.

I'd like to understand this a bit more. The rea

Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-02 Thread Timo Walther

Hi all,

the FLIP looks awesome. However, I would like to discuss the changes to 
the user-facing parts again. Some feedback:


1. DataViews: With the current non-annotation design for DataViews, we 
cannot perform eager state declaration, right? At which point during 
execution do we know which state is required by the function? We need to 
instantiate the function first, right?


2. Serializability of functions: How do we ensure serializability of 
functions for catalog persistence? In the Scala/Java API, we would like 
to register classes instead of instances soon. This is the only way to 
store a function properly in a catalog or we need some 
serialization/deserialization logic in the function interfaces to 
convert an instance to string properties.


3. TableEnvironment: What is the signature of `register_function(self, 
name, function)`? Does it accept both a class and function? Like `class 
Sum` and `def split()`? Could you add some examples for registering both 
kinds of functions?


4. FunctionDefinition: Function definition is not a user-defined 
function definition. It is the highest interface for both user-defined 
and built-in functions. I'm not sure if getLanguage() should be part of 
this interface or one-level down which would be `UserDefinedFunction`. 
Built-in functions will never be implemented in a different language. In 
any case, I would vote for removing the UNKNOWN language, because it 
does not solve anything. Why should a user declare a function that the 
runtime can not handle? I also find the term `JAVA` confusing for Scala 
users. How about `FunctionLanguage.JVM` instead?


5. Function characteristics: In the current design, function classes do 
not extend from any upper class. How can users declare characteristics 
that are present in `FunctionDefinition` like determinism, requirements, 
or soon also monotonism.


Thanks,
Timo


On 02.09.19 03:38, Shaoxuan Wang wrote:

Hi Jincheng, Fudian, and Aljoscha,
I am assuming the proposed python UDX can also be applied to Flink SQL.
Is this correct? If yes, I would suggest to title the FLIP as "Flink Python
User-Defined Function" or "Flink Python User-Defined Function for Table".

Regards,
Shaoxuan


On Wed, Aug 28, 2019 at 12:22 PM jincheng sun 
wrote:


Thanks for the feedback Bowen!

Great thanks for create the FLIP and bring up the VOTE Dian!

Best, Jincheng

Dian Fu  于2019年8月28日周三 上午11:32写道:


Hi all,

I have started a voting thread [1]. Thanks a lot for your help during
creating the FLIP @Jincheng.


Hi Bowen,

Very appreciated for your comments. I have replied you in the design doc.
As it seems that the comments doesn't affect the overall design, I'll not
cancel the vote for now and we can continue the discussion in the design
doc.

[1]


http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html

<


http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-58-Flink-Python-User-Defined-Function-for-Table-API-td32295.html

Regards,
Dian


在 2019年8月28日,上午11:05,Bowen Li  写道:

Hi Jincheng and Dian,

Sorry for being late to the party. I took a glance at the proposal,

LGTM

in

general, and I left only a couple comments.

Thanks,
Bowen


On Mon, Aug 26, 2019 at 8:05 PM Dian Fu  wrote:


Hi Jincheng,

Thanks! It works.

Thanks,
Dian


在 2019年8月27日,上午10:55,jincheng sun  写道:

Hi Dian, can you check if you have edit access? :)


Dian Fu  于2019年8月26日周一 上午10:52写道:


Hi Jincheng,

Appreciated for the kind tips and offering of help. Definitely need

it!

Could you grant me write permission for confluence? My Id: Dian Fu

Thanks,
Dian


在 2019年8月26日,上午9:53,jincheng sun  写道:

Thanks for your feedback Hequn & Dian.

Dian, I am glad to see that you want help to create the FLIP!
Everyone will have first time, and I am very willing to help you

complete

your first FLIP creation. Here some tips:

- First I'll give your account write permission for confluence.
- Before create the FLIP, please have look at the FLIP Template

[1],

(It's

better to know more about FLIP by reading [2])
- Create Flink Python UDFs related JIRAs after completing the VOTE

of

FLIP.(I think you also can bring up the VOTE thread, if you want! )

Any problems you encounter during this period,feel free to tell me

that

we

can solve them together. :)

Best,
Jincheng




[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP+Template

[2]


https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals


Hequn Cheng  于2019年8月23日周五 上午11:54写道:


+1 for starting the vote.

Thanks Jincheng a lot for the discussion.

Best, Hequn

On Fri, Aug 23, 2019 at 10:06 AM Dian Fu 

wrote:

Hi Jincheng,

+1 to start the FLIP create and VOTE on this feature. I'm willing

to

help

on the FLIP create if you don't mind. As I haven't created a FLIP

before,

it will be great if you could help on this. :)

Regards,
Dian


在 2019年8月22日,下午11:41,jincheng sun 

写道:

Hi all,

Thanks a 

Re: [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-09-02 Thread Timo Walther

Hi all,

I see a majority votes for `lit(12)` so let's adopt that in the FLIP. 
The `$("field")` would consider Fabian's concerns so I would vote for 
keeping it like that.


One more question for native English speakers, is it acceptable to have 
`isEqual` instead of `isEqualTo` and `isGreater` instead of `isGreaterThan`?


If there are no more concerns, I will start a voting thread soon.

Thanks,
Timo


On 29.08.19 12:24, Fabian Hueske wrote:

Hi,

IMO, we should define what we would like to optimize for:
1) easy-to-get-started experience or
2) productivity and ease-of-use

While 1) is certainly important, I think we should put more emphasis on
goal 2).
That's why I favor as short as possible names for commonly used methods
like column references and literals/values.
These are used *many* times in *every* query.
Every user who uses the API for more than 30 mins will know what $() or v()
(or whatever method names we come up with) are used for and everybody who
doesn't know can have a look at the JavaDocs or regular documentation.
Shorter method names are not only about increasing the speed to write a
query, but also reducing clutter that needs to be parsed to understand an
expression / query.

I'm OK with descriptive names for other expressions like call(),
isEqualTo() (although these could be the commonly used eq(), gte(), etc.),
and so on but column references (and literals) should be as lightweight as
possible, IMO.

Cheers,
Fabian

Am Do., 29. Aug. 2019 um 12:15 Uhr schrieb Timo Walther 
:
I'm fine with `lit()`. Regarding `col()`, I initially suggested `ref()`
but I think Fabian and Dawid liked single char methods for the most
commonly used expressions.

Btw, what is your opinion on the names of commonly used methods such as
`isEqual`, `isGreaterOrEqual`? Are we fine with the current naming.
In theory we could make them shorter like `equals(), greaterOrEqual()`
or even shorter to `eq`, `gt`, `gte`?

Thanks,
Timo


On 29.08.19 11:51, Aljoscha Krettek wrote:

Overall, this is a very nice development that should also simplify the

code base once we deprecate the expression parser!

Regarding method names, I agree with Seth that values/literals should

use something like “lit()”. I also think that for column references we
could use “col()” to make it clear that it is a column reference. What do
you think?

Aljoscha


On 28. Aug 2019, at 15:59, Seth Wiesman  wrote:

I would prefer ‘lit()’ over  ‘val()’ since val is a keyword in Scala.

Assuming the intention is to make the dsl ergonomic for Scala developers.

Seth


On Aug 28, 2019, at 7:58 AM, Timo Walther  wrote:

Hi David,

thanks for your feedback. I was also skeptical about 1 char method

names, I restored the `val()` method for now. If you read literature such
as Wikipedia [1]: "literal is a notation for representing a fixed value in
source code. Almost all programming languages have notations for atomic
values". So they are also talking about "values".

Alteratively we could use `lit(12)` or `l(12)` but I'm not convinced

that this is better.

Regards,
Timo

[1] https://en.wikipedia.org/wiki/Literal_(computer_programming)


On 27.08.19 22:10, David Anderson wrote:
TImo,

While it's not exactly pretty, I don't mind the $("field") construct.
It's not particularly surprising. The v() method troubles me more; it
looks mysterious. I think we would do better to have something more
explicit. val() isn't much better -- val("foo") could be interpreted
to mean the value of the "foo" column, or a literal string.

David


On Tue, Aug 27, 2019 at 5:45 PM Timo Walther 

wrote:

Hi David,

thanks for your feedback. With the current design, the DSL would be

free

of any ambiguity but it is definitely more verbose esp. around

defining

values.

I would be happy about further suggestions that make the DSL more
readable. I'm also not sure if we go for `$()` and `v()` instead of

more

readable `ref()` and `val()`. This could maybe make it look less
"alien", what do you think?

Some people mentioned to overload certain methods for accepting

values

or column names. E.g. `$("field").isEqual("str")` but then string

values

could be confused with column names.

Thanks,
Timo


On 27.08.19 17:34, David Anderson wrote:
In general I'm in favor of anything that is going to make the Table
API easier to learn and more predictable in its behavior. This
proposal kind of falls in the middle. As someone who has spent hours
in the crevices between the various flavors of the current
implementations, I certainly view keeping the various APIs and DSLs
more in sync, and making them less buggy, as highly desirable.

On the other hand, some of the details in the proposal do make the
resulting user code less pretty and less approachable than the

current

Java DSL. In a training context it will be easy to teach, but I

wonder

if we

Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-02 Thread Timo Walther
mpler parameters of primitive
types (+duration, memory)

@Becket for the toConfiguration this is required for shipping the
Configuration to TaskManager, so that we do not have to use java
serializability.

Best,

Dawid


On 02/09/2019 10:05, Timo Walther wrote:

Hi Becket,

Re 1 & 3: "values in configurations should actually be immutable"

I would also prefer immutability but most of our configuration is
mutable due to serialization/deserialization. Also maps and list could
be mutable in theory. It is difficult to really enforce that for
nested structures. I see two options:

a) For the original design: How about we force implementers to add a
duplicate() method in a Configurable object? This would mean that the
object is still mutable but by duplicating the object both during
reading and writing we would avoid the problem you described.

b) For the current design: We still use the factory approach but let a
Configurable object implement a getFactory() method such that we know
how to serialize the object. With the help of a factory we can also
duplicate the object easily during reading and writing and ensure
immutability.

I would personally go for approach a) to not over-engineer this topic.
But I'm open for option b).

Regards,
Timo


On 31.08.19 04:09, Becket Qin wrote:

Hi Timo,

Thanks for the reply. I am still a little concerned over the
mutability of
the Configurable which could be the value in Configuration.

Re: 1


But in general, people should not use any internal fields.
Configurable objects are meant for simple little helper POJOs, not
complex arbitrary nested data structures.

This seems difficult to enforce... Ideally the values in configurations
should actually be immutable. The value can only be changed by
explicitly
calling setters in Configuration. Otherwise we may have weird situation
where the Configurable in the same configuration are different in two
places because the configurable is modified in one places and not
modified
in another place. So I am a little concerned on putting a
Configurable type
in the Configuration map, because the value could be modified without
explicitly setting the configuration. For example, can users do the
following?

Configurable configurable =
configuration.getConfigurable(myConfigurableOption);
configurable.setConfigA(123); // this already changes the configurable
object in the configuration.

Re: 2
Thanks for confirming. As long as users will not have a situation where
they need to set two configurations with the same key but different
descriptions, I think it is OK.

Re: 3
This is actually kind of related to 1. i.e. Whether toConfiguration()
guarantees the exact same object can be rebuilt from the
configuration or
not. I am still not quite sure about the use case of toConfiguration()
though. It seems indicating the Configurable is mutable, which might be
dangerous.

Thanks,

Jiangjie (Becket) Qin

On Fri, Aug 30, 2019 at 10:04 PM Timo Walther 
wrote:


Hi Becket,

1. First of all, you are totally right. The FLIP contains a bug due to
the last minute changes that Dawid suggested: by having immutable
objects created by a factory we loose the serializability of the
Configuration because the factory itself is not stored in the
Configuration. I would propose to revert the last change and stick to
the original design, which means that a object must implement the
Configurable interface and also implements
serialization/deserialization
methods such that also internal fields can be persisted as you
suggested. But in general, people should not use any internal fields.
Configurable objects are meant for simple little helper POJOs, not
complex arbitrary nested data structures.

It is Map because Configuration stores the raw objects.
If you put a Boolean option into it, it remains Boolean. This makes the
map very efficient for shipping to the cluster and accessing options
multiple times. The same for configurable objects. We put the pure
objects into the map without any serialization/deserialization. The
provided factory allows to convert the Object into a Configuration and
we know how to serialize/deserializise a configuration because it is
just a key/value map.

2. Yes, this is what we had in mind. It should still be the same
configuration option. We would like to avoid specialized option keys
across components (exec.max-para and table.exec.max-para) if they are
describing basically the same thing. But adding some more description
like "TableOptions.MAX_PARALLELISM with description_1 + description_2"
does not hurt.

3. They should restore the original object given that the
toConfiguration/fromConfiguration methods have been implemented
correctly. I will extend the example to make the logic clearer while
fixing the bug.

Thanks for the healthy discussion,
Timo


On 30.08.19 15:29, Becket Qin wrote:

Hi Timo,

Thanks again for the clarification. Please see a few more questions

below.

Re: 1


Please also keep in mind that Conf

Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-03 Thread Timo Walther
inClass // Or PATH_TO_MyPluginClassFactory
configKey1ForMyPluginClass: 123

Internally, the configuration may discover the MyPluginClassFactory, call
MyPluginClassFactory.create(Configuration) and pass in itself as the
configuration argument.

 From user's perspective, the way to use Configurable is the following:
1. Set a class type of the Plugin in the configuration via Configuration
interface.
2. Provide a factory class for the Plugin, either by config value or by
service provider mechanism.
3. Set the configurations consumed by the plugin, via something like a yaml
file, or programmatically via Configuration interface.

How would the util that you are suggesting look like? It would always

need to serialize/deserialize an object into an immutable string. This
is not very efficient, given that the instance already exists and can be
made immutable by the implementer by not exposing setters. Furthermore,
we would loose the declarative approach and could not generate
documentation. The current approach specifies the static final
sub-ConfigOptions either in Configurable object (initial design) or in
the ConfigurableFactory (current design) such that the docs generator
can read them.

I'd imagine that in most cases, after a concrete Configurable (say
ExecutionConfig) instance is created from the Configuration instance, we
will just pass around the ExecutionConfig instead of the Configuration
object. If so, the serialization to / deserialization from String will only
happen once per JVM, which seems fine. I am not sure why the doc generation
would be impacted. As long as the ConfigOptions go into the Configurable or
ConfigurableFactory, the docs generator can still read them, right?

Regarding "Configurable may be created and configured directly without

reading settings from a Configuration instance", there seems to be a
misunderstanding. This is a very common case if not the most common. As
mentioned before, take ExecutionConfig. This configuration is currently
only used in a programmatic-way and needs a way to be expressed as
ConfigOptions. CachedFile for instance will be a Configurable object
that will binary serialized most of the time when sending it to the
cluster but due to the Configurable design it is possible to store it in
a string representation as well.

Thanks for the explanation. I feel this creating object then serialize /
deserialize using configuration is more of an internal use case. We are
essentially using the configurations to pass some arbitrary string around.
Technically speaking we can use this way to send and receive any object.
But I am not sure if this is something that we want to generalize and
impact more public use cases.
Personally I feel that Configurable
As for CachedFile, it seems we do not plan to use configuration to pass
that on? It would be good to avoid letting the Configurations to host
arbitrary objects beyond the primitive types.

To summarize, I am thinking if we should consider the following:
1. Design the Config mechanism as a cross-board API for not only internal
usage, but for broader use cases.
2. If writeToConfiguration is only for internal use cases, maybe we can
avoid adding it to the configurable interface. We can add another interface
such as ExtractableConfigurable for internal usage.

What do you think?

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 2, 2019 at 11:59 PM Timo Walther  wrote:


@Becket:

Regarding "great if we don't put this burden on users", we should
consider who is actually using this API. It is not first-level API but
mostly API for Flink contributors. Most of the users will use API
classes ike ExecutionConfig or TableConfig or other builders for
performing configuration. They will never use the ConfigOptions classes
directly. So enforcing a duplicate method does not sound like a burden
to me.

How would the util that you are suggesting look like? It would always
need to serialize/deserialize an object into an immutable string. This
is not very efficient, given that the instance already exists and can be
made immutable by the implementer by not exposing setters. Furthermore,
we would loose the declarative approach and could not generate
documentation. The current approach specifies the static final
sub-ConfigOptions either in Configurable object (initial design) or in
the ConfigurableFactory (current design) such that the docs generator
can read them.

Regarding "Configurable may be created and configured directly without
reading settings from a Configuration instance", there seems to be a
misunderstanding. This is a very common case if not the most common. As
mentioned before, take ExecutionConfig. This configuration is currently
only used in a programmatic-way and needs a way to be expressed as
ConfigOptions. CachedFile for instance will be a Configurable object
that will binary serialized most of the time when sending it to the
cluster but due to the Configurable design it is possible to store it in
a st

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Timo Walther

Hi Bowen,

thanks for your proposal. Here are some thoughts:

1) We should not have the restriction "hive built-in functions can only 
be used when current catalog is hive catalog". Switching a catalog 
should only have implications on the cat.db.object resolution but not 
functions. It would be quite convinient for users to use Hive built-ins 
even if they use a Confluent schema registry or just the in-memory catalog.


2) I would propose to have separate concepts for catalog and built-in 
functions. In particular it would be nice to modularize built-in 
functions. Some built-in functions are very crucial (like AS, CAST, 
MINUS), others are more optional but stable (MD5, CONCAT_WS), and maybe 
we add more experimental functions in the future or function for some 
special application area (Geo functions, ML functions). A data platform 
team might not want to make every built-in function available. Or a 
function module like ML functions is in a different Maven module.


3) Following the suggestion above, we can have a separate discovery 
mechanism for built-in functions. Instead of just going through a static 
list like in BuiltInFunctionDefinitions, a platform team should be able 
to select function modules like 
catalogManager.setFunctionModules(CoreFunctions, GeoFunctions, 
HiveFunctions) or via service discovery;


3) Dawid and I discussed the resulution order again. I agree with Kurt 
that we should unify built-in function (external or internal) under a 
common layer. However, the resolution order should be:

  1. built-in functions
  2. temporary functions
  3. regular catalog resolution logic
Otherwise a temporary function could cause clashes with Flink's built-in 
functions. If you take a look at other vendors, like SQL Server they 
also do not allow to overwrite built-in functions.


Regards,
Timo


On 03.09.19 10:35, JingsongLee wrote:

Thanks Bowen:

+1 for this. And +1 to Kurt's suggestion. My other points are:

1.Hive built-in functions is an intermediate solution. So we should
  not introduce interfaces to influence the framework. To make
  Flink itself more powerful, we should implement the functions
  we need to add.

2.Non-flink built-in functions are easy for users to change their
behavior. If we support some flink built-in functions in the
  future but act differently from non-flink built-in, this will lead to
  changes in user behavior.

3.Fallback to Non-flink built-in functions is a bad choice to
  performance. Without flink internal codegen and data format,
  and bring data format conversion, the performance is not so
  good.

We need to support more complete hive jobs now, we need to
  have this fallback strategy. But it's not worth adding this
  concept at the catalog interface level, and it's not worth
  encouraging other catalogs to do so.

Another question is, does this fallback include all
  hive built-in functions? As far as I know, some hive functions
  have some hacky. If possible, can we start with a white list?
Once we implement some functions to flink built-in, we can
also update the whitelist.

Best,
Jingsong Lee


--
From:Kurt Young 
Send Time:2019年9月3日(星期二) 15:41
To:dev 
Subject:Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

Thanks Bowen for driving this.

+1 for the general idea. It makes the function resolved behavior more
clear and deterministic. Besides, the user can use all hive built-in
functions, which is a great feature.

I only have one comment, but maybe it may touch your design so I think
it would make sense to reply this mail instead of comment on google doc.
Regarding to the classfication of functions, you currently have 4 types
of functions, which are:
1. temporary functions
2. Flink built-in functions
3. Hive built-in functions (or generalized as external built-in functions)
4. catalog functions

What I want to propose is we can merge #3 and #4, make them both under
"catalog" concept, by extending catalog function to make it have ability to
have built-in catalog functions. Some benefits I can see from this approach:
1. We don't have to introduce new concept like external built-in functions.
Actually
I don't see a full story about how to treat a built-in functions, and it
seems a little
bit disrupt with catalog. As a result, you have to make some restriction
like "hive
built-in functions can only be used when current catalog is hive catalog".

2. It makes us easier to adopt another system's built-in functions to
Flink, such as
MySQL. If we don't treat uniformly with  "external built-in functions" and
"external
catalog function", things like user set current catalog to hive but want to
use MySQL's
built-in function will happen.

One more thing, follow this approach, it's clear for your question about
how to support
external built-in functions, which is "add a  getBuiltInFunction to current
Catalog API".

What do you think?

Best,
Kurt


On Fri, Aug 30, 2019 at 7:14 AM Bowen Li  wrote:


Than

Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-03 Thread Timo Walther

Hi Jincheng,

thanks for your response.

2. Serializability of functions: Using some arbitrary serialization 
format for shipping a function to worker sounds fine to me. But once we 
store functions a the catalog we need to think about backwards 
compatibility and evolution of interfaces etc. I'm not sure if 
CloudPickle is the right long-term storage format for this. If we don't 
think about this in advance, we are basically violating our code quality 
guide [1] of never use Java Serialization but in the Python-way. We are 
using the RPC serialization for persistence.


3. TableEnvironment: Can you add some example to the FLIP? Because API 
code like the following is not covered there:


self.t_env.register_function("add_one", udf(lambda i: i + 1, 
DataTypes.BIGINT(),

    DataTypes.BIGINT()))
self.t_env.register_function("subtract_one", udf(SubtractOne(), 
DataTypes.BIGINT(),

DataTypes.BIGINT()))
self.t_env.register_function("add", add)

4. FunctionDefinition: Your response still doesn't answer my question 
entirely. Why do we need FunctionDefinition.getLanguage() if this is a 
"user-defined function" concept and not a "function" concept. In any 
case, all users should not be able to set this method. So it must be 
final in UserDefinedFunction similar to getKind().


5. Function characteristics: If UserDefinedFunction is defined in 
Python, why is it not used in your example in FLIP-58. You could you 
extend the example to show how to specify these attributes in the FLIP?


Regards,
Timo

[1] https://flink.apache.org/contributing/code-style-and-quality-java.html

On 02.09.19 15:35, jincheng sun wrote:

Hi Timo,

Great thanks for your feedback. I would like to share my thoughts with you
inline. :)

Best,
Jincheng

Timo Walther  于2019年9月2日周一 下午5:04写道:


Hi all,

the FLIP looks awesome. However, I would like to discuss the changes to
the user-facing parts again. Some feedback:

1. DataViews: With the current non-annotation design for DataViews, we
cannot perform eager state declaration, right? At which point during
execution do we know which state is required by the function? We need to
instantiate the function first, right?


We will analysis the Python AggregateFunction and extract the DataViews

used in the Python AggregateFunction. This can be done
by instantiate a Python AggregateFunction, creating an accumulator by
calling method create_accumulator and then analysis the created
accumulator. This is actually similar to the way that Java
AggregateFunction processing codegen logic. The extracted DataViews can
then be used to construct the StateDescriptors in the operator, i.e., we
should have hold the state spec and the state descriptor id in Java
operator and Python worker can access the state by specifying the
corresponding state descriptor id.




2. Serializability of functions: How do we ensure serializability of
functions for catalog persistence? In the Scala/Java API, we would like
to register classes instead of instances soon. This is the only way to
store a function properly in a catalog or we need some
serialization/deserialization logic in the function interfaces to
convert an instance to string properties.


The Python function will be serialized with CloudPickle anyway in the

Python API as we need to transfer it to the Python worker which can then
deserialize it for execution. The serialized Python function can be stored
into catalog.




3. TableEnvironment: What is the signature of `register_function(self,
name, function)`? Does it accept both a class and function? Like `class
Sum` and `def split()`? Could you add some examples for registering both
kinds of functions?


It has been already supported which you mentioned. You can find an

example in the POC code:
https://github.com/dianfu/flink/commit/93f41ba173482226af7513fdec5acba72b274489#diff-34f619b31a7e38604e22a42a441fbe2fR26




4. FunctionDefinition: Function definition is not a user-defined
function definition. It is the highest interface for both user-defined
and built-in functions. I'm not sure if getLanguage() should be part of
this interface or one-level down which would be `UserDefinedFunction`.
Built-in functions will never be implemented in a different language. In
any case, I would vote for removing the UNKNOWN language, because it
does not solve anything. Why should a user declare a function that the
runtime can not handle? I also find the term `JAVA` confusing for Scala
users. How about `FunctionLanguage.JVM` instead?


Actually we may have built-in Python functions in the future. Regarding

to the following expression: py_udf1(a, b) + py_udf2(c), if there is
built-in Python
funciton for '+' operator, then we don't need to mix using Java and Python
UDFs. In this way, we can improve the execution performance.
Regarding to removing FunctionLanguage.UNKNOWN and renaming
FunctionLanguage.Java to Fun

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Timo Walther

This sounds exactly as the module approach I mentioned, no?

Regards,
Timo

On 03.09.19 13:42, Danny Chan wrote:

Thanks Bowen for bring up this topic, I think it’s a useful refactoring to make 
our function usage more user friendly.

For the topic of how to organize the builtin operators and operators of Hive, 
here is a solution from Apache Calcite, the Calcite way is to make every 
dialect operators a “Library”, user can specify which libraries they want to 
use for a sql query. The builtin operators always comes as the first class 
objects and the others are used from the order they appears. Maybe you can take 
a reference.

[1] 
https://github.com/apache/calcite/commit/9a4eab5240d96379431d14a1ac33bfebaf6fbb28

Best,
Danny Chan
在 2019年8月28日 +0800 AM2:50,Bowen Li ,写道:

Hi folks,

I'd like to kick off a discussion on reworking Flink's FunctionCatalog.
It's critically helpful to improve function usability in SQL.

https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing

In short, it:
- adds support for precise function reference with fully/partially
qualified name
- redefines function resolution order for ambiguous function reference
- adds support for Hive's rich built-in functions (support for Hive user
defined functions was already added in 1.9.0)
- clarifies the concept of temporary functions

Would love to hear your thoughts.

Bowen





Re: [DISCUSS] FLIP-54: Evolve ConfigOption and Configuration

2019-09-03 Thread Timo Walther

Hi Danny,

yes, this FLIP covers all the building blocks we need also for 
unification of the DDL properties.


Regards,
Timo


On 03.09.19 13:45, Danny Chan wrote:

with the new SQL DDL

based on properties as well as more connectors and formats coming up,
unified configuration becomes more important

I Cann’t agree more, do you think we can unify the config options key format 
here for all the DDL properties ?

Best,
Danny Chan
在 2019年8月16日 +0800 PM10:12,dev@flink.apache.org,写道:

with the new SQL DDL
based on properties as well as more connectors and formats coming up,
unified configuration becomes more important





Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-03 Thread Timo Walther

Hi Kurt,

it should not affect the functions and operations we currently have in 
SQL. It just categorizes the available built-in functions. It is kind of 
an orthogonal concept to the catalog API but built-in functions deserve 
this special kind of treatment. CatalogFunction still fits perfectly in 
there because the regular catalog object resolution logic is not 
affected. So tables and functions are resolved in the same way but with 
built-in functions that have priority as in the original design.


Regards,
Timo


On 03.09.19 15:26, Kurt Young wrote:

Does this only affect the functions and operations we currently have in SQL
and
have no effect on tables, right? Looks like this is an orthogonal concept
with Catalog?
If the answer are both yes, then the catalog function will be a weird
concept?

Best,
Kurt


On Tue, Sep 3, 2019 at 8:10 PM Danny Chan  wrote:


The way you proposed are basically the same as what Calcite does, I think
we are in the same line.

Best,
Danny Chan
在 2019年9月3日 +0800 PM7:57,Timo Walther ,写道:

This sounds exactly as the module approach I mentioned, no?

Regards,
Timo

On 03.09.19 13:42, Danny Chan wrote:

Thanks Bowen for bring up this topic, I think it’s a useful

refactoring to make our function usage more user friendly.

For the topic of how to organize the builtin operators and operators

of Hive, here is a solution from Apache Calcite, the Calcite way is to make
every dialect operators a “Library”, user can specify which libraries they
want to use for a sql query. The builtin operators always comes as the
first class objects and the others are used from the order they appears.
Maybe you can take a reference.

[1]

https://github.com/apache/calcite/commit/9a4eab5240d96379431d14a1ac33bfebaf6fbb28

Best,
Danny Chan
在 2019年8月28日 +0800 AM2:50,Bowen Li ,写道:

Hi folks,

I'd like to kick off a discussion on reworking Flink's

FunctionCatalog.

It's critically helpful to improve function usability in SQL.



https://docs.google.com/document/d/1w3HZGj9kry4RsKVCduWp82HkW6hhgi2unnvOAUS72t8/edit?usp=sharing

In short, it:
- adds support for precise function reference with fully/partially
qualified name
- redefines function resolution order for ambiguous function

reference

- adds support for Hive's rich built-in functions (support for Hive

user

defined functions was already added in 1.9.0)
- clarifies the concept of temporary functions

Would love to hear your thoughts.

Bowen






Re: [DISCUSS] Flink Python User-Defined Function for Table API

2019-09-04 Thread Timo Walther

Hi Jincheng,

2. Serializability of functions: "#2 is very convenient for users" means 
only until they have the first backwards-compatibility issue, after that 
they will find it not so convinient anymore and will ask why the 
framework allowed storing such objects in a persistent storage. I don't 
want to be picky about it, but wanted to raise awareness that sometimes 
it is ok to limit use cases to guide users for devloping 
backwards-compatible programs.


Thanks for the explanation fo the remaining items. It sounds reasonable 
to me. Regarding the example with `getKind()`, I actually meant 
`org.apache.flink.table.functions.ScalarFunction#getKind` we don't allow 
users to override this property. And I think we should do something 
similar for the getLanguage property.


Thanks,
Timo

On 03.09.19 15:01, jincheng sun wrote:

Hi Timo,

Thanks for the quick reply ! :)
I have added more example for #3 and #5 to the FLIP. That are great
suggestions !

Regarding 2:

There are two kind Serialization for CloudPickle(Which is different from
Java):
  1) For class and function which can be imported, CloudPickle only
serialize the full path of the class and function (just like java class
name).
  2) For the class and function which can not be imported, CloudPickle will
serialize the full content of the class and function.
For #2, It means that we can not just store the full path of the class and
function.

The above serialization is recursive.

However, there is indeed an problem of backwards compatibility when the
module path of the parent class changed. But I think this is an rare case
and acceptable. i.e., For Flink framework we never change the user
interface module path if we want to keep backwards compatibility. For user
code, if they change the interface of UDF's parent, they should re-register
their functions.

If we do not want support #2, we can store the full path of class and
function, in that case we have no backwards compatibility problem. But I
think the #2 is very convenient for users.

What do you think?

Regarding 4:
As I mentioned earlier, there may be built-in Python functions and I think
language is a "function" concept. Function and Language are orthogonal
concepts.
We may have R, GO and other language functions in the future, not only
user-defined, but also built-in functions.

You are right that users will not set this method and for Python functions,
it will be set in the code-generated Java function by the framework. So, I
think we should declare the getLanguage() in FunctionDefinition for now.
(I'm not pretty sure what do you mean by saying that getKind() is final in
UserDefinedFunction?)

Best,
Jincheng

Timo Walther  于2019年9月3日周二 下午6:01写道:


Hi Jincheng,

thanks for your response.

2. Serializability of functions: Using some arbitrary serialization
format for shipping a function to worker sounds fine to me. But once we
store functions a the catalog we need to think about backwards
compatibility and evolution of interfaces etc. I'm not sure if
CloudPickle is the right long-term storage format for this. If we don't
think about this in advance, we are basically violating our code quality
guide [1] of never use Java Serialization but in the Python-way. We are
using the RPC serialization for persistence.

3. TableEnvironment: Can you add some example to the FLIP? Because API
code like the following is not covered there:

self.t_env.register_function("add_one", udf(lambda i: i + 1,
DataTypes.BIGINT(),
  DataTypes.BIGINT()))
self.t_env.register_function("subtract_one", udf(SubtractOne(),
DataTypes.BIGINT(),
DataTypes.BIGINT()))
self.t_env.register_function("add", add)

4. FunctionDefinition: Your response still doesn't answer my question
entirely. Why do we need FunctionDefinition.getLanguage() if this is a
"user-defined function" concept and not a "function" concept. In any
case, all users should not be able to set this method. So it must be
final in UserDefinedFunction similar to getKind().

5. Function characteristics: If UserDefinedFunction is defined in
Python, why is it not used in your example in FLIP-58. You could you
extend the example to show how to specify these attributes in the FLIP?

Regards,
Timo

[1] https://flink.apache.org/contributing/code-style-and-quality-java.html

On 02.09.19 15:35, jincheng sun wrote:

Hi Timo,

Great thanks for your feedback. I would like to share my thoughts with

you

inline. :)

Best,
Jincheng

Timo Walther  于2019年9月2日周一 下午5:04写道:


Hi all,

the FLIP looks awesome. However, I would like to discuss the changes to
the user-facing parts again. Some feedback:

1. DataViews: With the current non-annotation design for DataViews, we
cannot perform eager state declaration, right? At which point during
execution do we know which state is required by the function? We need to
instantiate the function fi

Re: FLIP-63: Rework table partition support

2019-09-04 Thread Timo Walther

Hi Jingsong,

thanks for your proposal. Could you repost this email with the subject:

"[DISCUSS] FLIP-63: Rework table partition support"

Some people have filters for [DISCUSS] threads and it also makes 
important emails more prominent visually.


Thanks,
Timo

On 04.09.19 09:11, JingsongLee wrote:

Hi everyone,

We would like to start a discussion thread on "FLIP-63: Rework table
partition support"(Design doc: [1]), where we describe how to partition
  support in flink and how to integrate to hive partition.

This FLIP addresses:
- Introduce whole story about partition support.
- Introduce and discuss DDL of partition support.
- Introduce static and dynamic partition insert.
- Introduce partition pruning
- Introduce dynamic partition implementation

Details can be seen in the design document.
Looking forward to your feedbacks. Thank you.

[1] 
https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing

Best,
Jingsong Lee





Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-04 Thread Timo Walther

the

session owner, they are unique to each user, and users create them

on

purpose to be the highest priority in order to overwrite system

info

(built-in functions in this case).

In your case, why would users name a temporary function the same

as a

built-in function then? Since using that name in ambiguous function
reference will always be resolved to built-in functions, creating a
same-named temp function would be meaningless in the end.


On Tue, Sep 3, 2019 at 1:44 PM Bowen Li 

wrote:

Hi Jingsong,

Re> 1.Hive built-in functions is an intermediate solution. So we

should

not introduce interfaces to influence the framework. To make
Flink itself more powerful, we should implement the functions
we need to add.

Yes, please see the doc.

Re> 2.Non-flink built-in functions are easy for users to change

their

behavior. If we support some flink built-in functions in the
future but act differently from non-flink built-in, this will

lead

to

changes in user behavior.

There's no such concept as "external built-in functions" any more.
Built-in functions of external systems will be treated as special

catalog

functions.

Re> Another question is, does this fallback include all

hive built-in functions? As far as I know, some hive functions
have some hacky. If possible, can we start with a white list?
Once we implement some functions to flink built-in, we can
also update the whitelist.

Yes, that's something we thought of too. I don't think it's super
critical to the scope of this FLIP, thus I'd like to leave it to

future

efforts as a nice-to-have feature.


On Tue, Sep 3, 2019 at 1:37 PM Bowen Li 

wrote:

Hi Kurt,

Re: > What I want to propose is we can merge #3 and #4, make them

both

under

"catalog" concept, by extending catalog function to make it have

ability to

have built-in catalog functions. Some benefits I can see from

this

approach:

1. We don't have to introduce new concept like external built-in

functions.

Actually I don't see a full story about how to treat a built-in

functions, and it

seems a little bit disrupt with catalog. As a result, you have

to

make

some restriction

like "hive built-in functions can only be used when current

catalog

is

hive catalog".

Yes, I've unified #3 and #4 but it seems I didn't update some

part

of

the doc. I've modified those sections, and they are up to date

now.

In short, now built-in function of external systems are defined

as

a

special kind of catalog function in Flink, and handled by Flink

as

following:
- An external built-in function must be associated with a catalog

for

the purpose of decoupling flink-table and external systems.
- It always resides in front of catalog functions in ambiguous

function

reference order, just like in its own external system
- It is a special catalog function that doesn’t have a

schema/database

namespace
- It goes thru the same instantiation logic as other user defined
catalog functions in the external system

Please take another look at the doc, and let me know if you have

more

questions.


On Tue, Sep 3, 2019 at 7:28 AM Timo Walther 

wrote:

Hi Kurt,

it should not affect the functions and operations we currently

have

in

SQL. It just categorizes the available built-in functions. It is

kind

of
an orthogonal concept to the catalog API but built-in functions

deserve

this special kind of treatment. CatalogFunction still fits

perfectly

in

there because the regular catalog object resolution logic is not
affected. So tables and functions are resolved in the same way

but

with

built-in functions that have priority as in the original design.

Regards,
Timo


On 03.09.19 15:26, Kurt Young wrote:

Does this only affect the functions and operations we currently

have

in SQL

and
have no effect on tables, right? Looks like this is an

orthogonal

concept

with Catalog?
If the answer are both yes, then the catalog function will be a

weird

concept?

Best,
Kurt


On Tue, Sep 3, 2019 at 8:10 PM Danny Chan <

yuzhao@gmail.com

wrote:

The way you proposed are basically the same as what Calcite

does, I

think

we are in the same line.

Best,
Danny Chan
在 2019年9月3日 +0800 PM7:57,Timo Walther 
,写道:

This sounds exactly as the module approach I mentioned, no?

Regards,
Timo

On 03.09.19 13:42, Danny Chan wrote:

Thanks Bowen for bring up this topic, I think it’s a useful

refactoring to make our function usage more user friendly.

For the topic of how to organize the builtin operators and

operators

of Hive, here is a solution from Apache Calcite, the Calcite

way

is

to make

every dialect operators a “Library”, user can specify which

libraries they

want to use for a sql query. The builtin operators always

comes

as

the

first class objects and the others are used from the order

they

appears.

Maybe you can take a reference.

[1]

https://github.com/apache/calcite/commit/9a4eab5240d9637

Re: [DISCUSS] FLIP-55: Introduction of a Table API Java Expression DSL

2019-09-04 Thread Timo Walther
Thanks for your feedback Rong. You are right, we can still have shorter 
names if the user feedback demands that. Adding additional shorter 
method names is always possible. So let's stick to lit() for now.


I converted the Google document into a wiki page:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-55%3A+Introduction+of+a+Table+API+Java+Expression+DSL

I would start a voting thread by tomorrow. If there are no objections.

Thanks,
Timo


On 04.09.19 02:52, Rong Rong wrote:

Thanks for putting together the proposal @Timo and sorry for joining the
discussion thread late.

I also share the same thought with Fabian on the ease-of-use front. However
I was wondering if we need to start the expression design with them?
One thing I can think of is: is it possible to support "alias" later on in
the Expression once we collect enough feedback from the users?

IMO, It is always easier to expand the APIs later than reducing them.

Cheers,
Rong

On Mon, Sep 2, 2019 at 2:37 AM Timo Walther  wrote:


Hi all,

I see a majority votes for `lit(12)` so let's adopt that in the FLIP.
The `$("field")` would consider Fabian's concerns so I would vote for
keeping it like that.

One more question for native English speakers, is it acceptable to have
`isEqual` instead of `isEqualTo` and `isGreater` instead of
`isGreaterThan`?

If there are no more concerns, I will start a voting thread soon.

Thanks,
Timo


On 29.08.19 12:24, Fabian Hueske wrote:

Hi,

IMO, we should define what we would like to optimize for:
1) easy-to-get-started experience or
2) productivity and ease-of-use

While 1) is certainly important, I think we should put more emphasis on
goal 2).
That's why I favor as short as possible names for commonly used methods
like column references and literals/values.
These are used *many* times in *every* query.
Every user who uses the API for more than 30 mins will know what $() or

v()

(or whatever method names we come up with) are used for and everybody who
doesn't know can have a look at the JavaDocs or regular documentation.
Shorter method names are not only about increasing the speed to write a
query, but also reducing clutter that needs to be parsed to understand an
expression / query.

I'm OK with descriptive names for other expressions like call(),
isEqualTo() (although these could be the commonly used eq(), gte(),

etc.),

and so on but column references (and literals) should be as lightweight

as

possible, IMO.

Cheers,
Fabian

Am Do., 29. Aug. 2019 um 12:15 Uhr schrieb Timo Walther <

twal...@apache.org

:
I'm fine with `lit()`. Regarding `col()`, I initially suggested `ref()`
but I think Fabian and Dawid liked single char methods for the most
commonly used expressions.

Btw, what is your opinion on the names of commonly used methods such as
`isEqual`, `isGreaterOrEqual`? Are we fine with the current naming.
In theory we could make them shorter like `equals(), greaterOrEqual()`
or even shorter to `eq`, `gt`, `gte`?

Thanks,
Timo


On 29.08.19 11:51, Aljoscha Krettek wrote:

Overall, this is a very nice development that should also simplify the

code base once we deprecate the expression parser!

Regarding method names, I agree with Seth that values/literals should

use something like “lit()”. I also think that for column references we
could use “col()” to make it clear that it is a column reference. What

do

you think?

Aljoscha


On 28. Aug 2019, at 15:59, Seth Wiesman  wrote:

I would prefer ‘lit()’ over  ‘val()’ since val is a keyword in Scala.

Assuming the intention is to make the dsl ergonomic for Scala

developers.

Seth


On Aug 28, 2019, at 7:58 AM, Timo Walther 

wrote:

Hi David,

thanks for your feedback. I was also skeptical about 1 char method

names, I restored the `val()` method for now. If you read literature

such

as Wikipedia [1]: "literal is a notation for representing a fixed value

in

source code. Almost all programming languages have notations for atomic
values". So they are also talking about "values".

Alteratively we could use `lit(12)` or `l(12)` but I'm not convinced

that this is better.

Regards,
Timo

[1] https://en.wikipedia.org/wiki/Literal_(computer_programming)


On 27.08.19 22:10, David Anderson wrote:
TImo,

While it's not exactly pretty, I don't mind the $("field")

construct.

It's not particularly surprising. The v() method troubles me more;

it

looks mysterious. I think we would do better to have something more
explicit. val() isn't much better -- val("foo") could be interpreted
to mean the value of the "foo" column, or a literal string.

David


On Tue, Aug 27, 2019 at 5:45 PM Timo Walther 

wrote:

Hi David,

thanks for your feedback. With the current design, the DSL would be

free

of any ambiguity but it is definitely more verbose esp. around

defining

values.

I would be happy about further suggestions that make the DS

Re: FLIP-24 / SQL GW

2019-09-16 Thread Timo Walther

Hi Hanan,

the community is currently reworking parts of the architecture of Flink 
SQL first for making it a good foundation for further tools around it 
(see also FLIP-32 and following SQL-related FLIPs). In Flink 1.10 the 
SQL Client will not receive major updates but it seems likely that Flink 
1.11 will offer more options for submission of jobs and retrieval of 
results.


Regards,
Timo


On 16.09.19 20:19, Hanan Yehudai wrote:

Flip – 24 mentions SQL GW in the roadmap .  is there any progress on this path 
? is something planned for 1.10 /  2.0 ?





Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-16 Thread Timo Walther

Hi Bowen,

I understand the potential benefit of overriding certain built-in 
functions. I'm open to such a feature if many people agree. However, it 
would be great to still support overriding catalog functions with 
temporary functions in order to prototype a query even though a 
catalog/database might not be available currently or should not be 
modified yet. How about we support both cases?


CREATE TEMPORARY FUNCTION abs
-> creates/overrides a built-in function and never consideres current 
catalog and database; inconsistent with other DDL but acceptable for 
functions I guess.

CREATE TEMPORARY FUNCTION cat.db.fun
-> creates/overrides a catalog function

Regarding "Flink don't have any other built-in objects (tables, views) 
except functions", this might change in the near future. Take 
https://issues.apache.org/jira/browse/FLINK-13900 as an example.


Thanks,
Timo

On 14.09.19 01:40, Bowen Li wrote:

Hi Fabian,

Yes, I agree 1-part/no-override is the least favorable thus I didn't
include that as a voting option, and the discussion is mainly between
1-part/override builtin and 3-part/not override builtin.

Re > However, it means that temp functions are differently treated than
other db objects.
IMO, the treatment difference results from the fact that functions are a
bit different from other objects - Flink don't have any other built-in
objects (tables, views) except functions.

Cheers,
Bowen





Re: [DISCUSS] FLIP-60: Restructure the Table API & SQL documentation

2019-09-16 Thread Timo Walther

Dawid Wysakowicz  于2019年8月30日周五 下午7:43写道:


+1 to the idea of restructuring the docs.

My only suggestion to consider is how about moving the
User-Defined-Extensions subpages to corresponding broader topics?

Sources & Sinks >> Connect to external systems

Catalogs >> Connect to external systems

and then have a Functions sections with subsections:

functions

 |- built in functions

 |- user defined functions


Best,

Dawid

On 30/08/2019 10:59, Timo Walther wrote:

Hi everyone,

the Table API & SQL documentation was already in a very good

shape

in

Flink 1.8. However, in the past it was mostly presented as an

addition

to DataStream API. As the Table and SQL world is growing quickly,
stabilizes in its concepts, and is considered as another

top-level

API

and closed ecosystem, it is time to restructure the docs a little

bit

to represent the vision of FLIP-32.

Current state:


https://ci.apache.org/projects/flink/flink-docs-master/dev/table/

We would like to propose the following FLIP-60 for a new

structure:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=127405685


Looking forward to feedback.

Thanks,

Timo









Re: [DISCUSS] FLIP-64: Support for Temporary Objects in Table module

2019-09-16 Thread Timo Walther

Hi Dawid,

thanks for the design document. It fixes big concept gaps due to 
historical reasons with proper support for serializability and catalog 
support in mind.


I would not mind a registerTemporarySource/Sink, but the problem that I 
see is that many people think that this is the recommended way of 
registering a table source/sink which is not true. We should guide users 
to either use connect() or DDL API which can be validated and stored in 
catalog.


Also from a concept perspective, registering a source/sink does not fit 
into the SQL world. SQL does not know about source/sinks but only about 
tables. If the responsibility of a TableSource/TableSink is just a pure 
physical data consumer/producer that is not connected to the actual 
logical table schema, we would need a possibility of defining time 
attributes and interpreting/converting a changelog. This should be done 
by the framework with information from the DDL/connect() and not be 
defined in every table source.


Regards,
Timo


On 09.09.19 14:16, JingsongLee wrote:

Hi dawid:

It is difficult to describe specific examples.
Sometimes users will generate some java converters through some
  Java code, or generate some Java classes through third-party
  libraries. Of course, these can be best done through properties.
But this requires additional work from users.My suggestion is to
  keep this Java instance class way that is user-friendly.

Best,
Jingsong Lee


--
From:Dawid Wysakowicz 
Send Time:2019年9月6日(星期五) 16:21
To:dev 
Subject:Re: [DISCUSS] FLIP-64: Support for Temporary Objects in Table module

Hi all,
@Jingsong Could you elaborate a bit more what do you mean by
"some Connectors are difficult to convert all states to properties"
All the Flink provided connectors will definitely be expressible with properties 
(In the end you should be able to use them from DDL). I think if a TableSource is 
complex enough that it handles filter push down, partition support etc. should 
rather be made available both from DDL & java/scala code. I'm happy to 
reconsider adding registerTemporaryTable(String path, TableSource source) if you 
have some concrete examples in mind.


@Xuefu: We also considered the ObjectIdentifier (or actually introducing a new 
identifier representation to differentiate between resolved and unresolved 
identifiers) with the same concerns. We decided to suggest the string & parsing 
logic because of usability.
 tEnv.from("cat.db.table")
is shorter and easier to write than
 tEnv.from(Identifier.for("cat", "db", "name")
And also implicitly solves the problem what happens if a user (e.g. used to 
other systems) uses that API in a following manner:
 tEnv.from(Identifier.for("db.name")
I'm happy to revisit it if the general consensus is that it's better to use the 
OO aproach.
Best,
Dawid

On 06/09/2019 10:00, Xuefu Z wrote:

Thanks to Dawid for starting the discussion and writeup. It looks pretty
good to me except that I'm a little concerned about the object reference
and string parsing in the code, which seems to an anti-pattern to OOP. Have
we considered using ObjectIdenitifier with optional catalog and db parts,
esp. if we are worried about arguments of variable length or method
overloading? It's quite likely that the result of string parsing is an
ObjectIdentifier instance any way.

Having string parsing logic in the code is a little dangerous as it
duplicates part of the DDL/DML parsing, and they can easily get out of sync.

Thanks,
Xuefu

On Fri, Sep 6, 2019 at 1:57 PM JingsongLee 
wrote:


Thanks dawid, +1 for this approach.

One concern is the removal of registerTableSink & registerTableSource
  in TableEnvironment. It has two alternatives:
1.the properties approach (DDL, descriptor).
2.from/toDataStream.

#1 can only be properties, not java states, and some Connectors
  are difficult to convert all states to properties.
#2 can contain java state. But can't use TableSource-related features,
like project & filter push down, partition support, etc..

Any idea about this?

Best,
Jingsong Lee


--
From:Dawid Wysakowicz 
Send Time:2019年9月4日(星期三) 22:20
To:dev 
Subject:[DISCUSS] FLIP-64: Support for Temporary Objects in Table module

Hi all,
As part of FLIP-30 a Catalog API was introduced that enables storing table
meta objects permanently. At the same time the majority of current APIs
create temporary objects that cannot be serialized. We should clarify the
creation of meta objects (tables, views, functions) in a unified way.
Another current problem in the API is that all the temporary objects are
stored in a special built-in catalog, which is not very intuitive for many
users, as they must be aware of that catalog to reference temporary objects.
Lastly, different APIs have different ways of providing object paths:

String path…,
String path, String pathContinued…
String name
We should choo

Re: [DISCUSS] FLIP-57 - Rework FunctionCatalog

2019-09-17 Thread Timo Walther

Hi everyone,

@Xuefu: I would like to avoid adding too many things incrementally. 
Users should be able to override all catalog objects consistently 
according to FLIP-64 (Support for Temporary Objects in Table module). If 
functions are treated completely different, we need more code and 
special cases. From an implementation perspective, this topic only 
affects the lookup logic which is rather low implementation effort which 
is why I would like to clarify the remaining items. As you said, we have 
a slight consenus on overriding built-in functions; we should also 
strive for reaching consensus on the remaining topics.


@Dawid: I like your idea as it ensures registering catalog objects 
consistent and the overriding of built-in functions more explicit.


Thanks,
Timo


On 17.09.19 11:59, kai wang wrote:

hi, everyone
I think this flip is very meaningful. it supports functions that can be
shared by different catalogs and dbs, reducing the duplication of functions.

Our group based on flink's sql parser module implements create function
feature, stores the parsed function metadata and schema into mysql, and
also customizes the catalog, customizes sql-client to support custom
schemas and functions. Loaded, but the function is currently global, and is
not subdivided according to catalog and db.

In addition, I very much hope to participate in the development of this
flip, I have been paying attention to the community, but found it is more
difficult to join.
  thank you.

Xuefu Z  于2019年9月17日周二 上午11:19写道:


Thanks to Tmo and Dawid for sharing thoughts.

It seems to me that there is a general consensus on having temp functions
that have no namespaces and overwrite built-in functions. (As a side note
for comparability, the current user defined functions are all temporary and
having no namespaces.)

Nevertheless, I can also see the merit of having namespaced temp functions
that can overwrite functions defined in a specific cat/db. However,  this
idea appears orthogonal to the former and can be added incrementally.

How about we first implement non-namespaced temp functions now and leave
the door open for namespaced ones for later releases as the requirement
might become more crystal? This also helps shorten the debate and allow us
to make some progress along this direction.

As to Dawid's idea of having a dedicated cat/db to host the temporary temp
functions that don't have namespaces, my only concern is the special
treatment for a cat/db, which makes code less clean, as evident in treating
the built-in catalog currently.

Thanks,
Xuefiu

On Mon, Sep 16, 2019 at 5:07 PM Dawid Wysakowicz <
wysakowicz.da...@gmail.com>
wrote:


Hi,
Another idea to consider on top of Timo's suggestion. How about we have a
special namespace (catalog + database) for built-in objects? This catalog
would be invisible for users as Xuefu was suggesting.

Then users could still override built-in functions, if they fully qualify
object with the built-in namespace, but by default the common logic of
current dB & cat would be used.

CREATE TEMPORARY FUNCTION func ...
registers temporary function in current cat & dB

CREATE TEMPORARY FUNCTION cat.db.func ...
registers temporary function in cat db

CREATE TEMPORARY FUNCTION system.system.func ...
Overrides built-in function with temporary function

The built-in/system namespace would not be writable for permanent

objects.

WDYT?

This way I think we can have benefits of both solutions.

Best,
Dawid


On Tue, 17 Sep 2019, 07:24 Timo Walther,  wrote:


Hi Bowen,

I understand the potential benefit of overriding certain built-in
functions. I'm open to such a feature if many people agree. However, it
would be great to still support overriding catalog functions with
temporary functions in order to prototype a query even though a
catalog/database might not be available currently or should not be
modified yet. How about we support both cases?

CREATE TEMPORARY FUNCTION abs
-> creates/overrides a built-in function and never consideres current
catalog and database; inconsistent with other DDL but acceptable for
functions I guess.
CREATE TEMPORARY FUNCTION cat.db.fun
-> creates/overrides a catalog function

Regarding "Flink don't have any other built-in objects (tables, views)
except functions", this might change in the near future. Take
https://issues.apache.org/jira/browse/FLINK-13900 as an example.

Thanks,
Timo

On 14.09.19 01:40, Bowen Li wrote:

Hi Fabian,

Yes, I agree 1-part/no-override is the least favorable thus I didn't
include that as a voting option, and the discussion is mainly between
1-part/override builtin and 3-part/not override builtin.

Re > However, it means that temp functions are differently treated

than

other db objects.
IMO, the treatment difference results from the fact that functions

are

a

bit different from other objects - Flink don't have any other

built-in

objects (tables, views) except functions.

Cheers,
Bowen





--
Xuefu Zhang

"In Honey We Trust!"





Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-05-25 Thread Timo Walther

Hi Feng,

thanks for proposing this FLIP. It makes a lot of sense to finally 
support querying tables at a specific point in time or hopefully also 
ranges soon. Following time-versioned tables.


Here is some feedback from my side:

1. Syntax

Can you elaborate a bit on the Calcite restrictions?

Does Calcite currently support `AS OF` syntax for this but not `FOR 
SYSTEM_TIME AS OF`?


It would be great to support `AS OF` also for time-versioned joins and 
have a unified and short syntax.


Once a fix is merged in Calcite for this, we can make this available in 
Flink earlier by copying the corresponding classes until the next 
Calcite upgrade is performed.


2. Semantics

How do we interpret the timestamp? In Flink we have 2 timestamp types 
(TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF TIMESTAMP 
'2023-04-27 00:00:00', in which timezone will the timestamp be? We will 
convert it to TIMESTAMP_LTZ?


We definely need to clarify this because the past has shown that 
daylight saving times make our lives hard.


Thanks,
Timo

On 25.05.23 10:57, Feng Jin wrote:

Hi, everyone.

I’d like to start a discussion about FLIP-308: Support Time Travel In Batch
Mode [1]


Time travel is a SQL syntax used to query historical versions of data. It
allows users to specify a point in time and retrieve the data and schema of
a table as it appeared at that time. With time travel, users can easily
analyze and compare historical versions of data.


With the widespread use of data lake systems such as Paimon, Iceberg, and
Hudi, time travel can provide more convenience for users' data analysis.


Looking forward to your opinions, any suggestions are welcomed.



1.
https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode



Best.

Feng





Re: [DISCUSS] FLIP-308: Support Time Travel In Batch Mode

2023-05-25 Thread Timo Walther

Also: How do we want to query the most recent version of a table?

`AS OF CURRENT_TIMESTAMP` would be ideal, but according to the docs both 
the type is TIMESTAMP_LTZ and what is even more concerning is the it 
actually is evalated row-based:


> Returns the current SQL timestamp in the local time zone, the return 
type is TIMESTAMP_LTZ(3). It is evaluated for each record in streaming 
mode. But in batch mode, it is evaluated once as the query starts and 
uses the same result for every row.


This could make it difficult to explain in a join scenario of multiple 
snapshotted tables.


Regards,
Timo


On 25.05.23 12:29, Timo Walther wrote:

Hi Feng,

thanks for proposing this FLIP. It makes a lot of sense to finally 
support querying tables at a specific point in time or hopefully also 
ranges soon. Following time-versioned tables.


Here is some feedback from my side:

1. Syntax

Can you elaborate a bit on the Calcite restrictions?

Does Calcite currently support `AS OF` syntax for this but not `FOR 
SYSTEM_TIME AS OF`?


It would be great to support `AS OF` also for time-versioned joins and 
have a unified and short syntax.


Once a fix is merged in Calcite for this, we can make this available in 
Flink earlier by copying the corresponding classes until the next 
Calcite upgrade is performed.


2. Semantics

How do we interpret the timestamp? In Flink we have 2 timestamp types 
(TIMESTAMP and TIMESTAMP_LTZ). If users specify AS OF TIMESTAMP 
'2023-04-27 00:00:00', in which timezone will the timestamp be? We will 
convert it to TIMESTAMP_LTZ?


We definely need to clarify this because the past has shown that 
daylight saving times make our lives hard.


Thanks,
Timo

On 25.05.23 10:57, Feng Jin wrote:

Hi, everyone.

I’d like to start a discussion about FLIP-308: Support Time Travel In 
Batch

Mode [1]


Time travel is a SQL syntax used to query historical versions of data. It
allows users to specify a point in time and retrieve the data and 
schema of

a table as it appeared at that time. With time travel, users can easily
analyze and compare historical versions of data.


With the widespread use of data lake systems such as Paimon, Iceberg, and
Hudi, time travel can provide more convenience for users' data analysis.


Looking forward to your opinions, any suggestions are welcomed.



1.
https://cwiki.apache.org/confluence/display/FLINK/FLIP-308%3A+Support+Time+Travel+In+Batch+Mode



Best.

Feng







Re: [VOTE] FLIP-346: Deprecate ManagedTable related APIs

2023-07-24 Thread Timo Walther

+1

Regards,
Timo


On 24.07.23 04:00, liu ron wrote:

+1

Best,
Ron

Lincoln Lee  于2023年7月21日周五 16:09写道:


+1

Best,
Lincoln Lee


Leonard Xu  于2023年7月21日周五 16:07写道:


+1

Best,
Leonard


On Jul 21, 2023, at 4:02 PM, yuxia 

wrote:


+1(binging)

Best regards,
Yuxia

- 原始邮件 -
发件人: "Jane Chan" 
收件人: "dev" 
发送时间: 星期五, 2023年 7 月 21日 下午 3:41:11
主题: [VOTE] FLIP-346: Deprecate ManagedTable related APIs

Hi developers,

Thanks for all the feedback on FLIP-346: Deprecate ManagedTable related
APIs[1].
Based on the discussion[2], we have reached a consensus, so I would

like

to

start a vote.

The vote will last for at least 72 hours unless there is an objection

or

insufficient votes.

[1]




https://cwiki.apache.org/confluence/display/FLINK/FLIP-346%3A+Deprecate+ManagedTable+related+APIs

[2] https://lists.apache.org/thread/5dvqyhqp5fbtm54944xohts71modwd99

Best,
Jane











Re: [DISCUSS][2.0] FLIP-344: Remove parameter in RichFunction#open

2023-07-24 Thread Timo Walther

+1

But instead we should add a OpenContext there to keep the signature 
stable but still be able to add parameters.


Regards,
Timo

On 21.07.23 12:24, Jing Ge wrote:

+1

On Fri, Jul 21, 2023 at 10:22 AM Yuxin Tan  wrote:


+1

Best,
Yuxin


Xintong Song  于2023年7月21日周五 12:04写道:


+1

Best,

Xintong



On Fri, Jul 21, 2023 at 10:52 AM Wencong Liu 

wrote:



Hi devs,

I would like to start a discussion on FLIP-344: Remove parameter in
RichFunction#open [1].

The open() method in RichFunction requires a Configuration instance as

an

argument,
which is always passed as a new instance without any configuration
parameters in
AbstractUdfStreamOperator#open. Thus, it is unnecessary to include this
parameter
in the open() method.
As such I propose to remove the Configuration field from
RichFunction#open(Configuration parameters).
Looking forward to your feedback.
[1]




https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=263425231

Best regards,


Wencong Liu










[DISCUSS] FLIP-348: Support System Columns in SQL and Table API

2023-07-25 Thread Timo Walther

Hi everyone,

I would like to start a discussion about introducing the concept of 
"System Columns" in SQL and Table API.


The subject sounds bigger than it actually is. Luckily, Flink SQL 
already exposes the concept of metadata columns. And this proposal is 
just a slight adjustment for how metadata columns can be used as system 
columns.


The biggest problem of metadata columns currently is that a catalog 
implementation can't provide them by default because they would affect 
`SELECT *` when adding another one.


Looking forward to your feedback on FLIP-348:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-348%3A+Support+System+Columns+in+SQL+and+Table+API

Thanks,
Timo


Re: [DISCUSS] FLIP-348: Support System Columns in SQL and Table API

2023-08-07 Thread Timo Walther
 I second Alexey that systems columns should not be shown with
DESCRIBE statements.

WDYT? Thanks!

Best,
Paul Lam


2023年7月31日 23:54,Jark Wu  写道:

Hi Timo,

Thanks for your proposal. I think this is a nice feature for users and I
prefer option 3.

I only have one concern about the concept of pseudo-column or
system-column,
because this is the first time we introduce it in Flink SQL. The
confusion is similar to the
question of Benchao and Sergey about the propagation of pseudo-column.

 From my understanding, a pseudo-column can be get from an arbitrary

query,

just similar to
ROWNUM in Oracle[1], such as :

SELECT *
FROM (SELECT * FROM employees ORDER BY employee_id)
WHERE ROWNUM < 11;

However, IIUC, the proposed "$rowtime" pseudo-column can only be got from
the physical table
and can't be got from queries even if the query propagates the rowtime
attribute. There was also
a discussion about adding a pseudo-column "_proctime" [2] to make lookup
join easier to use
which can be got from arbitrary queries. That "_proctime" may conflict

with

the proposed
pseudo-column concept.

Did you consider making it as a built-in defined pseudo-column "$rowtime"
which returns the
time attribute value (if exists) or null (if non-exists) for every
table/query, and pseudo-column
"$proctime" always returns PROCTIME() value for each table/query. In this
way, catalogs only need
to provide a default rowtime attribute and users can get it in the same
way. And we don't need
to introduce the contract interface of "Metadata Key Prefix Constraint"
which is still a little complex
for users and devs to understand.

Best,
Jark

[1]:


https://docs.oracle.com/cd/E11882_01/server.112/e41084/pseudocolumns009.htm#SQLRF00255

[2]: https://lists.apache.org/thread/7ln106qxyw8sp7ljq40hs2p1lb1gdwj5




On Fri, 28 Jul 2023 at 06:18, Alexey Leonov-Vendrovskiy <
vendrov...@gmail.com> wrote:



`SELECT * FROM (SELECT $rowtime, * FROM t);`
Am I right that it will show `$rowtime` in output ?



Yes, all explicitly selected columns become a part of the result (and
intermediate) schema, and hence propagate.

On Thu, Jul 27, 2023 at 2:40 PM Alexey Leonov-Vendrovskiy <
vendrov...@gmail.com> wrote:


Thank you, Timo, for starting this FLIP!

I propose the following change:

Remove the requirement that DESCRIBE need to show system columns.


Some concrete vendor specific catalog implementations might prefer this
approach.
Usually the same system columns are available on all (or family) of
tables, and it can be easily captured in the documentation.

For example, BigQuery does exactly this: there, pseudo-columns do not

show

up in the table schema in any place, but can be accessed via reference.

So I propose we:
a) Either we say that DESCRIBE doesn't show system columns,
b) Or leave this vendor-specific / or configurable via flag (if

needed).


Regards,
Alexey

On Thu, Jul 27, 2023 at 3:27 AM Sergey Nuyanzin 
wrote:


Hi Timo,

Thanks for the FLIP.
I also tend to think that Option 3 is better.

I would be also interested in a question mentioned by Benchao Li.
And a similar question about nested queries like
`SELECT * FROM (SELECT $rowtime, * FROM t);`
Am I right that it will show `$rowtime` in output ?


On Thu, Jul 27, 2023 at 6:58 AM Benchao Li 

wrote:



Hi Timo,

Thanks for the FLIP, I also like the idea and option 3 sounds good to

me.


I would like to discuss a case which is not mentioned in the current

FLIP.

How are the "System column"s expressed in intermediate result, e.g.

Join?

E.g. `SELECT * FROM t1 JOIN t2`, I guess it should not include

"system

columns" from t1 and t2 as you proposed, and for `SELECT t1.$rowtime,

*

FROM t1 JOIN t2`, it should also be valid.
Then the question is how to you plan to implement the "system

columns",

do

we need to add it to `RelNode` level? Or we just need to do it in the
parsing/validating phase?
I'm not sure that Calcite's "system column" feature is fully ready

for

this

since the code about this part is imported from the earlier project

before

it gets into Apache, and has not been considered much in the past
development.


Jing Ge  于2023年7月26日周三 00:01写道:


Hi Timo,

Thanks for your proposal. It is a very pragmatic feature. Among all

options

in the FLIP, option 3 is one I prefer too and I'd like to ask some
questions to understand your thoughts.

1. I did some research on pseudo columns, just out of curiosity, do

you

know why most SQL systems do not need any prefix with their pseudo

column?

2. Some platform providers will use ${variable_name} to define their

own

configurations and allow them to be embedded into SQL scripts. Will

there

be any conflict with option 3?

Best regards,
Jing

On Tue, Jul 25, 2023 at 7:00 PM Konstantin Knauf 


wrote:


Hi Timo,

this makes sense to me. Option 3 seems reasonable, too.

Cheers,

Konstantin

Am Di., 2

Re: [DISCUSS] FLIP-348: Support System Columns in SQL and Table API

2023-08-15 Thread Timo Walther

Hi everyone,

I would like to bump this thread up again.

Esp. I would like to hear opinions on my latest suggestions to simply 
use METADATA VIRTUAL as system columns and only introduce a config 
option for the SELECT * behavior. Implementation-wise this means minimal 
effort and less new concepts.


Looking forward to any kind of feedback.

Thanks,
Timo

On 07.08.23 12:07, Timo Walther wrote:

Hi everyone,

thanks for the various feedback and lively discussion. Sorry, for the 
late reply as I was on vacation. Let me answer to some of the topics:


1) Systems columns should not be shown with DESCRIBE statements

This sounds fine to me. I will update the FLIP in the next iteration.

2) Do you know why most SQL systems do not need any prefix with their 
pseudo column?


Because most systems do not have external catalogs or connectors. And 
also the number of system columns is limited to a handful of columns. 
Flink is more generic and thus more complex. And we have already the 
concepts of metadata columns. We need to be careful with not overloading 
our language.


3) Implementation details

 > how to you plan to implement the "system columns", do we need to add 
it to `RelNode` level? Or we just need to do it in the 
parsing/validating phase?

 > I'm not sure that Calcite's "system column" feature is fully ready

My plan would be to only modify the parsing/validating phase. I would 
like to avoid additional complexity in planner rules and 
connector/catalog interfaces. Metadata columns already support 
projection push down and are passed through the stack (via Schema, 
ResolvedSchema, SupportsReadableMetadata). Calcite's "system column" 
feature is not fully ready yet and it would be a large effort 
potentially introducing bugs in supporting it. Thus, I'm proposing to 
leverage what we already have. The only part that needs to be modified 
is the "expand star" method in SqlValidator and Table API.


Queries such as `SELECT * FROM (SELECT $rowtime, * FROM t);` would show 
$rowtime as the expand star has only a special case when in the scope 
for `FROM t`. All further subqueries treat it as a regular column.


4) Built-in defined pseudo-column "$rowtime"

 > Did you consider making it as a built-in defined pseudo-column 
"$rowtime" which returns the time attribute value (if exists) or null 
(if non-exists) for every table/query, and pseudo-column "$proctime" 
always returns PROCTIME() value for each table/query


Built-in pseudo-columns mean that connector or catalog providers need 
consensus in Flink which pseudo-columns should be built-in. We should 
keep the concept generic and let platform providers decide which pseudo 
columns to expose. $rowtime might be obvious but others such as 
$partition or $offset are tricky to get consensus as every external 
connector works differently. Also a connector might want to expose 
different time semantics (such as ingestion time).


5) Any operator can introduce system (psedo) columns.

This is clearly out of scope for this FLIP. The implementation effort 
would be huge and could introduce a lot of bugs.


6) "Metadata Key Prefix Constraint" which is still a little complex

Another option could be to drop the naming pattern constraint. We could 
make it configurable that METADATA VIRTUAL columns are never selected by 
default in SELECT * or visible in DESCRIBE. This would further simplify 
the FLIP and esp lower the impact on the planner and all interfaces.


What do you think about this? We could introduce a flag:

table.expand-metadata-columns (better name to be defined)

This way we don't need to introduce the concept of system columns yet, 
but can still offer similar functionality with minimal overhead in the 
code base.


Regards,
Timo




On 04.08.23 23:06, Alexey Leonov-Vendrovskiy wrote:

Looks like both kinds of system columns can converge.
We can say that any operator can introduce system (psedo) columns.

cc Eugene who is also interested in the subject.

On Wed, Aug 2, 2023 at 1:03 AM Paul Lam  wrote:


Hi Timo,

Thanks for starting the discussion! System columns are no doubt a
good boost on Flink SQL’s usability, and I see the feedbacks are
mainly concerns about the accessibility of system columns.

I think most of the concerns could be solved by clarifying the
ownership of the system columns. Different from databases like
Oracle/BigQuery/PG who owns the data/metadata, Flink uses the
data/metadata from external systems. That means Flink could
have 2 kinds of system columns (take ROWID for example):

1. system columns provided by external systems via catalogs, such
 as ROWID from the original system.
2. system columns generated by Flink, such as ROWID generated by
 Flink itself.

IIUC, the FLIP is proposing the 1st approach: the catalog defines what
system columns to provide, and Flink treats them as normal columns
with a special na

Re: Plans for Schema Evolution in Table API

2023-08-15 Thread Timo Walther

Hi Ashish,

sorry for the late reply. There are currently no concrete plans to 
support schema evolution in Table API. Until recently, Flink version 
evolution was the biggest topic. In the near future we can rediscuss 
query and state evolution in more detail.


Personally, I think we will need either some kind of more flexible data 
type (similar like the JSON type in Postgres) or user-defined types 
(UDT) to ensure a smooth experience.


For now, warming up the state is the only viable solution until internal 
serializers are more flexible.


Regards,
Timo

On 14.08.23 16:55, Ashish Khatkar wrote:

Bumping the thread.

On Fri, Aug 4, 2023 at 12:51 PM Ashish Khatkar  wrote:


Hi all,

We are using flink-1.17.0 table API and RocksDB as backend to provide a
service to our users to run sql queries. The tables are created using the
avro schema and when the schema is changed in a compatible manner i.e
adding a field with default, we are unable to recover the job from the
savepoint. This is mentioned in the flink doc on evolution [1] as well.

Are there any plans to support schema evolution in the table API? Our
current approach involves rebuilding the entire state by discarding the
output and then utilizing that state in the actual job. This is already
done for table-store [2]

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-226%3A+Introduce+Schema+Evolution+on+Table+Store









Re: [DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints

2023-08-17 Thread Timo Walther

+1 for this proposal.

Not every data team would like to enable hints. Also because they are an 
extension to the SQL standard. It might also be the case that custom 
rules would be overwritten otherwise. Setting hints could also be the 
exclusive task of a DevOp team.


Regards,
Timo


On 17.08.23 09:30, Konstantin Knauf wrote:

Hi Bonnie,

this makes sense to me, in particular, given that we already have this
toggle for a different type of hints.

Best,

Konstantin

Am Mi., 16. Aug. 2023 um 19:38 Uhr schrieb Bonnie Arogyam Varghese
:


Hi Liu,
  Options hints could be a security concern since users can override
settings. However, query hints specifically could affect performance.
Since we have a config to disable Options hint, I'm suggesting we also have
a config to disable Query hints.

On Wed, Aug 16, 2023 at 9:41 AM liu ron  wrote:


Hi,

Thanks for driving this proposal.

Can you explain why you would need to disable query hints because of
security issues? I don't really understand why query hints affects
security.

Best,
Ron

Bonnie Arogyam Varghese  于2023年8月16日周三
23:59写道:


Platform providers may want to disable hints completely for security
reasons.

Currently, there is a configuration to disable OPTIONS hint -





https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled


However, there is no configuration available to disable QUERY hints -





https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/hints/#query-hints


The proposal is to add a new configuration:

Name: table.query-options.enabled
Description: Enable or disable the QUERY hint, if disabled, an
exception would be thrown if any QUERY hints are specified
Note: The default value will be set to true.












Re: [DISCUSS] FLIP-348: Support System Columns in SQL and Table API

2023-08-18 Thread Timo Walther
Great, I also like my last suggestion as it is even more elegant. I will 
update the FLIP until Monday.


Regards,
Timo

On 17.08.23 13:55, Jark Wu wrote:

Hi Timo,

I'm fine with your latest suggestion that introducing a flag to control
expanding behavior of metadata virtual columns, but not introducing
any concept of system/pseudo columns for now.

Best,
Jark

On Tue, 15 Aug 2023 at 23:25, Timo Walther  wrote:


Hi everyone,

I would like to bump this thread up again.

Esp. I would like to hear opinions on my latest suggestions to simply
use METADATA VIRTUAL as system columns and only introduce a config
option for the SELECT * behavior. Implementation-wise this means minimal
effort and less new concepts.

Looking forward to any kind of feedback.

Thanks,
Timo

On 07.08.23 12:07, Timo Walther wrote:

Hi everyone,

thanks for the various feedback and lively discussion. Sorry, for the
late reply as I was on vacation. Let me answer to some of the topics:

1) Systems columns should not be shown with DESCRIBE statements

This sounds fine to me. I will update the FLIP in the next iteration.

2) Do you know why most SQL systems do not need any prefix with their
pseudo column?

Because most systems do not have external catalogs or connectors. And
also the number of system columns is limited to a handful of columns.
Flink is more generic and thus more complex. And we have already the
concepts of metadata columns. We need to be careful with not overloading
our language.

3) Implementation details

  > how to you plan to implement the "system columns", do we need to add
it to `RelNode` level? Or we just need to do it in the
parsing/validating phase?
  > I'm not sure that Calcite's "system column" feature is fully ready

My plan would be to only modify the parsing/validating phase. I would
like to avoid additional complexity in planner rules and
connector/catalog interfaces. Metadata columns already support
projection push down and are passed through the stack (via Schema,
ResolvedSchema, SupportsReadableMetadata). Calcite's "system column"
feature is not fully ready yet and it would be a large effort
potentially introducing bugs in supporting it. Thus, I'm proposing to
leverage what we already have. The only part that needs to be modified
is the "expand star" method in SqlValidator and Table API.

Queries such as `SELECT * FROM (SELECT $rowtime, * FROM t);` would show
$rowtime as the expand star has only a special case when in the scope
for `FROM t`. All further subqueries treat it as a regular column.

4) Built-in defined pseudo-column "$rowtime"

  > Did you consider making it as a built-in defined pseudo-column
"$rowtime" which returns the time attribute value (if exists) or null
(if non-exists) for every table/query, and pseudo-column "$proctime"
always returns PROCTIME() value for each table/query

Built-in pseudo-columns mean that connector or catalog providers need
consensus in Flink which pseudo-columns should be built-in. We should
keep the concept generic and let platform providers decide which pseudo
columns to expose. $rowtime might be obvious but others such as
$partition or $offset are tricky to get consensus as every external
connector works differently. Also a connector might want to expose
different time semantics (such as ingestion time).

5) Any operator can introduce system (psedo) columns.

This is clearly out of scope for this FLIP. The implementation effort
would be huge and could introduce a lot of bugs.

6) "Metadata Key Prefix Constraint" which is still a little complex

Another option could be to drop the naming pattern constraint. We could
make it configurable that METADATA VIRTUAL columns are never selected by
default in SELECT * or visible in DESCRIBE. This would further simplify
the FLIP and esp lower the impact on the planner and all interfaces.

What do you think about this? We could introduce a flag:

table.expand-metadata-columns (better name to be defined)

This way we don't need to introduce the concept of system columns yet,
but can still offer similar functionality with minimal overhead in the
code base.

Regards,
Timo




On 04.08.23 23:06, Alexey Leonov-Vendrovskiy wrote:

Looks like both kinds of system columns can converge.
We can say that any operator can introduce system (psedo) columns.

cc Eugene who is also interested in the subject.

On Wed, Aug 2, 2023 at 1:03 AM Paul Lam  wrote:


Hi Timo,

Thanks for starting the discussion! System columns are no doubt a
good boost on Flink SQL’s usability, and I see the feedbacks are
mainly concerns about the accessibility of system columns.

I think most of the concerns could be solved by clarifying the
ownership of the system columns. Different from databases like
Oracle/BigQuery/PG who owns the data/metadata, Flink uses the
data/metadata from external systems. That means Flink could
have 2 kinds of system c

Re: [DISCUSS] [FLINK-32873] Add a config to allow disabling Query hints

2023-08-18 Thread Timo Walther

> lots of the streaming SQL syntax are extensions of SQL standard

That is true. But hints are kind of a special case because they are not 
even "part of Flink SQL" that's why they are written in a comment syntax.


Anyway, I feel hints could be sometimes confusing for users because most 
of them have no effect for streaming and long-term we could also set 
some hints via the CompiledPlan. And if you have multiple teams, 
non-skilled users should not play around with hints and leave the 
decision to the system that might become smarter over time.


Regards,
Timo


On 17.08.23 18:47, liu ron wrote:

Hi, Bonnie


Options hints could be a security concern since users can override

settings.

I think this still doesn't answer my question

Best,
Ron

Jark Wu  于2023年8月17日周四 19:51写道:


Sorry, I still don't understand why we need to disable the query hint.
It doesn't have the security problems as options hint. Bonnie said it
could affect performance, but that depends on users using it explicitly.
If there is any performance problem, users can remove the hint.

If we want to disable query hint just because it's an extension to SQL
standard.
I'm afraid we have to introduce a bunch of configuration, because lots of
the streaming SQL syntax are extensions of SQL standard.

Best,
Jark

On Thu, 17 Aug 2023 at 15:43, Timo Walther  wrote:


+1 for this proposal.

Not every data team would like to enable hints. Also because they are an
extension to the SQL standard. It might also be the case that custom
rules would be overwritten otherwise. Setting hints could also be the
exclusive task of a DevOp team.

Regards,
Timo


On 17.08.23 09:30, Konstantin Knauf wrote:

Hi Bonnie,

this makes sense to me, in particular, given that we already have this
toggle for a different type of hints.

Best,

Konstantin

Am Mi., 16. Aug. 2023 um 19:38 Uhr schrieb Bonnie Arogyam Varghese
:


Hi Liu,
   Options hints could be a security concern since users can override
settings. However, query hints specifically could affect performance.
Since we have a config to disable Options hint, I'm suggesting we also

have

a config to disable Query hints.

On Wed, Aug 16, 2023 at 9:41 AM liu ron  wrote:


Hi,

Thanks for driving this proposal.

Can you explain why you would need to disable query hints because of
security issues? I don't really understand why query hints affects
security.

Best,
Ron

Bonnie Arogyam Varghese 

于2023年8月16日周三

23:59写道:


Platform providers may want to disable hints completely for security
reasons.

Currently, there is a configuration to disable OPTIONS hint -









https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/#table-dynamic-table-options-enabled


However, there is no configuration available to disable QUERY hints

-










https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/hints/#query-hints


The proposal is to add a new configuration:

Name: table.query-options.enabled
Description: Enable or disable the QUERY hint, if disabled, an
exception would be thrown if any QUERY hints are specified
Note: The default value will be set to true.



















Re: [DISCUSS] FLIP-348: Support System Columns in SQL and Table API

2023-08-29 Thread Timo Walther

Thanks everyone for the positive feedback.

I updated the FLIP with the proposed minimal solution:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-348%3A+Make+expanding+behavior+of+virtual+metadata+columns+configurable

I also changed the title to not cause any confusion with the concept of 
system columns.


Now the FLIP only introduces a new ConfigOption and does not specify 
constraints on the naming of metadata columns anymore.


If there are no objections, I would start a voting thread by tomorrow.

Thanks,
Timo


On 29.08.23 00:21, Alexey Leonov-Vendrovskiy wrote:

SGTM

For future reference, responding here:

5) Any operator can introduce system (psedo) columns.


This is clearly out of scope for this FLIP. The implementation effort
would be huge and could introduce a lot of bugs.



  I didn't imply any specific implementation or feature coverage in the
currently proposed FLIP, but rather as a way to describe the semantics of
system columns that those could be (but don't have to) introduced by any
operator.

Thanks,
Alexey


On Tue, Aug 22, 2023 at 2:18 PM Jing Ge  wrote:


Hi Timo,

Your last suggestion sounds good.

Best regards,
Jing

On Mon, Aug 21, 2023 at 4:21 AM Benchao Li  wrote:


It sounds good to me too, that we avoid introducing the concept of

"system

columns" for now.

Timo Walther  于2023年8月18日周五 22:38写道:


Great, I also like my last suggestion as it is even more elegant. I

will

update the FLIP until Monday.

Regards,
Timo

On 17.08.23 13:55, Jark Wu wrote:

Hi Timo,

I'm fine with your latest suggestion that introducing a flag to

control

expanding behavior of metadata virtual columns, but not introducing
any concept of system/pseudo columns for now.

Best,
Jark

On Tue, 15 Aug 2023 at 23:25, Timo Walther 

wrote:



Hi everyone,

I would like to bump this thread up again.

Esp. I would like to hear opinions on my latest suggestions to

simply

use METADATA VIRTUAL as system columns and only introduce a config
option for the SELECT * behavior. Implementation-wise this means

minimal

effort and less new concepts.

Looking forward to any kind of feedback.

Thanks,
Timo

On 07.08.23 12:07, Timo Walther wrote:

Hi everyone,

thanks for the various feedback and lively discussion. Sorry, for

the

late reply as I was on vacation. Let me answer to some of the

topics:


1) Systems columns should not be shown with DESCRIBE statements

This sounds fine to me. I will update the FLIP in the next

iteration.


2) Do you know why most SQL systems do not need any prefix with

their

pseudo column?

Because most systems do not have external catalogs or connectors.

And

also the number of system columns is limited to a handful of

columns.

Flink is more generic and thus more complex. And we have already

the

concepts of metadata columns. We need to be careful with not

overloading

our language.

3) Implementation details

   > how to you plan to implement the "system columns", do we need

to

add

it to `RelNode` level? Or we just need to do it in the
parsing/validating phase?
   > I'm not sure that Calcite's "system column" feature is fully

ready


My plan would be to only modify the parsing/validating phase. I

would

like to avoid additional complexity in planner rules and
connector/catalog interfaces. Metadata columns already support
projection push down and are passed through the stack (via Schema,
ResolvedSchema, SupportsReadableMetadata). Calcite's "system

column"

feature is not fully ready yet and it would be a large effort
potentially introducing bugs in supporting it. Thus, I'm proposing

to

leverage what we already have. The only part that needs to be

modified

is the "expand star" method in SqlValidator and Table API.

Queries such as `SELECT * FROM (SELECT $rowtime, * FROM t);` would

show

$rowtime as the expand star has only a special case when in the

scope

for `FROM t`. All further subqueries treat it as a regular column.

4) Built-in defined pseudo-column "$rowtime"

   > Did you consider making it as a built-in defined pseudo-column
"$rowtime" which returns the time attribute value (if exists) or

null

(if non-exists) for every table/query, and pseudo-column

"$proctime"

always returns PROCTIME() value for each table/query

Built-in pseudo-columns mean that connector or catalog providers

need

consensus in Flink which pseudo-columns should be built-in. We

should

keep the concept generic and let platform providers decide which

pseudo

columns to expose. $rowtime might be obvious but others such as
$partition or $offset are tricky to get consensus as every external
connector works differently. Also a connector might want to expose
different time semantics (such as ingestion time).

5) Any operator can introduce system (psedo) columns.

This is clearly out of scope for this FLIP. The implementation

effort

would be huge and cou

[VOTE] FLIP-348: Make expanding behavior of virtual metadata columns configurable

2023-08-31 Thread Timo Walther

Hi everyone,

I'd like to start a vote on FLIP-348: Make expanding behavior of virtual 
metadata columns configurable [1] which has been discussed in this 
thread [2].


The vote will be open for at least 72 hours unless there is an objection 
or not enough votes.


[1] https://cwiki.apache.org/confluence/x/_o6zDw
[2] https://lists.apache.org/thread/zon967w7synby8z6m1s7dj71dhkh9ccy

Cheers,
Timo


[RESULT][VOTE] FLIP-348: Make expanding behavior of virtual metadata columns configurable

2023-09-05 Thread Timo Walther

Hi everyone,

The voting time for [VOTE] FLIP-348: Make expanding behavior of virtual 
metadata columns configurable[1] has passed. I'm closing the vote now.


There were 6 +1 votes, all were binding:

- Martijn Visser (binding)
- Benchao Li (binding)
- Godfrey He (binding)
- Sergey Nuyanzin (binding)
- Jing Ge (binding)
- Jark Wu (binding)

There were no -1 votes.

Thus, FLIP-348 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

[1] https://lists.apache.org/thread/lc2h6xpk25bmvm4c5pgtfggm920j2ckr

Cheers,
Timo



On 31.08.23 14:29, Jark Wu wrote:

+1 (binding)

Best,
Jark


2023年8月31日 18:54,Jing Ge  写道:

+1(binding)

On Thu, Aug 31, 2023 at 11:22 AM Sergey Nuyanzin 
wrote:


+1 (binding)

On Thu, Aug 31, 2023 at 9:28 AM Benchao Li  wrote:


+1 (binding)

Martijn Visser  于2023年8月31日周四 15:24写道:


+1 (binding)

On Thu, Aug 31, 2023 at 9:09 AM Timo Walther 

wrote:



Hi everyone,

I'd like to start a vote on FLIP-348: Make expanding behavior of

virtual

metadata columns configurable [1] which has been discussed in this
thread [2].

The vote will be open for at least 72 hours unless there is an

objection

or not enough votes.

[1] https://cwiki.apache.org/confluence/x/_o6zDw
[2] https://lists.apache.org/thread/zon967w7synby8z6m1s7dj71dhkh9ccy

Cheers,
Timo





--

Best,
Benchao Li




--
Best regards,
Sergey







[DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-26 Thread Timo Walther

Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY 
clause [1].


Many SQL vendors expose the concepts of Partitioning, Bucketing, and 
Clustering. This FLIP continues the work of previous FLIPs and would 
like to introduce the concept of "Bucketing" to Flink.


This is a pure connector characteristic and helps both Apache Kafka and 
Apache Paimon connectors in avoiding a complex WITH clause by providing 
improved syntax.


Here is an example:

CREATE TABLE MyTable
  (
uid BIGINT,
name STRING
  )
  DISTRIBUTED BY (uid) INTO 6 BUCKETS
  WITH (
'connector' = 'kafka'
  )

The full syntax specification can be found in the document. The clause 
should be optional and fully backwards compatible.


Regards,
Timo

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause


Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Hi Jark,

my intention was to avoid too complex syntax in the first version. In 
the past years, we could enable use cases also without this clause, so 
we should be careful with overloading it with too functionality in the 
first version. We can still iterate on it later, the interfaces are 
flexible enough to support more in the future.


I agree that maybe an explicit HASH and RANGE doesn't harm. Also making 
the bucket number optional.


I updated the FLIP accordingly. Now the SupportsBucketing interface 
declares more methods that help in validation and proving helpful error 
messages to users.


Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example, StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:
https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:
https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:


Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sink, If there needs some keyby, please finish the keyby by the
connector itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka and
Apache Paimon connectors in avoiding a complex WITH clause by providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
(
  uid BIGINT,
  name STRING
)
DISTRIBUTED BY (uid) INTO 6 BUCKETS
WITH (
  'connector' = 'kafka'
)

The full syntax specification can be found in the document. The clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause







Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Hi Yunfan and Benchao,

it seems the FLIP discussion thread got split into two parts. At least 
this is what I see in my mail program. I would kindly ask to answer in 
the other thread [1].


I will also reply there now to maintain the discussion link.

Regards,
Timo

[1] https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1



On 28.10.23 10:34, Benchao Li wrote:

Thanks Timo for preparing the FLIP.

Regarding "By default, DISTRIBUTED BY assumes a list of columns for an
implicit hash partitioning."
Do you think it's useful to add some extensibility for the hash
strategy. One scenario I can foresee is if we write bucketed data into
Hive, and if Flink's hash strategy is different than Hive/Spark's,
then they could not utilize the bucketed data written by Flink. This
is the one case I met in production already, there may be more cases
like this that needs customize the hash strategy to accommodate with
existing systems.

yunfan zhang  于2023年10月27日周五 19:06写道:


Distribute by in DML is also supported by Hive.
And it is also useful for flink.
Users can use this ability to increase cache hit rate in lookup join.
And users can use "distribute by key, rand(1, 10)” to avoid data skew problem.
And I think it is another way to solve this Flip204[1]
There is already has some people required this feature[2]

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
[2] https://issues.apache.org/jira/browse/FLINK-27541

On 2023/10/27 08:20:25 Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example, StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:
https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:
https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:


Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sink, If there needs some keyby, please finish the keyby by the
connector itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka and
Apache Paimon connectors in avoiding a complex WITH clause by providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
(
  uid BIGINT,
  name STRING
)
DISTRIBUTED BY (uid) INTO 6 BUCKETS
WITH (
  'connector' = 'kafka'
)

The full syntax specification can be found in the document. The clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause











Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Let me reply to the feedback from Yunfan:

> Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This 
discussion is about DDL. For DDL, we have more freedom as every vendor 
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly 
connector to the connector implementation, not the engine. However, for 
DML we need to watch out for standard compliance and introduce changes 
with high caution.


How a LookupTableSource interprets the DISTRIBUTED BY is 
connector-dependent in my opinion. In general this FLIP is a sink 
ability, but we could have a follow FLIP that helps in distributing load 
of lookup joins.


> to avoid data skew problem

I understand the use case and that it is important to solve it 
eventually. Maybe a solution might be to introduce helper Polymorphic 
Table Functions [1] in the future instead of new syntax.


[1] 
https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

> Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink 
SQL engine. We are not using Flink's hash strategy in any way. If the 
hash strategy for the regular Flink file system connector should be 
changed, this should be expressed via config option. Otherwise we should 
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.


Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In 
the past years, we could enable use cases also without this clause, so 
we should be careful with overloading it with too functionality in the 
first version. We can still iterate on it later, the interfaces are 
flexible enough to support more in the future.


I agree that maybe an explicit HASH and RANGE doesn't harm. Also making 
the bucket number optional.


I updated the FLIP accordingly. Now the SupportsBucketing interface 
declares more methods that help in validation and proving helpful error 
messages to users.


Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but 
not in

the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER TABLE?
Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example, 
StarRocks[1]

and Paimon[2].

Best,
Jark

[1]:
https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets
[2]:
https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket

On Thu, 26 Oct 2023 at 18:26, Jingsong Li  wrote:


Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sink, If there needs some keyby, please finish the keyby by the
connector itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther  wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
clause [1].

Many SQL vendors expose the concepts of Partitioning, Bucketing, and
Clustering. This FLIP continues the work of previous FLIPs and would
like to introduce the concept of "Bucketing" to Flink.

This is a pure connector characteristic and helps both Apache Kafka and
Apache Paimon connectors in avoiding a complex WITH clause by providing
improved syntax.

Here is an example:

CREATE TABLE MyTable
    (
  uid BIGINT,
  name STRING
    )
    DISTRIBUTED BY (uid) INTO 6 BUCKETS
    WITH (
  'connector' = 'kafka'
    )

The full syntax specification can be found in the document. The clause
should be optional and fully backwards compatible.

Regards,
Timo

[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause









Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-30 Thread Timo Walther

Hi Jing,

> Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this 
concept "distribution".


In any case, the "BY" is optional, so certain DDL statements would 
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED, 
we should use the passive voice.


> Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning 
strategies (namely hash and range partitioning) if the connector offers 
more than one.


Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order to
make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao mentioned
which is used to distribute rows amond reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if I
am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name STRING)
DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

- For advanced users, the algorithm can be defined explicitly.
- Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther  wrote:


Let me reply to the feedback from Yunfan:

  > Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
connector to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow FLIP that helps in distributing load
of lookup joins.

  > to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]

https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf


Let me reply to the feedback from Benchao:

  > Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and proving helpful error
messages to users.

Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some minor comments.

1. Could we also support HASH and RANGE distribution kind on the DDL
syntax?
I noticed that HASH and UNKNOWN are introduced in the Java API, but
not in
the syntax.

2. Can we make "INTO n BUCKETS" optional in CREATE TABLE and ALTER

TABLE?

Some storage engines support automatically determining the bucket number
based on
the cluster resources and data size of the table. For example,
StarRocks[1]
and Paimon[2].

Best,
Jark

[1]:


https://docs.starrocks.io/en-us/latest/table_design/Data_distribution#determine-the-number-of-buckets

[2]:


https://paimon.apache.org/docs/0.5/concepts/primary-key-table/#dynamic-bucket


On Thu, 26 Oct 2023 at 18:26, Jingsong Li 

wrote:



Very thanks Timo for starting this discussion.

Big +1 for this.

The design looks good to me!

We can add some documentation for connector developers. For example:
for sink, If there needs some keyby, please finish the keyby by the
connector itself. SupportsBucketing is just a marker interface.

Best,
Jingsong

On Thu, Oct 26, 2023 at 5:00 PM Timo Walther 

wrote:


Hi everyone,

I would like to start a discussion on FLIP-376: Add DISTRIBUTED BY
cl

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-10-31 Thread Timo Walther

Hi Jark,

here are the checks I had in mind so far. But we can also discuss this 
during the implementation in the PRs. Most of the tasks are very similar 
to PARTITIONED BY which is also a characteristic of a sink.


1) Check that DISTRIBUTED BY columns reference physical columns and at 
least 1. In DefaultSchemaResolver like we do for PARTITIONED BY.
2) Check that if DISTRIBUTED is defined the sink implements 
SupportsBucketing. In DynamicSinkUtils like we do for metadata columns.


Currently, for sources we would only check for semantical correctness 
(1) but not more. Like we do for PARTITIONED BY.


Do you have more checks in mind? Of course, during implementation I will 
make sure that all derived utils will properly work; including CREATE 
TABLE LIKE.


Regards,
Timo


On 31.10.23 07:22, Jark Wu wrote:

Hi Timo,

Thank you for the update. The FLIP looks good to me now.
I only have one more question.

What does Flink check and throw exceptions for the bucketing?
For example, do we check interfaces when executing create/alter
DDL and when used as a source?

Best,
Jark

On Tue, 31 Oct 2023 at 00:25, Timo Walther  wrote:


Hi Jing,

  > Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this
concept "distribution".

In any case, the "BY" is optional, so certain DDL statements would
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
we should use the passive voice.

  > Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning
strategies (namely hash and range partitioning) if the connector offers
more than one.

Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order

to

make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao

mentioned

which is used to distribute rows amond reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if

I

am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name

STRING)

DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

 - For advanced users, the algorithm can be defined explicitly.
 - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 

wrote:



Let me reply to the feedback from Yunfan:

   > Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
connector to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow FLIP that helps in distributing load
of lookup joins.

   > to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]



https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

   > Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and proving helpful error
messages to users.

Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only have some m

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-11-02 Thread Timo Walther

Hi Jing,

I agree this is confusing. THe Spark API calls it bucketBy in the 
programmatic API. But anyway, we should discuss the SQL semantics here. 
It's like a "WHERE" is called "filter" in the programmatic world. Or a 
"SELECT" is called "projection" in code.


And looking at all the Hive tutorials[1], distributed by should be more 
consistent. By using the "INTO n BUCKETS", we still include the 
bucketing terminology in the syntax for better understanding.


If there are no other objections to this topic, I would still prefer to 
go with DISTRIBUTED BY.


Regards,
Timo

[1] 
https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/ 




On 01.11.23 11:55, Jing Ge wrote:

Hi Timo,

Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY
6" or "BUCKETED INTO 6".

Not really used in SQL, but afaiu Spark uses the concept[1].

[1]
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html


Best regards,
Jing

On Mon, Oct 30, 2023 at 5:25 PM Timo Walther  wrote:


Hi Jing,

  > Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this
concept "distribution".

In any case, the "BY" is optional, so certain DDL statements would
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
we should use the passive voice.

  > Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning
strategies (namely hash and range partitioning) if the connector offers
more than one.

Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order

to

make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao

mentioned

which is used to distribute rows amond reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if

I

am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name

STRING)

DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

 - For advanced users, the algorithm can be defined explicitly.
 - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 

wrote:



Let me reply to the feedback from Yunfan:

   > Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
connector to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow FLIP that helps in distributing load
of lookup joins.

   > to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]



https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

   > Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the bucket number optional.

I updated the FLIP accordingly. Now the SupportsBucketing interface
declares more methods that help in validation and proving helpful error
messages to users.

Let me know what you think.

Regards,
Timo


On 27.10.23 10:20, Jark Wu wrote:

Hi Timo,

Thanks for starting this discussion. I really like it!
The FLIP is already in good shape, I only ha

Re: [DISCUSS] FLIP-376: Add DISTRIBUTED BY clause

2023-11-03 Thread Timo Walther

If there are no objections, I would start with a voting on Monday.

Thanks for the feedback everyone!

Regards,
Timo


On 02.11.23 13:49, Martijn Visser wrote:

Hi all,


From a user point of view, I think it makes sense to go for

DISTRIBUTED BY with how Timo explained it. +1 for his proposal

Best regards,

Martijn


On Thu, Nov 2, 2023 at 11:00 AM Timo Walther  wrote:


Hi Jing,

I agree this is confusing. THe Spark API calls it bucketBy in the
programmatic API. But anyway, we should discuss the SQL semantics here.
It's like a "WHERE" is called "filter" in the programmatic world. Or a
"SELECT" is called "projection" in code.

And looking at all the Hive tutorials[1], distributed by should be more
consistent. By using the "INTO n BUCKETS", we still include the
bucketing terminology in the syntax for better understanding.

If there are no other objections to this topic, I would still prefer to
go with DISTRIBUTED BY.

Regards,
Timo

[1]
https://www.linkedin.com/pulse/hive-order-sort-distribute-cluster-mohammad-younus-jameel/



On 01.11.23 11:55, Jing Ge wrote:

Hi Timo,

Gotcha, let's use passive verbs. I am actually thinking about "BUCKETED BY
6" or "BUCKETED INTO 6".

Not really used in SQL, but afaiu Spark uses the concept[1].

[1]
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrameWriter.bucketBy.html


Best regards,
Jing

On Mon, Oct 30, 2023 at 5:25 PM Timo Walther  wrote:


Hi Jing,

   > Have you considered using BUCKET BY directly?

Which vendor uses this syntax? Most vendors that I checked call this
concept "distribution".

In any case, the "BY" is optional, so certain DDL statements would
declare it like "BUCKET INTO 6 BUCKETS"? And following the PARTITIONED,
we should use the passive voice.

   > Did you mean users can use their own algorithm? How to do it?

"own algorithm" only refers to deciding between a list of partitioning
strategies (namely hash and range partitioning) if the connector offers
more than one.

Regards,
Timo


On 30.10.23 12:39, Jing Ge wrote:

Hi Timo,

The FLIP looks great! Thanks for bringing it to our attention! In order

to

make sure we are on the same page, I would ask some questions:

1. DISTRIBUTED BY reminds me DISTRIBUTE BY from Hive like Benchao

mentioned

which is used to distribute rows amond reducers, i.e. focusing on the
shuffle during the computation. The FLIP is focusing more on storage, if

I

am not mistaken. Have you considered using BUCKET BY directly?

2. According to the FLIP: " CREATE TABLE MyTable (uid BIGINT, name

STRING)

DISTRIBUTED BY HASH(uid) INTO 6 BUCKETS

  - For advanced users, the algorithm can be defined explicitly.
  - Currently, either HASH() or RANGE().

"
Did you mean users can use their own algorithm? How to do it?

Best regards,
Jing

On Mon, Oct 30, 2023 at 11:13 AM Timo Walther 

wrote:



Let me reply to the feedback from Yunfan:

> Distribute by in DML is also supported by Hive

I see DISTRIBUTED BY and DISTRIBUTE BY as two separate discussions. This
discussion is about DDL. For DDL, we have more freedom as every vendor
has custom syntax for CREATE TABLE clauses. Furthermore, this is tightly
connector to the connector implementation, not the engine. However, for
DML we need to watch out for standard compliance and introduce changes
with high caution.

How a LookupTableSource interprets the DISTRIBUTED BY is
connector-dependent in my opinion. In general this FLIP is a sink
ability, but we could have a follow FLIP that helps in distributing load
of lookup joins.

> to avoid data skew problem

I understand the use case and that it is important to solve it
eventually. Maybe a solution might be to introduce helper Polymorphic
Table Functions [1] in the future instead of new syntax.

[1]



https://www.ifis.uni-luebeck.de/~moeller/Lectures/WS-19-20/NDBDM/12-Literature/Michels-SQL-2016.pdf



Let me reply to the feedback from Benchao:

> Do you think it's useful to add some extensibility for the hash
strategy

The hash strategy is fully determined by the connector, not the Flink
SQL engine. We are not using Flink's hash strategy in any way. If the
hash strategy for the regular Flink file system connector should be
changed, this should be expressed via config option. Otherwise we should
offer a dedicated `hive-filesystem` or `spark-filesystem` connector.

Regards,
Timo


On 30.10.23 10:44, Timo Walther wrote:

Hi Jark,

my intention was to avoid too complex syntax in the first version. In
the past years, we could enable use cases also without this clause, so
we should be careful with overloading it with too functionality in the
first version. We can still iterate on it later, the interfaces are
flexible enough to support more in the future.

I agree that maybe an explicit HASH and RANGE doesn't harm. Also making
the buck

[VOTE] FLIP-376: Add DISTRIBUTED BY clause

2023-11-06 Thread Timo Walther

Hi everyone,

I'd like to start a vote on FLIP-376: Add DISTRIBUTED BY clause[1] which 
has been discussed in this thread [2].


The vote will be open for at least 72 hours unless there is an objection 
or not enough votes.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause

[2] https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1

Cheers,
Timo


Re: FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter

2023-11-07 Thread Timo Walther

Thanks for the FLIP, Piotr.

In order to follow the FLIP process[1], please prefix the email subject 
with "[DISCUSS]".


Also, some people might have added filters to their email clients to 
highlight those discussions.


Thanks,
Timo

[1] 
https://cwiki.apache.org/confluence/display/Flink/Flink+Improvement+Proposals#FlinkImprovementProposals-Process


On 07.11.23 09:35, Piotr Nowojski wrote:

Hi all!

I would like to start a discussion on a follow up of FLIP-384: Introduce
TraceReporter and use it to create checkpointing and recovery traces [1]:

*FLIP-385: Add OpenTelemetryTraceReporter and OpenTelemetryMetricReporter[
2]*

This FLIP proposes to add both MetricReporter and TraceReporter integrating
Flink with OpenTelemetry [4].

There is also another follow up FLIP-386 [3], which improves recovery
traces.

Please let me know what you think!

Best,
Piotr Nowojski

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-384%3A+Introduce+TraceReporter+and+use+it+to+create+checkpointing+and+recovery+traces
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-385%3A+Add+OpenTelemetryTraceReporter+and+OpenTelemetryMetricReporter
[3]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-386%3A+Support+adding+custom+metrics+in+Recovery+Spans
[4] https://opentelemetry.io/





[RESULT][VOTE] FLIP-376: Add DISTRIBUTED BY clause

2023-11-09 Thread Timo Walther

Hi everyone,

The voting time for [VOTE] FLIP-376: Add DISTRIBUTED BY clause[1] has 
passed. I'm closing the vote now.


There were 10 +1 votes, of which 8 were binding:

- Jing Ge (binding)
- Martijn Visser (binding)
- Lincoln Lee (binding)
- Benchao Li (binding)
- Dawid Wysakowicz (binding)
- Jim Hughes (non-binding)
- Jingsong Li (binding)
- Zhanghao Chen (non-binding)
- Sergey Nuyanzin (binding)
- Leonard Xu (binding)

There were no -1 votes.

Thus, FLIP-376 has been accepted.

Thanks everyone for joining the discussion and giving feedback!

[1] https://lists.apache.org/thread/cft2d9jc2lr8gv6dyyz7b62188mf07sj

Cheers,
Timo


On 08.11.23 10:14, Leonard Xu wrote:

+1(binding)

Best,
Leonard


2023年11月8日 下午1:05,Sergey Nuyanzin  写道:

+1 (binding)

On Wed, Nov 8, 2023 at 6:02 AM Zhanghao Chen 
wrote:


+1 (non-binding)

Best,
Zhanghao Chen

From: Timo Walther 
Sent: Monday, November 6, 2023 19:38
To: dev 
Subject: [VOTE] FLIP-376: Add DISTRIBUTED BY clause

Hi everyone,

I'd like to start a vote on FLIP-376: Add DISTRIBUTED BY clause[1] which
has been discussed in this thread [2].

The vote will be open for at least 72 hours unless there is an objection
or not enough votes.

[1]

https://cwiki.apache.org/confluence/display/FLINK/FLIP-376%3A+Add+DISTRIBUTED+BY+clause
[2] https://lists.apache.org/thread/z1p4f38x9ohv29y4nlq8g1wdxwfjwxd1

Cheers,
Timo




--
Best regards,
Sergey






Re: [VOTE] FLIP-393: Make QueryOperations SQL serializable

2023-11-21 Thread Timo Walther

+1 (binding)

Thanks for working on this FLIP. It will be a nice continuation of the 
previous work.


Cheers,
Timo


On 21.11.23 13:19, Gyula Fóra wrote:

+1 (binding)

Gyula

On Tue, 21 Nov 2023 at 13:11, xiangyu feng  wrote:


+1 (non-binding)

Thanks for driving this.

Best,
Xiangyu Feng


Ferenc Csaky  于2023年11月21日周二 20:07写道:


+1 (non-binding)

Lookgin forward to this!

Best,
Ferenc




On Tuesday, November 21st, 2023 at 12:21, Martijn Visser <
martijnvis...@apache.org> wrote:





+1 (binding)

Thanks for driving this.

Best regards,

Martijn

On Tue, Nov 21, 2023 at 12:18 PM Benchao Li libenc...@apache.org

wrote:



+1 (binding)

Dawid Wysakowicz wysakowicz.da...@gmail.com 于2023年11月21日周二 18:56写道:


Hi everyone,

Thank you to everyone for the feedback on FLIP-393: Make

QueryOperations

SQL serializable[1]
which has been discussed in this thread [2].

I would like to start a vote for it. The vote will be open for at

least 72

hours unless there is an objection or not enough votes.

[1] https://cwiki.apache.org/confluence/x/vQ2ZE
[2]

https://lists.apache.org/thread/ztyk68brsbmwwo66o1nvk3f6fqqhdxgk


--

Best,
Benchao Li










Re: SQL return type change from 1.17 to 1.18

2023-12-07 Thread Timo Walther

Hi Peter,

thanks for reaching out to the Flink community. This is indeed a serious 
issue. As the author of the Flink type system, DataType and many related 
utilities I strongly vote for reverting FLINK-33523:


- It changes the Flink type system without a FLIP.
- It breaks backwards compatibility with UDFs and connectors.

Regards,
Timo

On 07.12.23 07:38, Péter Váry wrote:

Hi Team,

We are working on upgrading the Iceberg-Flink connector from 1.17 to 1.18,
and found that some of our tests are failing. Prabhu Joseph created a jira
[1] to discuss this issue, along with short example code.

In a nutshell:
- Create a table with an 'ARRAY' column
- Run a select which returns this column
- The return type changes:
 - From 'Object[]' - in 1.17
 - To 'int[]' - in 1.18

The change is introduced by this jira [2].

While I understand the reasoning behind this change, this will break some
users existing workflow as evidenced by Xingcan Cui finding this
independently [3].

What is the opinion of the community about this change?
- Do we want to revert the change?
- Do we ask the owners of the change to make this behavior configurable?
- Do we accept this behavior change in a minor release?

Thanks,
Peter

[1] - https://issues.apache.org/jira/browse/FLINK-33523 - DataType
ARRAY fails to cast into Object[]
[2] - https://issues.apache.org/jira/browse/FLINK-31835 - DataTypeHint
don't support Row>
[3] - https://issues.apache.org/jira/browse/FLINK-33547 - SQL primitive
array type after upgrading to Flink 1.18.0





Re: [DISCUSS] Release Flink 1.18.1

2023-12-08 Thread Timo Walther

Thanks for taking care of this Jing.

+1 to release 1.18.1 for this.

Cheers,
Timo


On 08.12.23 10:00, Benchao Li wrote:

I've merged FLINK-33313 to release-1.18 branch.

Péter Váry  于2023年12月8日周五 16:56写道:


Hi Jing,
Thanks for taking care of this!
+1 (non-binding)
Peter

Sergey Nuyanzin  ezt írta (időpont: 2023. dec. 8., P,
9:36):


Thanks Jing driving it
+1

also +1 to include FLINK-33313 mentioned by Benchao Li

On Fri, Dec 8, 2023 at 9:17 AM Benchao Li  wrote:


Thanks Jing for driving 1.18.1 releasing.

I would like to include FLINK-33313[1] in 1.18.1, it's just a bugfix,
not a blocker, but it's already merged into master, I plan to merge it
to 1.8/1.7 branches today after the CI passes.

[1] https://issues.apache.org/jira/browse/FLINK-33313

Jing Ge  于2023年12月8日周五 16:06写道:


Hi all,

I would like to discuss creating a new 1.18 patch release (1.18.1). The
last 1.18 release is nearly two months old, and since then, 37 tickets

have

been closed [1], of which 6 are blocker/critical [2].  Some of them are
quite important, such as FLINK-33598 [3]

Most urgent and important one is FLINK-33523 [4] and according to the
discussion thread[5] on the ML, 1.18.1 should/must be released asap

after

the breaking change commit has been reverted.

I am not aware of any other unresolved blockers and there are no

in-progress

tickets [6].
Please let me know if there are any issues you'd like to be included in
this release but still not merged.

If the community agrees to create this new patch release, I could
volunteer as the release manager.

Best regards,
Jing

[1]




https://issues.apache.org/jira/browse/FLINK-33567?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.18.1%20%20and%20resolution%20%20!%3D%20%20Unresolved%20order%20by%20priority%20DESC

[2]




https://issues.apache.org/jira/browse/FLINK-33693?jql=project%20%3D%20FLINK%20AND%20fixVersion%20%3D%201.18.1%20and%20resolution%20%20!%3D%20Unresolved%20%20and%20priority%20in%20(Blocker%2C%20Critical)%20ORDER%20by%20priority%20%20DESC

[3] https://issues.apache.org/jira/browse/FLINK-33598
[4] https://issues.apache.org/jira/browse/FLINK-33523
[5] https://lists.apache.org/thread/m4c879y8mb7hbn2kkjh9h3d8g1jphh3j
[6] https://issues.apache.org/jira/projects/FLINK/versions/12353640
Thanks,




--

Best,
Benchao Li




--
Best regards,
Sergey









Re: [DISCUSS] FLIP-392: Deprecate the Legacy Group Window Aggregation

2023-12-12 Thread Timo Walther

Hi Xuyang,

thanks for proposing this FLIP. In my opinion the FLIP touches too many 
topics at the same time and should be split into multiple FLIPs. We 
should stay focused on what is needed for Flink 2.0.


Let me summarizing the topics:

1) SESSION Window TVF Aggregation

This has been agreed in FLIP-145 already. We don't need another FLIP but 
someone that finally implements this after we have performed the Calcite 
upgrade a couple of months ago. The Calcite upgrade was important 
exactly for SESSION windows. And actually Sergey Nuyanzin wanted to work 
in this if I remember correctly. Not sure if this is still the case.


2) CDC support of Window TVFs

This can be a FLIP on its own.

3) HOP window size with non-integer step length

This can be a FLIP on its own.

4) Configurations such as early fire, late fire and allow lateness

Can we postpone this discussion? Currently we should focus on user 
switching to Window TVFs before Flink 2.0. Early fire, late fire and 
allow lateness have not exposed through public configuration. It can be 
introduced after Flink 2.0 released.


Regards,
Timo


On 12.12.23 08:01, Xuyang wrote:

Hi, Jim.
Thanks for your explaination.

Ah, I mean to ask if you can contribute the new SESSION Table support
without needing FLIP-392 completely settled.  I was trying to see if that
is separate work which can be done or if there is some dependency on this
FLIP.

The pr available about session window tvf belongs to this flip I think, and is 
part of the work about this flip. Actually the poc pr is not ready completely 
yet,
I'll try to update it to implement the session window tvf in window agg 
operator instead of using legacy group window agg operator.

The tests should not be impacted.  Depending on what order our work lands
in, one of the tests you've added/updated would likely move to the
RestoreTests that Bonnie and I are working on.  Just mentioning that ahead
of time


Got it! I will pay attention to it.




--

 Best!
 Xuyang





在 2023-12-11 21:35:00,"Jim Hughes"  写道:

Hi Xuyang,

On Sun, Dec 10, 2023 at 10:41 PM Xuyang  wrote:


Hi, Jim.

As a clarification, since FLINK-24204 is finishing up work from
FLIP-145[1], do we need to discuss anything before you work out the

details

of FLINK-24024 as a PR?

Which issue do you mean? It seems that FLINK-24204[1] is the issue with
table api&sql type system.



Ah, I mean to ask if you can contribute the new SESSION Table support
without needing FLIP-392 completely settled.  I was trying to see if that
is separate work which can be done or if there is some dependency on this
FLIP.



I've got a PR up [3] for moving at least one of the classes you are
touching.
Nice work! Since we are not going to delete the legacy group window agg
operator actually, the only compatibility issue
may be that when using flink sql, the legacy group window agg operator
will be rewritten into new operators. Will these tests be affected about
this rewritten?



The tests should not be impacted.  Depending on what order our work lands
in, one of the tests you've added/updated would likely move to the
RestoreTests that Bonnie and I are working on.  Just mentioning that ahead
of time

Cheers,

Jim





[1] https://issues.apache.org/jira/browse/FLINK-24204






--

 Best!
 Xuyang





At 2023-12-09 06:25:30, "Jim Hughes"  wrote:

Hi Xuyang,

As a clarification, since FLINK-24204 is finishing up work from
FLIP-145[1], do we need to discuss anything before you work out the

details

of FLINK-24024 as a PR?

Relatedly, as that goes up for a PR, as part of FLINK-33421 [2], Bonnie

and

I are working through migrating some of the JsonPlan Tests and ITCases to
RestoreTests.  I've got a PR up [3] for moving at least one of the classes
you are touching.  Let me know if I can share any details about that work.

Cheers,

Jim

1.


https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function#FLIP145:SupportSQLwindowingtablevaluedfunction-SessionWindows


2. https://issues.apache.org/jira/browse/FLINK-33421
3. https://github.com/apache/flink/pull/23886
https://issues.apache.org/jira/browse/FLINK-33676

On Tue, Nov 28, 2023 at 7:31 AM Xuyang  wrote:


Hi all.
I'd like to start a discussion of FLIP-392: Deprecate the Legacy Group
Window Aggregation.


Although the current Flink SQL Window Aggregation documentation[1]
indicates that the legacy Group Window Aggregation
syntax has been deprecated, the new Window TVF Aggregation syntax has

not

fully covered all of the features of the legacy one.


Compared to Group Window Aggergation, Window TVF Aggergation has several
advantages, such as two-stage optimization,
support for standard GROUPING SET syntax, and so on. However, it needs

to

supplement and enrich the following features.


1. Support for SESSION Window TVF Aggregation
2. Support for consuming CDC stream
3. Support for HOP window size with non-integer step length
4. Support for configurations such 

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-14 Thread Timo Walther

Hi Alan,

thanks for proposing this FLIP. It's a great addition to Flink and has 
been requested multiple times. It will be in particular interesting for 
accessing REST endpoints and other remote services.


Great that we can generalize and reuse parts of the Python planner rules 
and code for this.


I have some feedback regarding the API:

1) Configuration

Configuration keys like

`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`

are currently not supported in the configuration stack. The key space 
should remain constant. Only a constant key space enables the use of the 
ConfigOption class which is required in the layered configuration. For 
now I would suggest to only allow a global setting for buffer capacity, 
timeout, and retry-strategy. We can later work on a per-function 
configuration (potentially also needed for other use cases).


2) Semantical declaration

Regarding

`table.exec.async-scalar.catalog.db.func-name.output-mode`

this is a semantical property of a function and should be defined 
per-function. It impacts the query result and potentially the behavior 
of planner rules.


I see two options for this either: (a) an additional method in 
AsyncScalarFunction or (b) adding this to the function's requirements. I 
vote for (b), because a FunctionDefinition should be fully self 
contained and sufficient for planning.


Thus, for `FunctionDefinition.getRequirements(): 
Set` we can add a new requirement `ORDERED` which 
should also be the default for AsyncScalarFunction. `getRequirements()` 
can be overwritten and return a set without this requirement if the user 
intents to do this.



Thanks,
Timo




On 11.12.23 18:43, Piotr Nowojski wrote:

+1 to the idea, I don't have any comments.

Best,
Piotrek

czw., 7 gru 2023 o 07:15 Alan Sheinberg 
napisał(a):



Nicely written and makes sense.  The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
imply/suggest that all Async functions are remote.  I wonder if we can

find

another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)


Thanks.  That's fair.  I agree that "Remote" isn't always accurate.  I
believe that the python calls are also done asynchronously, so that might
be a reasonable name, so long as there's no confusion between the base and
async child class.

On Wed, Dec 6, 2023 at 3:48 PM Jim Hughes 
wrote:


Hi Alan,

Nicely written and makes sense.  The only feedback I have is around the
naming of the generalization, e.g. "Specifically, PythonCalcSplitRuleBase
will be generalized into RemoteCalcSplitRuleBase."  This naming seems to
imply/suggest that all Async functions are remote.  I wonder if we can

find

another name which doesn't carry that connotation; maybe
AsyncCalcSplitRuleBase.  (An AsyncCalcSplitRuleBase which handles Python
and Async functions seems reasonable.)

Cheers,

Jim

On Wed, Dec 6, 2023 at 5:45 PM Alan Sheinberg
 wrote:


I'd like to start a discussion of FLIP-400: AsyncScalarFunction for
asynchronous scalar function support [1]

This feature proposes adding a new UDF type AsyncScalarFunction which

is

invoked just like a normal ScalarFunction, but is implemented with an
asynchronous eval method.  I had brought this up including the

motivation

in a previous discussion thread [2].

The purpose is to achieve high throughput scalar function UDFs while
allowing that an individual call may have high latency.  It allows

scaling

up the parallelism of just these calls without having to increase the
parallelism of the whole query (which could be rather resource
inefficient).

In practice, it should enable SQL integration with external services

and

systems, which Flink has limited support for at the moment. It should

also

allow easier integration with existing libraries which use asynchronous
APIs.

Looking forward to your feedback and suggestions.

[1]





https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support

<




https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support




[2] https://lists.apache.org/thread/bn153gmcobr41x2nwgodvmltlk810hzs


Thanks,
Alan











Re: [DISCUSS] Should Configuration support getting value based on String key?

2023-12-14 Thread Timo Walther
The configuration in Flink is complicated and I fear we won't have 
enough capacity to substantially fix it. The introduction of 
ReadableConfig, WritableConfig, and typed ConfigOptions was a right step 
into making the code more maintainable. From the Flink side, every read 
access should go through ConfigOption.


However, I also understand Gyula pragmatism here because (practically 
speaking) users get access `getString()` via `toMap().get()`. So I'm 
fine with removing the deprecation for functionality that is available 
anyways. We should, however, add the message to `getString()` that this 
method is discouraged and `get(ConfigOption)` should be the preferred 
way of accessting Configuration.


In any case we should remove the getInt and related methods.

Cheers,
Timo


On 14.12.23 09:56, Gyula Fóra wrote:

I see a strong value for user facing configs to use ConfigOption and this
should definitely be an enforced convention.

However with the Flink project growing and many other components and even
users using the Configuration object I really don’t think that we should
“force” this on the users/developers.

If we make fromMap / toMap free with basically no overhead, that is fine
but otherwise I think it would hurt the user experience to remove the
simple getters / setters. Temporary configoptions to access strings from
what is practically string->string map is exactly the kind of unnecessary
boilerplate that every dev and user wants to avoid.

Gyula

There are many cases where the features of the configoption are really not
needed.

On Thu, 14 Dec 2023 at 09:38, Xintong Song  wrote:


Hi Gyula,

First of all, even if we remove the `getXXX(String key, XXX defaultValue)`
methods, there are still several ways to access the configuration with
string-keys.

- If one wants to access a specific option, as Rui mentioned,
`ConfigOptions.key("xxx").stringType().noDefaultValue()` can be used.
TBH,
I can't think of a use case where a temporally created ConfigOption is
preferred over a predefined one. Do you have any examples for that?
- If one wants to access the whole configuration set, then `toMap` or
`iterator` might be helpful.

It is true that these ways are less convenient than `getXXX(String key, XXX
defaultValue)`, and that's exactly my purpose, to make the key-string less
convenient so that developers choose ConfigOption over it whenever is
possible.

there will always be cases where a more flexible

dynamic handling is necessary without the added overhead of the toMap

logic




I'm not sure about this. I agree there are cases where flexible and dynamic
handling is needed, but maybe "without the added overhead of the toMap
logic" is not that necessary?

I'd think of this as "encouraging developers to use ConfigOption as much as
possible" vs. "a bit less convenient in 5% of the cases". I guess there's
no right and wrong, just different engineer opinions. While I'm personally
stand with removing the string-key access methods, I'd also be fine with
the other way if there are more people in favor of it.

Best,

Xintong



On Thu, Dec 14, 2023 at 3:45 PM Gyula Fóra  wrote:


Hi Xintong,

I don’t really see the actual practical benefit from removing the

getstring

and setstring low level methods.

I understand that ConfigOptions are nicer for 95% of the cases but from a
technical point of view there will always be cases where a more flexible
dynamic handling is necessary without the added overhead of the toMap
logic.

I think it’s the most natural thing for any config abstraction to expose
basic get set methods with a simple key.

What do you think?

Cheers
Gyula

On Thu, 14 Dec 2023 at 08:00, Xintong Song 

wrote:




IIUC, you both prefer using ConfigOption instead of string keys for
all use cases, even internal ones. We can even forcefully delete
these @Depreated methods in Flink-2.0 to guide users or
developers to use ConfigOption.



Yes, at least from my side.


I noticed that Configuration is used in

DistributedCache#writeFileInfoToConfig and readFileInfoFromConfig
to store some cacheFile meta-information. Their keys are
temporary(key name with number) and it is not convenient
to predefine ConfigOption.



True, this one requires a bit more effort to migrate from string-key to
ConfigOption, but still should be doable. Looking at how the two

mentioned

methods are implemented and used, it seems what is really needed is
serialization and deserialization of `DistributedCacheEntry`-s. And all

the

entries are always written / read at once. So I think we can serialize

the

whole set of entries into a JSON string (or something similar), and use

one

ConfigOption with a deterministic key for it, rather than having one
ConfigOption for each field in each entry. WDYT?


If everyone agrees with this direction, we can start to refactor all

code that uses getXxx(String key, String defaultValue) into
getXxx(ConfigOption configOption), and completely
delete all getXxx(String key, Stri

Re: [DISCUSS] FLIP-392: Deprecate the Legacy Group Window Aggregation

2023-12-14 Thread Timo Walther

Hi Xuyang,

> I'm not spliting this flip is that all of these subtasks like session 
window tvf and cdc support do not change the public interface and the 
public syntax


Given the length of this mailing list discussion and number of involved 
people I would strongly suggest to simplify the FLIP and give it a 
better title to make quicker progress. In general, we all seem to be on 
the same page in what we want. And both session TVF support and the 
deprecation of the legacy group windows has been voted already and 
discussed thouroughly. The FLIP can purely focus on the CDC topic.


Cheers,
Timo


On 14.12.23 08:35, Xuyang wrote:

Hi, Timo, Sergey and Lincoln Lee. Thanks for your feedback.



In my opinion the FLIP touches too many
topics at the same time and should be split into multiple FLIPs. We > should 
stay focused on what is needed for Flink 2.0.

The main goal and topic of this Flip is to align the abilities between the 
legacy group window agg syntax and the new window tvf syntax,
and then we can say that the legacy window syntax will be deprecated. IMO, 
although there are many misalignments about these two
syntaxes, such as session window tvf, cdc support and so on,  they are all the 
subtasks we need to do in this flip. Another reason I'm not
spliting this flip is that all of these subtasks like session window tvf and 
cdc support do not change the public interface and the public
syntax, the implements of them will only be in modules table-planner and 
table-runtime.



Can we postpone this discussion? Currently we should focus on user
switching to Window TVFs before Flink 2.0. Early fire, late fire and > allow 
lateness have not exposed through public configuration. It can be > introduced 
after Flink 2.0 released.



Agree with you. This flip will not and should not expose these experimental 
flink conf to users. I list them in this flip just aims to show the
misalignments about these two window syntaxes.


Look for your thought.




--

 Best!
 Xuyang





At 2023-12-13 15:40:16, "Lincoln Lee"  wrote:

Thanks Xuyang driving this work! It's great that everyone agrees with the
work itself in this flip[1]!

Regarding whether to split the flip or adjust the scope of this flip, I'd
like to share some thoughts:

1. About the title of this flip, what I want to say is that flip-145[2] had
marked the legacy group window deprecated in the documentation but the
functionality of the new syntax is not aligned with the legacy one.
This is not a user-friendly deprecation, so the initiation of this flip, as
I understand it, is for the formal deprecation of the legacy window, which
requires us to complete the functionality alignment.

2. Agree with Timo that we can process the late-fire/early-fire features
separately. These experimental parameters have not been officially opened
to users.
Considering the workload, we can focus more on this version.

3. I have no objection to splitting this flip if everyone feels that the
work included is too much.
Regarding the support of session tvf, it seems that the main problem is
that this part of the description occupies a large part of the flip,
causing some misunderstandings.
This is indeed a predetermined task in FLIP-145, just adding more
explanation about semantics. In addition, I saw the discussion history in
FLINK-24024[3], thanks Sergey for being willing to help driving this work
together.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-392%3A+Deprecate+the+Legacy+Group+Window+Aggregation
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function
[3] https://issues.apache.org/jira/browse/FLINK-24024

Best,
Lincoln Lee


Sergey Nuyanzin  于2023年12月13日周三 08:02写道:


thanks for summarising Timo

+1 for splitting it in different FLIPs
and agree about having "SESSION Window TVF Aggregation" under FLIP-145
Moreover the task is already there, so no need to move it from one FLIP to
another


And actually Sergey Nuyanzin wanted to work
in this if I remember correctly. Not sure if this is still the case.


Yes, however it seems there is already an existing PR for that.
Anyway I'm happy to help here with the review as well and will reserve some
time for it in coming days.



On Tue, Dec 12, 2023 at 12:18 PM Timo Walther  wrote:


Hi Xuyang,

thanks for proposing this FLIP. In my opinion the FLIP touches too many
topics at the same time and should be split into multiple FLIPs. We
should stay focused on what is needed for Flink 2.0.

Let me summarizing the topics:

1) SESSION Window TVF Aggregation

This has been agreed in FLIP-145 already. We don't need another FLIP but
someone that finally implements this after we have performed the Calcite
upgrade a couple of months ago. The Calcite upgrade was important
exactly for SESSION windows. And actually Sergey Nuyanzin wanted to work
in this if I remember correctly. Not sure if this is still t

Re: [DISCUSS] FLIP-387: Support named parameters for functions and call procedures

2023-12-14 Thread Timo Walther

Hi Feng,

thank you for proposing this FLIP. This nicely completes FLIP-65 which 
is great for usability.


I read the FLIP and have some feedback:


1) ArgumentNames annotation

> Deprecate the ArgumentNames annotation as it is not user-friendly for 
specifying argument names with optional configuration.


Which annotation does the FLIP reference here? I cannot find it in the 
Flink code base.


2) Evolution of FunctionHint

Introducing @ArgumentHint makes a lot of sense to me. However, using it 
within @FunctionHint looks complex, because there is both `input=` and 
`arguments=`. Ideally, the @DataTypeHint can be defined inline as part 
of the @ArgumentHint. It could even be the `value` such that 
`@ArgumentHint(@DataTypeHint("INT"))` is valid on its own.


We could deprecate `input=`. Or let both `input` and `arguments=` 
coexist but never be defined at the same time.


3) Semantical correctness

As you can see in the `TypeInference` class, named parameters are 
prepared in the stack already. However, we need to watch out between 
helpful explanation (see `InputTypeStrategy#getExpectedSignatures`) and 
named parameters (see `TypeInference.Builder#namedArguments`) that can 
be used in SQL.


If I remember correctly, named parameters can be reordered and don't 
allow overloading of signatures. Thus, only a single eval() should have 
named parameters. Looking at the FLIP it seems you would like to support 
multiple parameter lists. What changes are you planning to TypeInference 
(which is also declared as @PublicEvoving)? This should also be 
documented as the annotations should compile into this class.


In general, I would prefer to keep it simple and don't allow overloading 
named parameters. With the optionality, users can add an arbitrary 
number of parameters to the signature of the same eval method.


Regards,
Timo

On 14.12.23 10:02, Feng Jin wrote:

Hi all,


Xuyang and I would like to start a discussion of FLIP-387: Support
named parameters for functions and call procedures [1]

Currently, when users call a function or call a procedure, they must
specify all fields in order. When there are a large number of
parameters, it is easy to make mistakes and cannot omit specifying
non-mandatory fields.

By using named parameters, you can selectively specify the required
parameters, reducing the probability of errors and making it more
convenient to use.

Here is an example of using Named Procedure.
```
-- for scalar function
SELECT my_scalar_function(param1 => ‘value1’, param2 => ‘value2’’) FROM []

-- for table function
SELECT  *  FROM TABLE(my_table_function(param1 => 'value1', param2 => 'value2'))

-- for agg function
SELECT my_agg_function(param1 => 'value1', param2 => 'value2') FROM []

-- for call procedure
CALL  procedure_name(param1 => ‘value1’, param2 => ‘value2’)
```

For UDX and Call procedure developers, we introduce a new annotation
to specify the parameter name, indicate if it is optional, and
potentially support specifying default values in the future

```
public @interface ArgumentHint {
 /**
  * The name of the parameter, default is an empty string.
  */
 String name() default "";

 /**
  * Whether the parameter is optional, default is true.
  */
 boolean isOptional() default true;
}}
```

```
// Call Procedure Development

public static class NamedArgumentsProcedure implements Procedure {

// Example usage: CALL myNamedProcedure(in1 => 'value1', in2 => 'value2')

// Example usage: CALL myNamedProcedure(in1 => 'value1', in2 =>
'value2', in3 => 'value3')

@ProcedureHint(
input = {@DataTypeHint(value = "STRING"),
@DataTypeHint(value = "STRING"), @DataTypeHint(value = "STRING")},
output = @DataTypeHint("STRING"),
   arguments = {
 @ArgumentHint(name = "in1", isOptional = false),
 @ArgumentHint(name = "in2", isOptional = true)
 @ArgumentHint(name = "in3", isOptional = true)})
public String[] call(ProcedureContext procedureContext, String
arg1, String arg2, String arg3) {
return new String[]{arg1 + ", " + arg2 + "," + arg3};
}
}
```


Currently, we offer support for two scenarios when calling a function
or procedure:

1. The corresponding parameters can be specified using the parameter
name, without a specific order.
2. Unnecessary parameters can be omitted.


There are still some limitations when using Named parameters:
1. Named parameters do not support variable arguments.
2. UDX or procedure classes that support named parameters can only
have one eval method.
3. Due to the current limitations of Calcite-947[2], we cannot specify
a default value for omitted parameters, which is Null by default.



Also, thanks very much for the suggestions and help provided by Zelin
and Lincoln.




1. 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures.

2. https://issues.apache.org/jira/bro

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-15 Thread Timo Walther

1. Override the function `getRequirements` in `AsyncScalarFunction`

> If the user overrides `requirements()` to omit the `ORDERED`
> requirement, do we allow the operator to return out-of-order results
> or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
> behavior (where we allow out-of-order only if it's deemed correct)?

You are right. Actually it should be the planner that fully decides 
whether ORDERED or UNORDERED is safe to do. For example, if the query is 
an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the 
operator is not allowed to produce unordered results. By global 
configuration, we can set ORDERED such that users don't get confused 
about the unordered output. The mode UNORDERED however should only have 
effect for these simply use cases and throw an exception if UNORDERED 
would mess up a changelog or other subsequent operators.


2. In some scenarios with semantic correctness, async operators must be 
executed in sync mode.


> What about throwing an exception to make it clear to users that using 
async scalar functions


@Xuyang: A regular SQL user doesn't care whether the function is sync or 
async. The planner should simply give its best to make the execution 
performant. I would not throw an exception here. There more exceptions 
the, the more struggles and questions from the user. Conceptually, we 
can run async code also sync, and that's why we should also do it to 
avoid errors.


3. Hints

@Aitozi: Let's go with global configuration first and later introduce 
hints. I feel the more hints we introduce, the harder SQL queries get 
when maintaining them.


Regards,
Timo




On 15.12.23 04:51, Xuyang wrote:

Hi, Alan. Thanks for driving this.


Using async to improve throughput has been done on look join, and the 
improvement
effect is obvious, so I think it makes sense to support async scalar function.  
Big +1 for this flip.
I have some questions below.


1. Override the function `getRequirements` in `AsyncScalarFunction`


I’m just curious why you don’t use conf(global) and query hint(individual async 
udx) to mark the output
mode 'order' or 'unorder' like async look join [1] and async udtf[2], but chose 
to introduce a new enum
in AsyncScalarFunction.


2. In some scenarios with semantic correctness, async operators must be 
executed in sync mode.


What about throwing an exception to make it clear to users that using async 
scalar functions in this situation
is problematic instead of executing silently in sync mode? Because users may be 
confused about
the final actual job graph.


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-234%3A+Support+Retryable+Lookup+Join+To+Solve+Delayed+Updates+Issue+In+External+Systems
[2] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction









--

 Best!
 Xuyang





在 2023-12-15 11:20:24,"Aitozi"  写道:

Hi Alan,
Nice FLIP, I also explore leveraging the async table function[1] to
improve the throughput before.

About the configs, what do you think using hints as mentioned in [1].

[1]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-313%3A+Add+support+of+User+Defined+AsyncTableFunction

Thanks,
Aitozi.

Timo Walther  于2023年12月14日周四 17:29写道:


Hi Alan,

thanks for proposing this FLIP. It's a great addition to Flink and has
been requested multiple times. It will be in particular interesting for
accessing REST endpoints and other remote services.

Great that we can generalize and reuse parts of the Python planner rules
and code for this.

I have some feedback regarding the API:

1) Configuration

Configuration keys like

`table.exec.async-scalar.catalog.db.func-name.buffer-capacity`

are currently not supported in the configuration stack. The key space
should remain constant. Only a constant key space enables the use of the
ConfigOption class which is required in the layered configuration. For
now I would suggest to only allow a global setting for buffer capacity,
timeout, and retry-strategy. We can later work on a per-function
configuration (potentially also needed for other use cases).

2) Semantical declaration

Regarding

`table.exec.async-scalar.catalog.db.func-name.output-mode`

this is a semantical property of a function and should be defined
per-function. It impacts the query result and potentially the behavior
of planner rules.

I see two options for this either: (a) an additional method in
AsyncScalarFunction or (b) adding this to the function's requirements. I
vote for (b), because a FunctionDefinition should be fully self
contained and sufficient for planning.

Thus, for `FunctionDefinition.getRequirements():
Set` we can add a new requirement `ORDERED` which
should also be the default for AsyncScalarFunction. `getRequirements()`
can be overwritten and return a set without this requirement if the user
intents to do this.


Thanks

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-18 Thread Timo Walther
 as a
FunctionRequirement vs a simple configuration/hint.

What about throwing an exception to make it clear to users that using async

scalar functions in this situation
is problematic instead of executing silently in sync mode? Because users
may be confused about
the final actual job graph.



@Xuyang: Would it make a difference if it were exposed by the explain
method (the operator having "syncMode" vs not)?  I'd be fine to do it
either way -- certainly throwing an error is a bit simpler.

You are right. Actually it should be the planner that fully decides

whether ORDERED or UNORDERED is safe to do. For example, if the query is
an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
operator is not allowed to produce unordered results. By global
configuration, we can set ORDERED such that users don't get confused
about the unordered output.



@Timo: Is there an easy way to determine if the output of an async function
would be problematic or not?  If the input to the operator is append-only,
it seems fine, because this implies that each row is effectively
independent and ordering is unimportant. For upsert mode with +U rows, you
wouldn't want to swap order with other +U rows for the same key because the
last one should win.  For -D or -U rows, you wouldn't want to swap with
other rows for the same key for similar reasons.  Is it as simple as
looking for the changlelog mode to determine whether it's safe to run async
functions UNORDERED?  I had considered analyzing various query forms (join
vs aggregation vs whatever), but it seems like changelog mode could be
sufficient to understand what works and what would be an issue.  Any code
pointers and explanation for similar analysis would be great to understand
this more.

The mode UNORDERED however should only have

effect for these simply use cases and throw an exception if UNORDERED
would mess up a changelog or other subsequent operators.


@Timo: Should we throw errors or run in sync mode?  It seems like running
in sync mode is an option to ensure correctness in all changelog modes.

Let's go with global configuration first and later introduce

hints. I feel the more hints we introduce, the harder SQL queries get
when maintaining them.


@Timo: That seems like a reasonable approach to me.

-Alan

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/metric_reporters/

On Fri, Dec 15, 2023 at 2:56 AM Timo Walther  wrote:


1. Override the function `getRequirements` in `AsyncScalarFunction`

  > If the user overrides `requirements()` to omit the `ORDERED`
  > requirement, do we allow the operator to return out-of-order results
  > or should it fall back on `AsyncOutputMode.ALLOW_UNORDERED` type
  > behavior (where we allow out-of-order only if it's deemed correct)?

You are right. Actually it should be the planner that fully decides
whether ORDERED or UNORDERED is safe to do. For example, if the query is
an append-only `SELECT FUNC(c) FROM t`, I don't see a reason why the
operator is not allowed to produce unordered results. By global
configuration, we can set ORDERED such that users don't get confused
about the unordered output. The mode UNORDERED however should only have
effect for these simply use cases and throw an exception if UNORDERED
would mess up a changelog or other subsequent operators.

2. In some scenarios with semantic correctness, async operators must be
executed in sync mode.

  > What about throwing an exception to make it clear to users that using
async scalar functions

@Xuyang: A regular SQL user doesn't care whether the function is sync or
async. The planner should simply give its best to make the execution
performant. I would not throw an exception here. There more exceptions
the, the more struggles and questions from the user. Conceptually, we
can run async code also sync, and that's why we should also do it to
avoid errors.

3. Hints

@Aitozi: Let's go with global configuration first and later introduce
hints. I feel the more hints we introduce, the harder SQL queries get
when maintaining them.

Regards,
Timo




On 15.12.23 04:51, Xuyang wrote:

Hi, Alan. Thanks for driving this.


Using async to improve throughput has been done on look join, and the

improvement

effect is obvious, so I think it makes sense to support async scalar

function.  Big +1 for this flip.

I have some questions below.


1. Override the function `getRequirements` in `AsyncScalarFunction`


I’m just curious why you don’t use conf(global) and query

hint(individual async udx) to mark the output

mode 'order' or 'unorder' like async look join [1] and async udtf[2],

but chose to introduce a new enum

in AsyncScalarFunction.


2. In some scenarios with semantic correctness, async operators must be

executed in sync mode.



What about throwing an exception to make it clear to users that using

async scalar functions i

Re: [DISCUSS] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-19 Thread Timo Walther

> I would be totally fine with the first version only having ORDERED
> mode. For a v2, we could attempt to do the next most conservative
> thing

Sounds good to me.

I also cheked AsyncWaitOperator and could not find n access of 
StreamRecord's timestamp but only watermarks. But as we said, let's 
focus on ORDERED first.


Can you remove the necessary parts? Esp.:

@Override
public Set getRequirements() {
return Collections.singleton(FunctionRequirement.ORDERED);
}

Otherwise I have no objections to start a VOTE soonish. If others are 
fine as well?


Regards,
Timo


On 19.12.23 07:32, Alan Sheinberg wrote:

Thanks for the helpful comments, Xuyang and Timo.

@Timo, @Alan: IIUC, there seems to be something wrong here. Take kafka as

source and mysql as sink as an example.
Although kafka is an append-only source, one of its fields is used as pk
when writing to mysql. If async udx is executed
  in an unordered mode, there may be problems with the data in mysql in the
end. In this case, we need to ensure that
the sink-based pk is in order actually.



@Xuyang: That's a great point.  If some node downstream of my operator
cares about ordering, there's no way for it to reconstruct the original
ordering of the rows as they were input to my operator.  So even if they
want to preserve ordering by key, the order in which they see it may
already be incorrect.  Somehow I thought that maybe the analysis of the
changelog mode at a given operator was aware of downstream operations, but
it seems not.

Clear "no" on this. Changelog semantics make the planner complex and we

need to be careful. Therefore I would strongly suggest we introduce
ORDERED and slowly enable UNORDERED whenever we see a good fit for it in
plans with appropriate planner rules that guard it.



@Timo: The better I understand the complexity, the more I agree with this.
I would be totally fine with the first version only having ORDERED mode.
For a v2, we could attempt to do the next most conservative thing and only
allow UNORDERED when the whole graph is in *INSERT *changelog mode.  The
next best type of optimization might understand what's the key required
downstream, and allow breaking the original order only between unrelated
keys, but maintaining it between rows of the same key.  Of course if the
key used downstream is computed in some manner, that makes it all the
harder to know this beforehand.

So unordering should be fine *within* watermarks. This is also what

watermarks are good for, a trade-off between strict ordering and making
progress. The async operator from DataStream API also supports this if I
remember correctly. However, it assumes a timestamp is present in
StreamRecord on which it can work. But this is not the case within the
SQL engine.



*AsyncWaitOperator* and *UnorderedStreamElementQueue* (the implementations
I plan on using) seem to support exactly this behavior.  I don't think it
makes assumptions about the record's timestamp, but just preserves whatever
the input order is w.r.t watermarks.  I'd be curious to understand the
timestamp use in more detail and see if it's required with the mentioned
classes.

TLDR: Let's focus on ORDERED first.


I'm more than happy to start here and we can consider UNORDERED as a
followup.  Then maybe we consider only INSERT mode graphs and ones where we
can solve the watermark constraints.

Thanks,
Alan


On Mon, Dec 18, 2023 at 2:36 AM Timo Walther  wrote:


Hi Xuyang and Alan,

thanks for this productive discussion.

  > Would it make a difference if it were exposed by the explain

@Alan: I think this is great idea. +1 on exposing the sync/async
behavior thought EXPLAIN.


  > Is there an easy way to determine if the output of an async function
  > would be problematic or not?

Clear "no" on this. Changelog semantics make the planner complex and we
need to be careful. Therefore I would strongly suggest we introduce
ORDERED and slowly enable UNORDERED whenever we see a good fit for it in
plans with appropriate planner rules that guard it.

  > If the input to the operator is append-only, it seems fine, because
  > this implies that each row is effectively independent and ordering is
  > unimportant.

As @Xuyang pointed out, it's not only the input that decides whether
append-only is safe. It's also the subsequent operators in the pipeline.
The example of Xuyang is a good one, when the sink operates in upsert
mode. Append-only source, append-only operators, and append-only sink
are safer.

However, even in this combination, a row is not fully "independent"
there are still watermarks flowing between rows:

R(5), W(4), R(3), R(4), R(2), R(1), W(0)

So unordering should be fine *within* watermarks. This is also what
watermarks are good for, a trade-off between strict ordering and making
progress. The async operator from DataStream API also supports this if 

Re: [DISCUSS] FLIP-392: Deprecate the Legacy Group Window Aggregation

2023-12-19 Thread Timo Walther

Hi Xuyang,

sorry I missed the ping. Sounds reasonable to me. One FLIP about 
changelog semantics, the other about SQL semantics.


Regards,
Timo

On 19.12.23 02:39, Xuyang wrote:

Hi, Timo. Sorry for this noise.
What do you think about splitting the flip like this?




--

 Best!
 Xuyang





At 2023-12-15 10:05:32, "Xuyang"  wrote:

Hi, Timo, thanks for your advice.


I am considering splitting the existing flip into two while leaving the 
existing flip (or without).
One of them points to the completion of the operator about window tvf to 
support CDC (there are several
small work items, such as window agg, window rank, window join, etc. Due to 
time constraints,
the 1.19 version takes priority to complete the window agg). The other points 
to the HOP window tvf
supports a size that is a non-integer multiple of the step. Once these two 
flips are basically completed
in 1.19, we can consider officially deprecating the old group window agg syntax 
in the release note.


WDYT?




--

Best!
Xuyang





At 2023-12-14 17:51:01, "Timo Walther"  wrote:

Hi Xuyang,


I'm not spliting this flip is that all of these subtasks like session

window tvf and cdc support do not change the public interface and the
public syntax

Given the length of this mailing list discussion and number of involved
people I would strongly suggest to simplify the FLIP and give it a
better title to make quicker progress. In general, we all seem to be on
the same page in what we want. And both session TVF support and the
deprecation of the legacy group windows has been voted already and
discussed thouroughly. The FLIP can purely focus on the CDC topic.

Cheers,
Timo


On 14.12.23 08:35, Xuyang wrote:

Hi, Timo, Sergey and Lincoln Lee. Thanks for your feedback.



In my opinion the FLIP touches too many
topics at the same time and should be split into multiple FLIPs. We > should 
stay focused on what is needed for Flink 2.0.

The main goal and topic of this Flip is to align the abilities between the 
legacy group window agg syntax and the new window tvf syntax,
and then we can say that the legacy window syntax will be deprecated. IMO, 
although there are many misalignments about these two
syntaxes, such as session window tvf, cdc support and so on,  they are all the 
subtasks we need to do in this flip. Another reason I'm not
spliting this flip is that all of these subtasks like session window tvf and 
cdc support do not change the public interface and the public
syntax, the implements of them will only be in modules table-planner and 
table-runtime.



Can we postpone this discussion? Currently we should focus on user
switching to Window TVFs before Flink 2.0. Early fire, late fire and > allow 
lateness have not exposed through public configuration. It can be > introduced 
after Flink 2.0 released.



Agree with you. This flip will not and should not expose these experimental 
flink conf to users. I list them in this flip just aims to show the
misalignments about these two window syntaxes.


Look for your thought.




--

  Best!
  Xuyang





At 2023-12-13 15:40:16, "Lincoln Lee"  wrote:

Thanks Xuyang driving this work! It's great that everyone agrees with the
work itself in this flip[1]!

Regarding whether to split the flip or adjust the scope of this flip, I'd
like to share some thoughts:

1. About the title of this flip, what I want to say is that flip-145[2] had
marked the legacy group window deprecated in the documentation but the
functionality of the new syntax is not aligned with the legacy one.
This is not a user-friendly deprecation, so the initiation of this flip, as
I understand it, is for the formal deprecation of the legacy window, which
requires us to complete the functionality alignment.

2. Agree with Timo that we can process the late-fire/early-fire features
separately. These experimental parameters have not been officially opened
to users.
Considering the workload, we can focus more on this version.

3. I have no objection to splitting this flip if everyone feels that the
work included is too much.
Regarding the support of session tvf, it seems that the main problem is
that this part of the description occupies a large part of the flip,
causing some misunderstandings.
This is indeed a predetermined task in FLIP-145, just adding more
explanation about semantics. In addition, I saw the discussion history in
FLINK-24024[3], thanks Sergey for being willing to help driving this work
together.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-392%3A+Deprecate+the+Legacy+Group+Window+Aggregation
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-145%3A+Support+SQL+windowing+table-valued+function
[3] https://issues.apache.org/jira/browse/FLINK-24024

Best,
Lincoln Lee


Sergey Nuyanzin  于2023年12月13日周三 08:02写道:


thanks for summarising Timo

+1 for splitting it in different FLIPs
and agree about having "SESSION Window TV

Re: [VOTE] FLIP-400: AsyncScalarFunction for asynchronous scalar function support

2023-12-28 Thread Timo Walther
+1 (binding)

Cheers,
Timo

> Am 28.12.2023 um 03:13 schrieb Yuepeng Pan :
> 
> +1 (non-binding).
> 
> Best,
> Yuepeng Pan.
> 
> 
> 
> 
> At 2023-12-28 09:19:37, "Lincoln Lee"  wrote:
>> +1 (binding)
>> 
>> Best,
>> Lincoln Lee
>> 
>> 
>> Martijn Visser  于2023年12月27日周三 23:16写道:
>> 
>>> +1 (binding)
>>> 
>>> On Fri, Dec 22, 2023 at 1:44 AM Jim Hughes 
>>> wrote:
 
 Hi Alan,
 
 +1 (non binding)
 
 Cheers,
 
 Jim
 
 On Wed, Dec 20, 2023 at 2:41 PM Alan Sheinberg
  wrote:
 
> Hi everyone,
> 
> I'd like to start a vote on FLIP-400 [1]. It covers introducing a new
>>> UDF
> type, AsyncScalarFunction for completing invocations asynchronously.
>>> It
> has been discussed in this thread [2].
> 
> I would like to start a vote.  The vote will be open for at least 72
>>> hours
> (until December 28th 18:00 GMT) unless there is an objection or
> insufficient votes.
> 
> [1]
> 
> 
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-400%3A+AsyncScalarFunction+for+asynchronous+scalar+function+support
> [2] https://lists.apache.org/thread/q3st6t1w05grd7bthzfjtr4r54fv4tm2
> 
> Thanks,
> Alan
> 
>>> 



Re: [DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL and TableAPI

2024-01-02 Thread Timo Walther

Hi Jane,

thanks for the heavy investigation and extensive summaries. I'm sorry 
that I ignored this discussion for too long but would like to help in 
shaping a sustainable long-term solution.


I fear that changing:
- RowType#copy()
- RowType's constructor
- FieldsDataType#nullable()
will not solve all transitive issues.

We should approach the problem from a different perspective. In my point 
of view:

- DataType and LogicalType are just type declarations.
- RelDataType is similarly just a type declaration. Please correct me if 
I'm wrong but RelDataType itself also allows `ROW NOT 
NULL`. It's the factory or optimizer that performs necessary changes.
- It's up to the framework (i.e. planner or Table API) to decide what to 
do with these declarations.


Let's take a Java class:

class MyPojo {
  int i;
}

MyPojo can be nullable, but i cannot. This is the reason why we decided 
to introduce the current behavior. Complex structs are usually generated 
from Table API or from the catalog (e.g. when mapping to schema registry 
or some other external system). It could lead to other downstream 
inconsistencies if we change the method above.


I can't provide a better solution right now, I need more research on 
this topic. But we should definitely avoid another breaking change 
similar to [1] where the data type system was touched and other projects 
were affected.


How about we work together on this topic and create a FLIP for this? We 
need more examples in a unified document. Currently, the proposal is 
split across multiple Flink and Calcite JIRA issues and a ML discussion.


Regards,
Timo


[1] https://issues.apache.org/jira/browse/FLINK-33523


On 26.12.23 04:47, Jane Chan wrote:

Thanks Shengkai and Xuyang.

@Shengkai

I have one question: Is the influence only limited to the RowType? Does the

Map or Array type have the same problems?



I think the issue is exclusive to RowType. You may want to review
CALCITE-2464[1] for more details.

[1] https://issues.apache.org/jira/browse/CALCITE-2464

@Xuyang

Is it possible to consider introducing a deprecated option to allow users

to fall back to the previous version (default fallback), and then
officially deprecate it in Flink 2.0?



If I understand correctly, 2.0 allows breaking changes to remove historical
baggage in this release. Therefore, if we want to fix this issue before
2.0, we could introduce a fallback option in the two most recent versions
(1.19 and 1.20). However, from version 2.0 onwards, since we no longer
promise backward compatibility, introducing a fallback option might be
unnecessary. What do you think?

BTW, this jira FLINK-33217[1] is caused by that Flink SQL does not handle

the nullable attribute of the Row type in the way Calcite expected.
However, fixing them will also cause a relatively large impact. We may also
need to check the code part in SQL.



Yes, this is another issue caused by the row type nullability handling.
I've mentioned this JIRA ticket in the reference link to the previous
reply.

Best,
Jane

On Mon, Dec 25, 2023 at 1:42 PM Xuyang  wrote:


Hi, Jane, thanks for driving this.


IMO, it is important to keep same consistent semantics between table api
and sql, not only for maintenance, but also for user experience. But for
users, the impact of this modification is a bit large. Is it possible to
consider introducing a deprecated option to allow users to fall back to the
previous version (default fallback), and then officially deprecate it in
Flink 2.0?


BTW, this jira FLINK-33217[1] is caused by that Flink SQL does not handle
the nullable attribute of the Row type in the way Calcite expected.
However, fixing them will also cause a relatively large impact. We may also
need to check the code part in SQL.


[1] https://issues.apache.org/jira/browse/FLINK-33217




--

 Best!
 Xuyang





在 2023-12-25 10:16:28,"Shengkai Fang"  写道:

Thanks for Jane and Sergey's proposal!

+1 to correct the Table API behavior.

I have one question: Is the influence only limited to the RowType? Does

the

Map or Array type have the same problems?

Best,
Shengkai
[DISCUSS][FLINK-31830] Align the Nullability Handling of ROW between SQL
and TableA

Jane Chan  于2023年12月22日周五 17:40写道:


Dear devs,

Several issues [1][2][3] have been identified regarding the inconsistent
treatment of ROW type nullability between SQL and TableAPI. However,
addressing these discrepancies might necessitate updates to the public

API.

Therefore, I'm initiating this discussion to engage the community in
forging a unified approach to resolve these challenges.

To summarize, SQL prohibits ROW types such as ROW, which is implicitly rewritten to ROW by
Calcite[4]. In contrast, TableAPI permits such types, resulting in
inconsistency.
[image: image.png]
For a comprehensive issue breakdown, please refer to the comment of [1].

According to CALCITE-2464[4], ROW is not a valid type.

As

a result, the behavior of TableAPI is incorrect and needs to be

consistent

wi

Re: [VOTE] FLIP-387: Support named parameters for functions and call procedures

2024-01-04 Thread Timo Walther
Thanks, for starting the VOTE thread and thanks for considering my 
feedback. One last comment before I'm also happy to give my +1 to this:


Why is ArgumentHint's default isOptinal=true? Shouldn't it be false by 
default? Many function implementers will forget to set this to false and 
suddenly get NULLs passed to their functions. Marking an argument as 
optional should be an explicit decision of an implementer.


Regards,
Timo


On 05.01.24 05:06, Lincoln Lee wrote:

+1 (binding)

Best,
Lincoln Lee


Benchao Li  于2024年1月5日周五 11:46写道:


+1 (binding)

Feng Jin  于2024年1月5日周五 10:49写道:


Hi everyone

Thanks for all the feedback about the FLIP-387: Support named parameters
for functions and call procedures [1] [2] .

I'd like to start a vote for it. The vote will be open for at least 72
hours(excluding weekends,until Jan 10, 12:00AM GMT) unless there is an
objection or an insufficient number of votes.



[1]


https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures

[2] https://lists.apache.org/thread/bto7mpjvcx7d7k86owb00dwrm65jx8cn


Best,
Feng Jin




--

Best,
Benchao Li







Re: [VOTE] FLIP-387: Support named parameters for functions and call procedures

2024-01-05 Thread Timo Walther

Thanks for the last minute change.

+1 (binding)

Cheers,
Timo


On 05.01.24 08:59, Feng Jin wrote:

Hi Timo,

Thank you for the suggestion. Previously, I thought most parameters were
optional, so the default value was set to true.

Your concern is reasonable. We should declare it as false by default and
developers should explicitly state if a parameter is optional instead of
using our default value.

Regarding this part, I have already made modifications in the document.


Best,
Feng


On Fri, Jan 5, 2024 at 3:38 PM Timo Walther  wrote:


Thanks, for starting the VOTE thread and thanks for considering my
feedback. One last comment before I'm also happy to give my +1 to this:

Why is ArgumentHint's default isOptinal=true? Shouldn't it be false by
default? Many function implementers will forget to set this to false and
suddenly get NULLs passed to their functions. Marking an argument as
optional should be an explicit decision of an implementer.

Regards,
Timo


On 05.01.24 05:06, Lincoln Lee wrote:

+1 (binding)

Best,
Lincoln Lee


Benchao Li  于2024年1月5日周五 11:46写道:


+1 (binding)

Feng Jin  于2024年1月5日周五 10:49写道:


Hi everyone

Thanks for all the feedback about the FLIP-387: Support named

parameters

for functions and call procedures [1] [2] .

I'd like to start a vote for it. The vote will be open for at least 72
hours(excluding weekends,until Jan 10, 12:00AM GMT) unless there is an
objection or an insufficient number of votes.



[1]




https://cwiki.apache.org/confluence/display/FLINK/FLIP-387%3A+Support+named+parameters+for+functions+and+call+procedures

[2] https://lists.apache.org/thread/bto7mpjvcx7d7k86owb00dwrm65jx8cn


Best,
Feng Jin




--

Best,
Benchao Li












Re: [VOTE] Improve TableFactory to add Context

2020-02-06 Thread Timo Walther

+1

On 06.02.20 05:54, Bowen Li wrote:

+1, LGTM

On Tue, Feb 4, 2020 at 11:28 PM Jark Wu  wrote:


+1 form my side.
Thanks for driving this.

Btw, could you also attach a JIRA issue with the changes described in it,
so that users can find the issue through the mailing list in the future.

Best,
Jark

On Wed, 5 Feb 2020 at 13:38, Kurt Young  wrote:


+1 from my side.

Best,
Kurt


On Wed, Feb 5, 2020 at 10:59 AM Jingsong Li 
wrote:


Hi all,

Interface updated.
Please re-vote.

Best,
Jingsong Lee

On Tue, Feb 4, 2020 at 1:28 PM Jingsong Li 

wrote:



Hi all,

I would like to start the vote for the improve of
TableFactory, which is discussed and
reached a consensus in the discussion thread[2].

The vote will be open for at least 72 hours. I'll try to close it
unless there is an objection or not enough votes.

[1]






http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Improve-TableFactory-td36647.html


Best,
Jingsong Lee




--
Best, Jingsong Lee











Re: [DISCUSS] Does removing deprecated interfaces needs another FLIP

2020-02-07 Thread Timo Walther

Hi Kurt,

I agree with Aljoscha. We don't need to introduce a big process or do 
voting but we should ensure that all stakeholders are notified and have 
a chance to raise doubts.


Regards,
Timo


On 07.02.20 09:51, Aljoscha Krettek wrote:

I would say a ML discussion or even a Jira issue is enough because

a) the methods are already deprecated
b) the methods are @PublicEvolving, which I don't consider a super 
strong guarantee to users (we still shouldn't remove them lightly, but 
we can if we have to...)


Best,
Aljoscha

On 07.02.20 04:40, Kurt Young wrote:

Hi dev,

Currently I want to remove some already deprecated methods from
TableEnvironment which annotated with @PublicEnvolving. And I also 
created

a discussion thread [1] to both dev and user mailing lists to gather
feedback on that. But I didn't find any matching rule in Flink bylaw 
[2] to

follow. Since this is definitely a API breaking change, but we already
voted for that back in the FLIP which deprecated these methods.

I'm not sure about how to proceed for now. Looks like I have 2 choices:

1. If no one raise any objections in discuss thread in like 72 hours, I
will create a jira to start working on it.
2. Since this is a API breaking change, I need to open another FLIP to 
tell

that I want to remove these deprecated methods. This seems a little
redundant with the first FLIP which deprecate the methods.

What do you think?

Best,
Kurt

[1]
https://lists.apache.org/thread.html/r98af66feb531ce9e6b94914e44391609cad857e16ea84db5357c1980%40%3Cdev.flink.apache.org%3E 


[2] https://cwiki.apache.org/confluence/display/FLINK/Flink+Bylaws





Re: [DISCUSS] Update UpsertStreamTableSink and RetractStreamTableSink to new type system

2020-02-07 Thread Timo Walther

Hi Zhenghua,

Jark is right. The reason why we haven't updated those interfaces yet is 
because we are actually would like to introduce new interfaces. We 
should target new interfaces in this release. Even a short-term fix as 
you proposed with `getRecordDataType` does actually not help as Jingsong 
pointed out because we cannot represent tuples in DataType and are also 
not planning to support them natively but only as a structured type in 
the future.


In my envisioned design, the new sink interface should just always get a 
`ChangeRow` which is never serialized and just a data structure for 
communicating between the wrapping sink function and the returned sink 
function by the table sink.


Let me sketch a rough design document that I will share with you 
shortly. Then we could also discuss alternatives.


Thanks,
Timo


On 04.02.20 04:18, Zhenghua Gao wrote:

Hi Jark, thanks for your comments.

IIUC, the framework will only recognize getRecordDataType and
ignore getConsumedDataType for UpsertStreamTableSink, is that right?

Your are right.


getRecordDataType is little confused as UpsertStreamTableSink already has
three getXXXType().

the getRecordType and getOutputType is deprecated and mainly for backward
compatibility.

*Best Regards,*
*Zhenghua Gao*


On Mon, Feb 3, 2020 at 10:11 PM Jark Wu  wrote:


Thanks Zhenghua for starting this discussion.

Currently, all the UpsertStreamTableSinks can't upgrade to the new type
system which affects usability a lot.
I hope we can fix that in 1.11.

I'm find with *getRecordDataType* for a temporary solution.
IIUC, the framework will only recognize getRecordDataType and
ignore getConsumedDataType for UpsertStreamTableSink, is that right?

I guess Timo are planning to design a new source/sink interface which will
also fix this problem, but I'm not sure the timelines. cc @Timo
It would be better if we can have a new and complete interface, because
getRecordDataType is little confused as UpsertStreamTableSink already has
three getXXXType().

Best,
Jark


On Mon, 3 Feb 2020 at 17:48, Zhenghua Gao  wrote:


Hi Jingsong,  For now, only UpsertStreamTableSink and
RetractStreamTableSink consumes JTuple2
So the 'getConsumedDataType' interface is not necessary in validate &
codegen phase.
See



https://github.com/apache/flink/pull/10880/files#diff-137d090bd719f3a99ae8ba6603ed81ebR52

  and



https://github.com/apache/flink/pull/10880/files#diff-8405c17e5155aa63d16e497c4c96a842R304


What about stay the same to use RAW type?

*Best Regards,*
*Zhenghua Gao*


On Mon, Feb 3, 2020 at 4:59 PM Jingsong Li 

wrote:



Hi Zhenghua,

The *getRecordDataType* looks good to me.

But the main problem is how to represent the tuple type in DataType. I
understand that it is necessary to use StructuredType, but at present,
planner does not support StructuredType, so the other way is to support
StructuredType.

Best,
Jingsong Lee

On Mon, Feb 3, 2020 at 4:49 PM Kurt Young  wrote:


Would overriding `getConsumedDataType` do the job?

Best,
Kurt


On Mon, Feb 3, 2020 at 3:52 PM Zhenghua Gao 

wrote:



Hi all,

FLINK-12254[1] [2] updated TableSink and related interfaces to new

type

system which
allows connectors use the new type system based on DataTypes.

But FLINK-12911 port UpsertStreamTableSink and

RetractStreamTableSink

to

flink-api-java-bridge and returns TypeInformation of the requested

record

type which
can't support types with precision and scale, e.g. TIMESTAMP(p),
DECIMAL(p,s).

/**
  * Returns the requested record type.
  */
TypeInformation getRecordType();


A proposal is deprecating the *getRecordType* API and adding a
*getRecordDataType* API instead to return the data type of the

requested

record. I have filed the issue FLINK-15469 and
an initial PR to verify it.

What do you think about this API changes? Any feedback are

appreciated.

[1] https://issues.apache.org/jira/browse/FLINK-12254
[2] https://github.com/apache/flink/pull/8596
[3] https://issues.apache.org/jira/browse/FLINK-15469

*Best Regards,*
*Zhenghua Gao*





--
Best, Jingsong Lee











Re: [DISCUSS] Remove registration of TableSource/TableSink in Table Env and ConnectTableDescriptor

2020-02-07 Thread Timo Walther

Hi Kurt,

Dawid is currently working on making a tableEnv.fromElements/values() 
kind of source possible in the future. We can use this to replace some 
of the tests. Otherwise I guess we should come up with a better test 
infrastructure to make defining source not necessary anymore.


Regards,
Timo


On 07.02.20 11:24, Kurt Young wrote:

Thanks all for your feedback, since no objection has been raised, I've
created
https://issues.apache.org/jira/browse/FLINK-15950 to track this issue.

Since this issue would require lots of tests adjustment before it really
happen,
it won't be done in a short time. Feel free to give feedback anytime here
or in jira
if you have other opinions.

Best,
Kurt


On Wed, Feb 5, 2020 at 8:26 PM Kurt Young  wrote:


Hi Zhenghua,

After removing TableSource::getTableSchema, during optimization, I could
imagine
the schema information might come from relational nodes such as TableScan.

Best,
Kurt


On Wed, Feb 5, 2020 at 8:24 PM Kurt Young  wrote:


Hi Jingsong,

Yes current TableFactory is not ideal for users to use either. I think we
should
also spend some time in 1.11 to improve the usability of TableEnvironment
when
users trying to read or write something. Automatic scheme inference would
be
one of them. Other from this, we also support convert a DataStream to
Table, which
can serve some flexible requirements to read or write data.

Best,
Kurt


On Wed, Feb 5, 2020 at 7:29 PM Zhenghua Gao  wrote:


+1 to remove these methods.

One concern about invocations of TableSource::getTableSchema:
By removing such methods, we can stop calling TableSource::getTableSchema
in some place(such
as BatchTableEnvImpl/TableEnvironmentImpl#validateTableSource,
ConnectorCatalogTable, TableSourceQueryOperation).

But in other place we need field types and names of the table source(such
as
BatchExecLookupJoinRule/StreamExecLookupJoinRule,
PushProjectIntoTableSourceScanRule,
CommonLookupJoin).  So how should we deal with this?

*Best Regards,*
*Zhenghua Gao*


On Wed, Feb 5, 2020 at 2:36 PM Kurt Young  wrote:


Hi all,

I'd like to bring up a discussion about removing registration of
TableSource and
TableSink in TableEnvironment as well as in ConnectTableDescriptor. The
affected
method would be:

TableEnvironment::registerTableSource
TableEnvironment::fromTableSource
TableEnvironment::registerTableSink
ConnectTableDescriptor::registerTableSource
ConnectTableDescriptor::registerTableSink
ConnectTableDescriptor::registerTableSourceAndSink

(Most of them are already deprecated, except for
TableEnvironment::fromTableSource,
which was intended to deprecate but missed by accident).

FLIP-64 [1] already explained why we want to deprecate TableSource &
TableSink from
user's interface. In a short word, these interfaces should only read &
write the physical
representation of the table, and they are not fitting well after we

already

introduced some
logical table fields such as computed column, watermarks.

Another reason is the exposure of registerTableSource in Table Env just
make the whole
SQL protocol opposite. TableSource should be used as a reader of

table, it

should rely on
other metadata information held by framework, which eventually comes

from

DDL or
ConnectDescriptor. But if we register a TableSource to Table Env, we

have

no choice but
have to rely on TableSource::getTableSchema. It will make the design
obscure, sometimes
TableSource should trust the information comes from framework, but
sometimes it should
also generate its own schema information.

Furthermore, if the authority about schema information is not clear, it
will make things much
more complicated if we want to improve the table api usability such as
introducing automatic
schema inference in the near future.

Since this is an API break change, I've also included user mailing

list to

gather more feedbacks.

Best,
Kurt

[1]



https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module












Re: [VOTE] FLIP-55: Introduction of a Table API Java Expression DSL

2020-02-10 Thread Timo Walther

+1 for this.

It will also help in making a TableEnvironment.fromElements() possible 
and reduces technical debt. One entry point of TypeInformation less in 
the API.


Regards,
Timo


On 10.02.20 08:31, Dawid Wysakowicz wrote:

Hi all,

I wanted to resurrect the thread about introducing a Java Expression
DSL. Please see the updated flip page[1]. Most of the flip was concluded
in previous discussion thread. The major changes since then are:

* accepting java.lang.Object in the Java DSL

* adding $ interpolation for a column in the Scala DSL

I think it's important to move those changes forward as it makes it
easier to transition to the new type system (Java parser supports only
the old type system stack for now) that we are working on for the past
releases.

Because the previous discussion thread was rather conclusive I want to
start already with a vote. If you think we need another round of
discussion, feel free to say so.


The vote will last for at least 72 hours, following the consensus voting
process.

FLIP wiki:

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-55%3A+Introduction+of+a+Table+API+Java+Expression+DSL


Discussion thread:

https://lists.apache.org/thread.html/eb5e7b0579e5f1da1e9bf1ab4e4b86dba737946f0261d94d8c30521e@%3Cdev.flink.apache.org%3E








Re: [ANNOUNCE] Apache Flink 1.10.0 released

2020-02-12 Thread Timo Walther

Congratualations everyone! Great stuff :-)

Regards,
Timo


On 12.02.20 16:05, Leonard Xu wrote:

Great news!
Thanks everyone involved !
Thanks Gary and Yu for being the release manager !

Best,
Leonard Xu


在 2020年2月12日,23:02,Stephan Ewen  写道:

Congrats to us all.

A big piece of work, nicely done.

Let's hope that this helps our users make their existing use cases easier and 
also opens up new use cases.

On Wed, Feb 12, 2020 at 3:31 PM 张光辉 mailto:beggingh...@gmail.com>> wrote:
Greet work.

Congxian Qiu mailto:qcx978132...@gmail.com>> 
于2020年2月12日周三 下午10:11写道:
Great work.
Thanks everyone involved.
Thanks Gary and Yu for being the release manager


Best,
Congxian


Jark Wu mailto:imj...@gmail.com>> 于2020年2月12日周三 下午9:46写道:
Congratulations to everyone involved!
Great thanks to Yu & Gary for being the release manager!

Best,
Jark

On Wed, 12 Feb 2020 at 21:42, Zhu Zhu mailto:reed...@gmail.com>> wrote:
Cheers!
Thanks Gary and Yu for the great job as release managers.
And thanks to everyone whose contribution makes the release possible!

Thanks,
Zhu Zhu

Wyatt Chun mailto:wyattc...@gmail.com>> 于2020年2月12日周三 
下午9:36写道:
Sounds great. Congrats & Thanks!

On Wed, Feb 12, 2020 at 9:31 PM Yu Li mailto:car...@gmail.com>> wrote:
The Apache Flink community is very happy to announce the release of Apache 
Flink 1.10.0, which is the latest major release.

Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html 


Please check out the release blog post for an overview of the improvements for 
this new major release:
https://flink.apache.org/news/2020/02/11/release-1.10.0.html 


The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12345845
 


We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Cheers,
Gary & Yu







Re: [DISCUSS][TABLE] Issue with package structure in the Table API

2020-02-13 Thread Timo Walther

Hi everyone,

thanks for bringing our offline discussion to the mailing list, Dawid. 
This is a very bad mistake that has been made in the past. In general, 
we should discourage putting the terms "java" and "scala" in package 
names as this has side effects on Scala imports.


I really don't like forcing users to put a "_root_" in their imports. It 
also happended to me a couple of times while developing Flink code that 
I was sitting in front of my IDE wondering why the code doesn't compile.


I'm also in favor of peforming this big change as early as possible. I'm 
sure Table API users are already quite annoyed by all the 
changes/refactorings happening. Changing the imports twice or three 
times is even more cumbersome.


Having to import just "org.apache.flink.table.api._" is a big usability 
plus for new users and especially interactive shell/notebook users.


Regards,
Timo


On 13.02.20 14:39, Dawid Wysakowicz wrote:

Hi devs,

I wanted to bring up a problem that we have in our package structure.

As a result of https://issues.apache.org/jira/browse/FLINK-13045 we
started advertising importing two packages in the scala API:
import org.apache.flink.table.api._
import org.apache.flink.table.api.scala._

The intention was that the first package (org.apache.flink.table.api)
would contain all api classes that are required to work with the unified
TableEnvironment. Such as TableEnvironment, Table, Session, Slide and
expressionDsl. The second package (org.apache.flink.table.api.scala._)
would've been an optional package that contain bridging conversions
between Table and DataStream/DataSet APIs including the more specific
StreamTableEnvironment and BatchTableEnvironment.

The part missing in the original plan was to move all expressions
implicit conversions to the org.apache.flink.table.api package. Without
that step users of pure table program (that do not use the
table-api-scala-bridge module) cannot use the Expression DSL. Therefore
we should try to move those expressions as soon as possible.

The problem with this approach is that it clashes with common imports of
classes from java.* and scala.* packages. Users are forced to write:

import org.apache.flink.table.api._
import org.apache.flink.table.api.scala_
import _root_.scala.collection.mutable.ArrayBuffer
import _root_.java.lang.Integer

Besides being cumbersome, it also messes up the macro based type
extraction (org.apache.flink.api.scala#createTypeInformation) for all
classes from scala.* packages. I don't fully understand the reasons for
it, but the createTypeInformation somehow drops the _root_ for
WeakTypeTags. So e.g. for a call:
createTypeInformation[_root_.scala.collection.mutable.ArrayBuffer] it
actually tries to construct a TypeInformation for
org.apache.flink.table.api.scala.collection.mutable.ArrayBuffer, which
obviously fails.



What I would suggest for a target solution is to have:

1. for users of unified Table API with Scala ExpressionDSL

import org.apache.flink.table.api._ (for TableEnvironment, Tumble etc.
and expressions)

2. for users of Table API with scala's bridging conversions

import org.apache.flink.table.api._ (for Tumble etc. and expressions)
import org.apache.flink.table.api.bridge.scala._ (for bridging
conversions and StreamTableEnvironment)

3. for users of unified Table API with Java ExpressionDSL

import org.apache.flink.table.api.* (for TableEnvironment, Tumble etc.)
import org.apache.flink.table.api.Expressions.* (for Expression dsl)

4. for users of Table API with java's bridging conversions

import org.apache.flink.table.api.* (for Tumble etc.)
import org.apache.flink.table.api.Expressions.* (for Expression dsl)
import org.apache.flink.table.api.bridge.java.*

To have that working we need to:
* move the scala expression DSL to org.apache.flink.table.api package in
table-api-scala module
* move all classes from org.apache.flink.table.api.scala and
org.apache.flink.table.api.java packages to
org.apache.flink.table.api.bridge.scala and
org.apache.flink.table.api.bridge.java accordingly and drop the former
packages

The biggest question I have is how do we want to perform that
transition. If we do it in one go we will break existing user programs
that uses classes from org.apache.flink.table.api.java/scala. Those
packages were present from day one of Table API. Nevertheless this would
be my preffered way to move forward as we annoy users only once, even if
one big time :(

Different option would be to make that transition gradually in 3 releases.
  1. In the first we introduce the
org.apache.flink.table.api.bridge.java/scala, and we have
StreamTableEnvironment etc. as well as expression DSL in both. We ask
users to migrate to the new package.
  2. We drop the org.apache.flink.table.api.java/scala and ask users to
import additionally org.apache.flink.table.api.* for expressions (this
is the same as we did in 1.9.0, the thing though it was extremely hard
to do it then)
  3. We finally move the expression DSL from
or

Re: [DISCUSS] Remove registration of TableSource/TableSink in Table Env and ConnectTableDescriptor

2020-02-17 Thread Timo Walther

Hi Kurt,

no there is no JIRA ticket yet. But in any case, I think it is better to 
have good testing infrastructure that abstracts source generation, sink 
generation, testing data etc. If we will introduce tableEnv.values() it 
will also not solve everything because time-based operations might need 
time attributes and so on.


Using DDL in tests should also be avoided because strings are even more 
difficult to maintain.


Regards,
Timo


On 08.02.20 04:29, Kurt Young wrote:

Hi Timo,

tableEnv.fromElements/values() sounds good, do we have a jira ticket to
track the issue?

Best,
Kurt


On Fri, Feb 7, 2020 at 10:56 PM Timo Walther  wrote:


Hi Kurt,

Dawid is currently working on making a tableEnv.fromElements/values()
kind of source possible in the future. We can use this to replace some
of the tests. Otherwise I guess we should come up with a better test
infrastructure to make defining source not necessary anymore.

Regards,
Timo


On 07.02.20 11:24, Kurt Young wrote:

Thanks all for your feedback, since no objection has been raised, I've
created
https://issues.apache.org/jira/browse/FLINK-15950 to track this issue.

Since this issue would require lots of tests adjustment before it really
happen,
it won't be done in a short time. Feel free to give feedback anytime here
or in jira
if you have other opinions.

Best,
Kurt


On Wed, Feb 5, 2020 at 8:26 PM Kurt Young  wrote:


Hi Zhenghua,

After removing TableSource::getTableSchema, during optimization, I could
imagine
the schema information might come from relational nodes such as

TableScan.


Best,
Kurt


On Wed, Feb 5, 2020 at 8:24 PM Kurt Young  wrote:


Hi Jingsong,

Yes current TableFactory is not ideal for users to use either. I think

we

should
also spend some time in 1.11 to improve the usability of

TableEnvironment

when
users trying to read or write something. Automatic scheme inference

would

be
one of them. Other from this, we also support convert a DataStream to
Table, which
can serve some flexible requirements to read or write data.

Best,
Kurt


On Wed, Feb 5, 2020 at 7:29 PM Zhenghua Gao  wrote:


+1 to remove these methods.

One concern about invocations of TableSource::getTableSchema:
By removing such methods, we can stop calling

TableSource::getTableSchema

in some place(such
as BatchTableEnvImpl/TableEnvironmentImpl#validateTableSource,
ConnectorCatalogTable, TableSourceQueryOperation).

But in other place we need field types and names of the table

source(such

as
BatchExecLookupJoinRule/StreamExecLookupJoinRule,
PushProjectIntoTableSourceScanRule,
CommonLookupJoin).  So how should we deal with this?

*Best Regards,*
*Zhenghua Gao*


On Wed, Feb 5, 2020 at 2:36 PM Kurt Young  wrote:


Hi all,

I'd like to bring up a discussion about removing registration of
TableSource and
TableSink in TableEnvironment as well as in ConnectTableDescriptor.

The

affected
method would be:

TableEnvironment::registerTableSource
TableEnvironment::fromTableSource
TableEnvironment::registerTableSink
ConnectTableDescriptor::registerTableSource
ConnectTableDescriptor::registerTableSink
ConnectTableDescriptor::registerTableSourceAndSink

(Most of them are already deprecated, except for
TableEnvironment::fromTableSource,
which was intended to deprecate but missed by accident).

FLIP-64 [1] already explained why we want to deprecate TableSource &
TableSink from
user's interface. In a short word, these interfaces should only read

&

write the physical
representation of the table, and they are not fitting well after we

already

introduced some
logical table fields such as computed column, watermarks.

Another reason is the exposure of registerTableSource in Table Env

just

make the whole
SQL protocol opposite. TableSource should be used as a reader of

table, it

should rely on
other metadata information held by framework, which eventually comes

from

DDL or
ConnectDescriptor. But if we register a TableSource to Table Env, we

have

no choice but
have to rely on TableSource::getTableSchema. It will make the design
obscure, sometimes
TableSource should trust the information comes from framework, but
sometimes it should
also generate its own schema information.

Furthermore, if the authority about schema information is not clear,

it

will make things much
more complicated if we want to improve the table api usability such

as

introducing automatic
schema inference in the near future.

Since this is an API break change, I've also included user mailing

list to

gather more feedbacks.

Best,
Kurt

[1]





https://cwiki.apache.org/confluence/display/FLINK/FLIP-64%3A+Support+for+Temporary+Objects+in+Table+module

















  1   2   3   4   5   6   7   8   9   10   >