Re: [NOTICE] Experimental Java 17 support now available on master

2023-06-18 Thread Jing Ge
Hi Kurt,

Thanks for your contribution. I am a little bit confused about the email
title, since your PR[1] is not merged into the master yet. I guess, with
"Experimental Java 17 support", you meant it is available on your branch
which is based on the master.

If I am not mistaken, there is no vote thread of FLIP 317 on ML. Would you
like to follow the standard process[2] defined by the Flink community?
Thanks!


Best regards,
Jing

[1] https://github.com/apache/flink/pull/22660
[2]
https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

On Sun, Jun 18, 2023 at 1:18 AM Kurt Ostfeld 
wrote:

> I built the Flink master branch and tried running this simple Flink app
> that uses a Java record:
>
>
> https://github.com/kurtostfeld/flink-kryo-upgrade-demo/blob/main/flink-record-demo/src/main/java/demo/app/Main.java
>
> It fails with the normal exception that Kryo 2.x throws when you try to
> serialize a Java record. The full stack trace is here:
> https://pastebin.com/HGhGKUWt
>
> I tried removing this line:
>
>
> https://github.com/kurtostfeld/flink-kryo-upgrade-demo/blob/main/flink-record-demo/src/main/java/demo/app/Main.java#L36
>
> and that had no impact, I got the same error.
>
> In the other thread, you said that the plan was to use PojoSerializer to
> serialize records rather than Kryo. Currently, the Flink code base uses
> Kryo 2.x by default for generic user data types, and that will fail when
> the data type is a record or contains records. Ultimately, if Flink wants
> to fully support Java records, it seems that it has to move off of Kryo
> 2.x. PojoSerializer is part of what is basically a custom serialization
> library internal to Flink that is an alternative to Kryo. That's one
> option: move off of Kryo to a Flink-internal serialization library. The
> other two options are upgrade to the new Kryo or use a different
> serialization library.
>
> The Kryo 5.5.0 upgrade PR I submitted (
> https://github.com/apache/flink/pull/22660) with FLIP 317 (
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0)
> works with records. The Flink app linked above that uses records works with
> the PR and that's what I posted to this mailing list a few weeks ago. I
> rebased the pull request on to the latest master branch and it's passing
> all tests. From my testing, it supports stateful upgrades, including
> checkpoints. If you can demonstrate a scenario where stateful upgrades
> error I can try to resolve that.


Re: [DISCUSS] FLIP-313 Add support of User Defined AsyncTableFunction

2023-06-18 Thread Aitozi
Hi all,
Sorry for the late reply. I had a discussion with Lincoln offline, mainly
about the naming of the hint options. Thanks, Lincoln, for the valuable
suggestions.

Let me answer the last email inline.

>For `JavaAsyncTableFunc0` in flip, can you use a scenario like RPC call as
an example?

Sure, I will give an example when adding the docs for the async udtf and will
update the FLIP at the same time.
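
Roughly, what I have in mind is something like the following sketch (the names
are made up and the RPC call is simulated with a local thread pool, just to
show the shape of an async udtf):

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.types.Row;

// Illustrative async UDTF that "enriches" a key as if calling a remote service.
@FunctionHint(output = @DataTypeHint("ROW<c1 STRING, c2 STRING>"))
public class AsyncSplitFunction extends AsyncTableFunction<Row> {

    private transient ExecutorService executor;  // stand-in for a non-blocking RPC client

    @Override
    public void open(FunctionContext context) {
        executor = Executors.newFixedThreadPool(4);
    }

    // Flink emits the row(s) once the future is completed; the task thread is
    // not blocked while the "RPC" is in flight.
    public void eval(CompletableFuture<Collection<Row>> result, String key) {
        CompletableFuture
                .supplyAsync(() -> Row.of(key + "-c1", key + "-c2"), executor)
                .whenComplete((row, err) -> {
                    if (err != null) {
                        result.completeExceptionally(err);
                    } else {
                        result.complete(Collections.singletonList(row));
                    }
                });
    }

    @Override
    public void close() {
        executor.shutdown();  // actively release resources when the task ends
    }
}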

>For the name of this query hint, 'LATERAL' (including its internal options)
doesn't show any relevance to async, but I haven't thought of a suitable name
at the moment,

After some discussion with Lincoln, we prefer to choose one of
`ASYNC_TABLE_FUNC` and `ASYNC_LATERAL`.
Besides, in my opinion the keyword `lateral` has a wider use scope than
table function joins, but in this case we only want to configure
the async table function, so I lean a bit more toward `ASYNC_TABLE_FUNC`.
Looking forward to your input if you have better suggestions on the naming.

For the usage of the hints config option, I have updated the section
of ConfigOption, you can refer to the FLIP
for more details.

>Also, the terms 'correlate join' and 'lateral join' are not the same as in
the current joins page[1], so maybe it would be better if we unified them
into  'join table function'

Yes, we should unify them to 'join table function'; updated.

Best,
Aitozi

Lincoln Lee  wrote on Thu, Jun 15, 2023, at 09:15:

> Hi Aitozi,
>
> Thanks for your reply! Giving SQL users more flexibility to get
> asynchronous processing capabilities via lateral join table function: +1 for
> this.
>
> For `JavaAsyncTableFunc0` in flip, can you use a scenario like RPC call as
> an example?
>
> For the name of this query hint, 'LATERAL' (including its internal options)
> doesn't show any relevance to async, but I haven't thought of a suitable name
> at the moment;
> maybe we need to highlight the async keyword directly. We can also see if
> others have better candidates.
>
> For the hint option, "timeout = '180s'" should be "'timeout' = '180s'"; it
> seems to be a typo in the flip. Also, please use upper case for all keywords
> in the SQL examples.
> Also, the terms 'correlate join' and 'lateral join' are not the same as in
> the current joins page[1], so maybe it would be better if we unified them
> into  'join table function'
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#table-function
>
> Best,
> Lincoln Lee
>
>
> Aitozi  wrote on Wed, Jun 14, 2023, at 16:11:
>
> > Hi Lincoln
> >
> > Many thanks for your valuable questions. I will try to answer your
> > questions inline.
> >
> > >Does the async udtf bring any additional benefits besides a
> > lighter implementation?
> >
> > IMO, async udtf is more than a lighter implementation. It can act as a
> > general way for SQL users to use the async operator. They don't have to
> > bind the async function to a table (a LookupTable), they are not forced
> > to join on an equality condition, and they can use it to do more than
> > enrich data.
> >
> > The async lookup join is more like a subset/specific usage of async udtf.
> > It is acceptable that the specific version has more opportunities to be
> > optimized (like push-down). The async table function should be categorized
> > as a user-defined function.
> >
> > >Should users migrate to the lookup source when they encounter similar
> > requirements or problems, or should we develop an additional set of
> > similar mechanisms?
> >
> > As I clarified above, the lookup join is a specific usage of async udtf,
> > so it deserves more refined optimizations like caching / retries. But they
> > may not all be suitable for the async udtf. As a function, it can be
> > deterministic or non-deterministic, so caching is not suitable, and we
> > also do not have a common cache for UDFs now. So I think optimizations
> > like caching/retry should be handed over to the function implementor.
> >
> > > the newly added query hint needs a different name that can be more
> > > easily related to the lateral operation, as the current join hints[5] do.
> >
> > What about using LATERAL?
> >
> > As below:
> >
> > SELECT /*+ LATERAL('output-mode' = 'ordered', 'capacity' = '200',
> >   timeout = '180s') */ a, c1, c2
> > FROM T1
> > LEFT JOIN lateral TABLE (async_split(b)) AS T(c1, c2) ON true
> >
> > >For the async func example, since the target scenario is an external io
> > operation, it's better to add the `close` method to actively release
> > resources as a good example for users
> >
> >
> > Makes sense to me, will update the FLIP.
> >
> > Best,
> >
> > Aitozi.
> >
> > Lincoln Lee  wrote on Wed, Jun 14, 2023, at 14:24:
> >
> > > Hi Aitozi,
> > >
> > > Sorry for the late reply here! Supporting async udtf (`AsyncTableFunction`)
> > > directly in SQL seems like an attractive feature, but there are two issues
> > > that need to be addressed before we can be sure to add it:
> > > 1. As mentioned in the flip[1], the current lookup function can already
> > > implement the re

Re: [NOTICE] Experimental Java 17 support now available on master

2023-06-18 Thread Kurt Ostfeld
I think there is some confusion:

Chesnay, not me, recently checked changes into master so that Flink will
build + test + run with experimental support for Java 17, but with Kryo 2.x
as-is, so this will error on Java records. Chesnay created this particular
email thread for that work.

I (Kurt) created a PR+FLIP several weeks ago for upgrading Kryo from 2.x to
5.x, with full backward compatibility for existing savepoints/checkpoints, which
enables Flink to run on Java 17 with support for Java records. This isn't
merged into master yet. I haven't gotten much feedback on it.

I recently rebased the Kryo upgrade PR onto the master branch, which includes
Chesnay's commits. The PR branch was already running successfully on Java 17;
Chesnay's changes enable Flink to build and run the CI test suite on Java 17 as
well. However, without the Kryo upgrade, Flink isn't compatible with Java
records.
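
For illustration, a job of roughly this shape (a simplified sketch with made-up
names, not the code from the demo repo linked below) goes through the Kryo code
path for the record type and fails on current master:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RecordDemo {

    // Java records are not handled by Flink's PojoSerializer today, so this
    // type falls back to the generic (Kryo) serializer.
    public record Person(String name, int age) {}

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(new Person("alice", 30), new Person("bob", 25))
                .keyBy(p -> p.name())  // the shuffle forces (de)serialization of Person
                .print();
        env.execute("record-demo");
    }
}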

I'd be happy to follow the standard process and do the FLIP vote, but
before this is ready for a vote, the PR needs review + testing by someone
other than me. Specifically, I'd like someone to try to create a Flink
application that tries to break the upgrade process: either confirm that
everything works or demonstrate an error scenario.

The Kryo PR code is passing all automated CI tests, which include several tests 
covering backwards compatibility scenarios. I also created this simple 
application https://github.com/kurtostfeld/flink-kryo-upgrade-demo to create 
state with Flink 1.17 and test the upgrade process. From what I can see it 
works, but this would definitely need more testing from people other than just 
me.



--- Original Message ---
On Sunday, June 18th, 2023 at 7:41 AM, Jing Ge  
wrote:


> 
> 
> Hi Kurt,
> 
> Thanks for your contribution. I am a little bit confused about the email
> title, since your PR[1] is not merged into the master yet. I guess, with
> "Experimental Java 17 support", you meant it is available on your branch
> which is based on the master.
> 
> If I am not mistaken, there is no vote thread of FLIP 317 on ML. Would you
> like to follow the standard process[2] defined by the Flink community?
> Thanks!
> 
> 
> Best regards,
> Jing
> 
> [1] https://github.com/apache/flink/pull/22660
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> 
> On Sun, Jun 18, 2023 at 1:18 AM Kurt Ostfeld kurtostf...@proton.me.invalid
> 
> wrote:
> 
> > I built the Flink master branch and tried running this simple Flink app
> > that uses a Java record:
> > 
> > https://github.com/kurtostfeld/flink-kryo-upgrade-demo/blob/main/flink-record-demo/src/main/java/demo/app/Main.java
> > 
> > It fails with the normal exception that Kryo 2.x throws when you try to
> > serialize a Java record. The full stack trace is here:
> > https://pastebin.com/HGhGKUWt
> > 
> > I tried removing this line:
> > 
> > https://github.com/kurtostfeld/flink-kryo-upgrade-demo/blob/main/flink-record-demo/src/main/java/demo/app/Main.java#L36
> > 
> > and that had no impact, I got the same error.
> > 
> > In the other thread, you said that the plan was to use PojoSerializer to
> > serialize records rather than Kryo. Currently, the Flink code base uses
> > Kryo 2.x by default for generic user data types, and that will fail when
> > the data type is a record or contains records. Ultimately, if Flink wants
> > to fully support Java records, it seems that it has to move off of Kryo
> > 2.x. PojoSerializer is part of what is basically a custom serialization
> > library internal to Flink that is an alternative to Kryo. That's one
> > option: move off of Kryo to a Flink-internal serialization library. The
> > other two options are upgrade to the new Kryo or use a different
> > serialization library.
> > 
> > The Kryo 5.5.0 upgrade PR I submitted (
> > https://github.com/apache/flink/pull/22660) with FLIP 317 (
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-317%3A+Upgrade+Kryo+from+2.24.0+to+5.5.0)
> > works with records. The Flink app linked above that uses records works with
> > the PR and that's what I posted to this mailing list a few weeks ago. I
> > rebased the pull request on to the latest master branch and it's passing
> > all tests. From my testing, it supports stateful upgrades, including
> > checkpoints. If you can demonstrate a scenario where stateful upgrades
> > error I can try to resolve that.


[jira] [Created] (FLINK-32374) ExecNodeGraphInternalPlan#writeToFile should support TRUNCATE_EXISTING for overwriting

2023-06-18 Thread Jane Chan (Jira)
Jane Chan created FLINK-32374:
-

 Summary: ExecNodeGraphInternalPlan#writeToFile should support 
TRUNCATE_EXISTING for overwriting
 Key: FLINK-32374
 URL: https://issues.apache.org/jira/browse/FLINK-32374
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Planner
Affects Versions: 1.17.1, 1.16.2, 1.16.1, 1.17.0, 1.16.0, 1.18.0
Reporter: Jane Chan
 Fix For: 1.18.0
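
For reference, the NIO behavior behind the summary: when an existing, longer
file is overwritten without TRUNCATE_EXISTING, the trailing bytes of the old
content remain in the file. A minimal, self-contained sketch (paths and
contents are illustrative):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TruncateExistingDemo {
    public static void main(String[] args) throws IOException {
        Path plan = Files.createTempFile("compiled-plan", ".json");
        Files.write(plan, "a fairly long first version of the plan".getBytes());

        // Without TRUNCATE_EXISTING, a shorter second write starts at offset 0
        // but leaves the old tail in place, producing a corrupted file.
        Files.write(plan, "short plan".getBytes(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        System.out.println(Files.readString(plan));  // new content + leftover old tail

        // With TRUNCATE_EXISTING the file is cut to zero length before writing.
        Files.write(plan, "short plan".getBytes(),
                StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING);
        System.out.println(Files.readString(plan));  // exactly "short plan"
    }
}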






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-18 Thread Becket Qin
Hi John,

Completely agree with all you said.

Can we consider only dropping deprecated APIs in major releases across the
> board? I understand that Experimental and PublicEvolving APIs are by
> definition less stable, but it seems like this should be reflected in the
> required deprecation period alone. I.e. that we must keep them around for
> at least zero or one minor release, not that we can drop them in a minor or
> patch release.

Personally speaking, I would love to do this, for exactly the reason you
mentioned. However, I did not propose this due to the following reasons:

1. I am hesitating a little bit about changing the accepted FLIPs too soon.
2. More importantly, to avoid slowing down our development. At this point,
Flink still lacks some design / routines to support good API evolvability /
extensibility. Just like you said, it takes some time to be good at this.
In this case, my concern is that only removing Experimental /
PublicEvolving APIs in major version changes may result in too much
overhead and dramatically slow down the development of Flink. So, I was
thinking that we can start with the current status. Hopefully after we are
more comfortable with the maintenance overhead of deprecated APIs, we can
then have a stronger guarantee for Experimental / PublicEvolving APIs.

Thanks,

Jiangjie (Becket) Qin



On Sun, Jun 18, 2023 at 6:44 AM John Roesler  wrote:

> Hi Becket,
>
> Thanks for this FLIP! Having a deprecation process is really important. I
> understand some people’s concerns about the additional burden for project
> maintainers, but my personal experience with Kafka has been that it’s very
> liveable and that it’s well worth the benefit to users. In fact, users
> being able to confidently upgrade is also a benefit to maintainers, as we
> will get fewer questions from people stuck on very old versions.
>
> One question:
> Can we consider only dropping deprecated APIs in major releases across the
> board? I understand that Experimental and PublicEvolving APIs are by
> definition less stable, but it seems like this should be reflected in the
> required deprecation period alone. I.e. that we must keep them around for
> at least zero or one minor release, not that we can drop them in a minor or
> patch release.
>
> The advantage of forbidding the removal of any API in minor or patch
> releases is that users will get a strong guarantee that they can bump the
> minor or patch version and still be able to compile, or even just re-link
> and know that they won’t face “MethodDef” exceptions at run time. This is a
> binary guarantee: if we allow removing  even Experimental APIs outside of
> major releases, users can no longer confidently upgrade.
>
> Aside from that, I’d share my 2 cents on a couple of points:
> * I’d use the official Deprecated annotation instead of introducing our
> own flavor (Retired, etc), since Deprecated is well integrated into build
> tools and IDEs.
> * I wouldn’t worry about a demotion process in this FLIP; it seems
> orthogonal, and something that should probably be taken case-by-case
> anyway.
> * Aside from deprecation and removal, there have been some discussions
> about how to evolve APIs and behavior in compatible ways. This is somewhat
> of an art, and if folks haven’t wrestled with it before, it’ll take some
> time to become good at it. I feel like this topic should also be orthogonal
> to this FLIP, but FWIW, my suggestion would be to adopt a simple policy not
> to break existing user programs, and leave the “how” up to implementers and
> reviewers.
>
> Thanks again,
> John
>
> On Sat, Jun 17, 2023, at 11:03, Jing Ge wrote:
> > Hi All,
> >
> > The @Public -> @PublicEvolving proposed by Xintong is a great idea.
> > Especially after he suggested @PublicRetired, i.e. @PublicEvolving --(2
> > minor release)--> @Public --> @deprecated --(1 major
> > release)--> @PublicRetired. It will provide a lot of flexibility without
> > breaking any rules we had. @Public APIs are allowed to change between
> major
> > releases. Changing annotations is acceptable and provides additional
> > tolerance, i.e. user-friendliness, since the APIs themselves are not
> changed.
> >
> > I had similar thoughts when I was facing those issues. I want to move one
> > step further and suggest introducing one more annotation @Retired.
> >
> > Not like the @PublicRetired which is a compromise of downgrading @Public
> to
> > @PublicEvolving. As I mentioned earlier in my reply, Java standard
> > @deprecated should be used in the early stage of the deprecation process
> > and doesn't really meet our requirement. Since Java does not allow us to
> > extend annotation, I think it would be feasible to have the new @Retired
> to
> > help us monitor and manage the deprecation process, house cleaning, etc.
> >
> > Some ideas could be(open for discussion):
> >
> > @Retired:
> >
> > 1. There must be a replacement with functionality compatibility before
> APIs
> > can be marked as @Retired, i.e. DISCUSS and

Re: [DISCUSS] FLIP-321: Introduce an API deprecation process

2023-06-18 Thread John Roesler
Hi Becket,

Thanks for the reply! I’d like to continue the conversation about compatibility 
outside of this FLIP thread, but for now, I can accept your decision. It’s 
certainly an improvement. 

Thanks again,
John

On Sun, Jun 18, 2023, at 21:42, Becket Qin wrote:
> Hi John,
>
> Completely agree with all you said.
>
> Can we consider only dropping deprecated APIs in major releases across the
>> board? I understand that Experimental and PublicEvolving APIs are by
>> definition less stable, but it seems like this should be reflected in the
>> required deprecation period alone. I.e. that we must keep them around for
>> at least zero or one minor release, not that we can drop them in a minor or
>> patch release.
>
> Personally speaking, I would love to do this, for exactly the reason you
> mentioned. However, I did not propose this due to the following reasons:
>
> 1. I am hesitating a little bit about changing the accepted FLIPs too soon.
> 2. More importantly, to avoid slowing down our development. At this point,
> Flink still lacks some design / routines to support good API evolvability /
> extensibility. Just like you said, it takes some time to be good at this.
> In this case, my concern is that only removing Experimental /
> PublicEvolving APIs in major version changes may result in too much
> overhead and dramatically slow down the development of Flink. So, I was
> thinking that we can start with the current status. Hopefully after we are
> more comfortable with the maintenance overhead of deprecated APIs, we can
> then have a stronger guarantee for Experimental / PublicEvolving APIs.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Sun, Jun 18, 2023 at 6:44 AM John Roesler  wrote:
>
>> Hi Becket,
>>
>> Thanks for this FLIP! Having a deprecation process is really important. I
>> understand some people’s concerns about the additional burden for project
>> maintainers, but my personal experience with Kafka has been that it’s very
>> liveable and that it’s well worth the benefit to users. In fact, users
>> being able to confidently upgrade is also a benefit to maintainers, as we
>> will get fewer questions from people stuck on very old versions.
>>
>> One question:
>> Can we consider only dropping deprecated APIs in major releases across the
>> board? I understand that Experimental and PublicEvolving APIs are by
>> definition less stable, but it seems like this should be reflected in the
>> required deprecation period alone. I.e. that we must keep them around for
>> at least zero or one minor release, not that we can drop them in a minor or
>> patch release.
>>
>> The advantage of forbidding the removal of any API in minor or patch
>> releases is that users will get a strong guarantee that they can bump the
>> minor or patch version and still be able to compile, or even just re-link
>> and know that they won’t face “MethodDef” exceptions at run time. This is a
>> binary guarantee: if we allow removing  even Experimental APIs outside of
>> major releases, users can no longer confidently upgrade.
>>
>> Aside from that, I’d share my 2 cents on a couple of points:
>> * I’d use the official Deprecated annotation instead of introducing our
>> own flavor (Retired, etc), since Deprecated is well integrated into build
>> tools and IDEs.
>> * I wouldn’t worry about a demotion process in this FLIP; it seems
>> orthogonal, and something that should probably be taken case-by-case
>> anyway.
>> * Aside from deprecation and removal, there have been some discussions
>> about how to evolve APIs and behavior in compatible ways. This is somewhat
>> of an art, and if folks haven’t wrestled with it before, it’ll take some
>> time to become good at it. I feel like this topic should also be orthogonal
>> to this FLIP, but FWIW, my suggestion would be to adopt a simple policy not
>> to break existing user programs, and leave the “how” up to implementers and
>> reviewers.
>>
>> Thanks again,
>> John
>>
>> On Sat, Jun 17, 2023, at 11:03, Jing Ge wrote:
>> > Hi All,
>> >
>> > The @Public -> @PublicEvolving proposed by Xintong is a great idea.
>> > Especially after he suggested @PublicRetired, i.e. @PublicEvolving --(2
>> > minor release)--> @Public --> @deprecated --(1 major
>> > release)--> @PublicRetired. It will provide a lot of flexibility without
>> > breaking any rules we had. @Public APIs are allowed to change between
>> major
>> > releases. Changing annotations is acceptable and provides additional
>> > tolerance, i.e. user-friendliness, since the APIs themselves are not
>> changed.
>> >
>> > I had similar thoughts when I was facing those issues. I want to move one
>> > step further and suggest introducing one more annotation @Retired.
>> >
>> > Not like the @PublicRetired which is a compromise of downgrading @Public
>> to
>> > @PublicEvolving. As I mentioned earlier in my reply, Java standard
>> > @deprecated should be used in the early stage of the deprecation process
>> > and doesn't really meet our requirement. Since Java 

Re: [VOTE] FLIP-287: Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and JobID

2023-06-18 Thread Zhu Zhu
+1 (binding)

Thanks,
Zhu

Tzu-Li (Gordon) Tai  wrote on Sat, Jun 17, 2023, at 11:32:
>
> +1 (binding)
>
> On Fri, Jun 16, 2023, 09:53 Jing Ge  wrote:
>
> > +1(binding)
> >
> > Best Regards,
> > Jing
> >
> > On Fri, Jun 16, 2023 at 10:10 AM Lijie Wang 
> > wrote:
> >
> > > +1 (binding)
> > >
> > > Thanks for driving it, Joao.
> > >
> > > Best,
> > > Lijie
> > >
> > > Joao Boto  wrote on Fri, Jun 16, 2023, at 15:53:
> > >
> > > > Hi all,
> > > >
> > > > Thank you to everyone for the feedback on FLIP-287[1]. Based on the
> > > > discussion thread [2], we have come to a consensus on the design and
> > are
> > > > ready to take a vote to contribute this to Flink.
> > > >
> > > > I'd like to start a vote for it. The vote will be open for at least 72
> > > > hours (excluding weekends), unless there is an objection or an
> > > > insufficient number of votes.
> > > >
> > > > [1]
> > > >
> > >
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
> > > > [2]https://lists.apache.org/thread/wb3myhqsdz81h08ygwx057mkn1hc3s8f
> > > >
> > > >
> > > > Best,
> > > > Joao Boto
> > > >
> > >
> >


Re:Re: [VOTE] FLIP-287: Extend Sink#InitContext to expose TypeSerializer, ObjectReuse and JobID

2023-06-18 Thread Yuepeng Pan
+1 (non-binding)

Thanks,
Roc






On 2023-06-19 10:55:05, "Zhu Zhu"  wrote:
>+1 (binding)
>
>Thanks,
>Zhu
>
>Tzu-Li (Gordon) Tai  wrote on Sat, Jun 17, 2023, at 11:32:
>>
>> +1 (binding)
>>
>> On Fri, Jun 16, 2023, 09:53 Jing Ge  wrote:
>>
>> > +1(binding)
>> >
>> > Best Regards,
>> > Jing
>> >
>> > On Fri, Jun 16, 2023 at 10:10 AM Lijie Wang 
>> > wrote:
>> >
>> > > +1 (binding)
>> > >
>> > > Thanks for driving it, Joao.
>> > >
>> > > Best,
>> > > Lijie
>> > >
>> > > Joao Boto  wrote on Fri, Jun 16, 2023, at 15:53:
>> > >
>> > > > Hi all,
>> > > >
>> > > > Thank you to everyone for the feedback on FLIP-287[1]. Based on the
>> > > > discussion thread [2], we have come to a consensus on the design and
>> > are
>> > > > ready to take a vote to contribute this to Flink.
>> > > >
>> > > > I'd like to start a vote for it. The vote will be open for at least 72
>> > > > hours (excluding weekends), unless there is an objection or an
>> > > > insufficient number of votes.
>> > > >
>> > > > [1]
>> > > >
>> > >
>> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=240880853
>> > > > [2]https://lists.apache.org/thread/wb3myhqsdz81h08ygwx057mkn1hc3s8f
>> > > >
>> > > >
>> > > > Best,
>> > > > Joao Boto
>> > > >
>> > >
>> >


Re: [DISCUSS] FLIP-316: Introduce SQL Driver

2023-06-18 Thread Paul Lam
Hi Shengkai,

Sorry for my late reply. It took me some time to update the FLIP.

In the latest FLIP design, SQL Driver is placed in the flink-sql-gateway
module. PTAL.

The FLIP does not cover details of the K8s file distribution, but its general
usage would be very much the same as in YARN setups. We can have follow-up
discussions in the JIRA tickets.
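
For the resource localization idea that comes up later in this thread
(uploading local jars through a Flink FileSystem before deployment), a rough
sketch could look like the following (illustrative only, not part of the FLIP;
error handling omitted):

import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

public class ResourceUploader {

    // Copies a local jar to a remote directory (e.g. an s3:// or hdfs:// URI)
    // using Flink's FileSystem abstraction and returns the remote path that
    // the cluster entrypoint would later download and put on its classpath.
    public static Path upload(java.nio.file.Path localJar, URI remoteDir) throws Exception {
        Path target = new Path(new Path(remoteDir), localJar.getFileName().toString());
        FileSystem fs = target.getFileSystem();
        try (InputStream in = Files.newInputStream(localJar);
             FSDataOutputStream out = fs.create(target, FileSystem.WriteMode.OVERWRITE)) {
            in.transferTo(out);
        }
        return target;
    }
}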

Best,
Paul Lam

> On Jun 12, 2023, at 15:29, Shengkai Fang  wrote:
> 
> 
> > If it’s the case, I’m good with introducing a new module and making SQL 
> > Driver
> > an internal class and accepts JSON plans only.
> 
> I have rethought this again and again. I think it's better to move the SqlDriver 
> into the sql-gateway module because the sql client relies on the sql-gateway 
> to submit the sql and the sql-gateway has the ability to generate the 
> ExecNodeGraph now. +1 to support accepting JSON plans only.
> 
> * Upload configuration through command line parameter
> 
> ExecNodeGraph only contains the job's information but it doesn't contain the 
> checkpoint dir, checkpoint interval, execution mode and so on. So I think we 
> should also upload the configuration.
> 
> * KubernetesClusterDescriptor and KubernetesApplicationClusterEntrypoint are 
> responsible for the jar upload/download
> 
> +1 for the change.
> 
> Could you update the FLIP to reflect the current discussion?
> 
> Best,
> Shengkai
> 
> 
> 
> 
> 
> 
> Yang Wang <wangyang0...@apache.org> wrote on Mon, Jun 12, 2023, at 11:41:
> Sorry for the late reply. I am in favor of introducing such a built-in
> resource localization mechanism
> based on Flink FileSystem. Then FLINK-28915[1] could be the second step
> which will download
> the jars and dependencies to the JobManager/TaskManager local directory
> before working.
> 
> The first step could be done in another ticket in Flink. Or some external
> Flink jobs management system
> could also take care of this.
> 
> [1]. https://issues.apache.org/jira/browse/FLINK-28915 
> 
> 
> Best,
> Yang
> 
> Paul Lam <paullin3...@gmail.com> wrote on Fri, Jun 9, 2023, at 17:39:
> 
> > Hi Mason,
> >
> > I get your point. I'm increasingly feeling the need to introduce a
> > built-in
> > file distribution mechanism for flink-kubernetes module, just like Spark
> > does with `spark.kubernetes.file.upload.path` [1].
> >
> > I’m assuming the workflow is as follows:
> >
> > - KubernetesClusterDescriptor uploads all local resources to a remote
> >   storage via Flink filesystem (skips if the resources are already remote).
> > - KubernetesApplicationClusterEntrypoint downloads the resources
> >   and put them in the classpath during startup.
> >
> > I wouldn't mind splitting it into another FLIP to ensure that everything is
> > done correctly.
> >
> > cc'ed @Yang to gather more opinions.
> >
> > [1]
> > https://spark.apache.org/docs/latest/running-on-kubernetes.html#dependency-management
> >  
> > 
> >
> > Best,
> > Paul Lam
> >
> > On Jun 8, 2023, at 12:15, Mason Chen wrote:
> >
> > Hi Paul,
> >
> > Thanks for your response!
> >
> > I agree that utilizing SQL Drivers in Java applications is equally
> > important as employing them in SQL Gateway. WRT init containers, I think
> > most users use them just as a workaround. For example, wget a jar from
> > the maven repo.
> >
> > We could implement the functionality in SQL Driver in a more graceful
> > way and the flink-supported filesystem approach seems to be a
> > good choice.
> >
> >
> > My main point is: can we solve the problem with a design agnostic of SQL
> > and Stream API? I mentioned a use case where this ability is useful for
> > Java or Stream API applications. Maybe this is even a non-goal to your FLIP
> > since you are focusing on the driver entrypoint.
> >
> > Jark mentioned some optimizations:
> >
> > This allows SQLGateway to leverage some metadata caching and UDF JAR
> > caching for better compiling performance.
> >
> > It would be great to see this even outside the SQLGateway (i.e. UDF JAR
> > caching).
> >
> > Best,
> > Mason
> >
> > On Wed, Jun 7, 2023 at 2:26 AM Shengkai Fang wrote:
> >
> > Hi Paul. Thanks for your update; it makes me understand the design much
> > better.
> >
> > But I still have some questions about the FLIP.
> >
> > For SQL Gateway, only DMLs need to be delegated to the SQL Driver. I
> > would think about the details and update the FLIP. Do you have some
> > ideas already?
> >
> >
> > If the application mode cannot support library mode, I think we should
> > only execute INSERT INTO and UPDATE/DELETE statements in application
> > mode. AFAIK, we cannot support ANALYZE TABLE and CALL PROCEDURE
> > statements. The ANALYZE TABLE syntax needs to register the statistics in
> > the catalog after the job finishes, and the CALL PROCEDURE statement
> > doesn't generate an ExecNodeGraph.
> >
> > * 

Re: [DISCUSS] FLIP-324: Introduce Runtime Filter for Flink Batch Jobs

2023-06-18 Thread Lijie Wang
Hi Stefan,

Thanks for your feedback. Let me briefly summarize the optimization points
you mentioned above (Please correct me if I'm wrong):

1. Build an extra hash table for deduplication before building the bloom
filter.
2. Use the two-phase approach to build the bloom filter(first local, then
OR-combine).
3. Use blocked bloom filters to improve the cache efficiency.

For the above 3 points, I have the following questions or opinions:

For point 1, it seems that building a hash table also requires traversing
all build side data, and the overhead seems to be the same as building a
bloom filter directly? In addition, the hash table will take up more space
when the amount of data is large, which is why we chose to use a bloom
filter instead of a hash table.

For point 2, I think it's a good idea to use the two-phase approach to
build the bloom filter. But rather than directly broadcasting the local
bloom filter to the probe side, I prefer to introduce a global node for the
OR-combine(like two-phase-agg[1]), then broadcast the combined bloom filter
to the probe side. The latter can reduce the amount of data transferred by
the network. I will change the FLIP like this.

For point 3, I think it's a nice optimization, but I prefer to put it to
the future improvements. There is already an implementation of bloom filter
in flink, we can simply reuse it. Introducing a new bloom filter
implementation introduces some complexity (we need to implement it, test
it, etc.), and is not the focus of this FLIP.

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#local-global-aggregation

Best,
Lijie

Stefan Richter  wrote on Fri, Jun 16, 2023, at 16:45:

> Hi,
>
> Thanks for the proposal of this feature! I have a question about the
> filter build and some suggestions for potential improvements. First, I
> wonder why you suggest running the filter builder as a separate operator with
> parallelism 1. I’d suggest integrating the distributed filter build with
> the hash table build phase as follows:
>
> 1. Build the hash table completely in each subtask.
> 2. The keyset of the hash table gives us a precise NDV count for every
> subtask.
> 3. Build a filter from the subtask hash table. For low cardinality tables,
> I’d go with the suggested optimization of IN-filter.
> 4. Each build subtask transfers the local bloom filter to all probe
> operators.
> 5. On the probe operator we can either probe against the individual
> filters, or we OR-combine all subtask filters into aggregated bloom filter.
>
> I’m suggesting this because inserting into a (larger) bloom
> filter can be costly, especially once the filter exceeds cache sizes and is
> therefore better parallelized. Inserting into the hash table first also
> deduplicates the keys and we avoid inserting records twice into the bloom
> filter. If we want to improve cache efficiency for the build of larger
> filters, we could structure them as blocked bloom filters, where the filter
> is separated into blocks and all bits of one key go only into one block.
> That allows us to apply software managed buffering to first group keys that
> go into the same partition (ideally fitting into cache) and then bulk load
> partitions once we collected enough keys for one round of loading.
>
> Best,
> Stefan
>
>
> Stefan Richter
> Principal Engineer II
>
>
>
> > On 15. Jun 2023, at 13:35, Lijie Wang  wrote:
> >
> > Hi,  Benchao and Aitozi,
> >
> > Thanks for your feedback about this FLIP.
> >
> > @Benchao
> >
> >>> I think it would be reasonable to also support "pipeline shuffle" if
> > possible.
> > As I said above, runtime filter can work well with all shuffle mode,
> > including pipeline shuffle.
> >
> >>> if the RuntimeFIlterBuilder could be done quickly than RuntimeFilter
> > operator, it can still filter out additional data afterwards.
> > I think the main purpose of runtime filter is to reduce the shuffle data
> > and the data arriving at join. Although eagerly running the large
> > table side can process datas in advance, most of the data may be
> > irrelevant, causing huge shuffle overhead and slowing the join. In
> > addition, if the join is a hash-join, the probe side of the hash-join
> also
> > needs to wait for its build side to complete, so the large table side is
> > likely to be back-pressured.
> > In addition, I tend not to add too many configuration options in the
> > first version, as they may make it more difficult to use (users need to
> > understand a lot of internal implementation details). Maybe it could be a
> > future improvement (if it's worthwhile)?
> >
> >
> > @Aitozi
> >
> >>> IMO, in the current implementation two source table operators will be
> > executed simultaneously.
> > The example in FLIP uses blocking shuffle (I will add this p