Re: [ANNOUNCE] New PMC member: Yuan Mei

2022-03-14 Thread Zhilong Hong
Congratulations, Yuan!

Best,
Zhilong

On Mon, Mar 14, 2022 at 7:22 PM Konstantin Knauf  wrote:

> Congratulations, Yuan!
>
> On Mon, Mar 14, 2022 at 11:29 AM Jing Zhang  wrote:
>
> > Congratulations, Yuan!
> >
> > Best,
> > Jing Zhang
> >
> > > Jing Ge  wrote on Mon, Mar 14, 2022 at 18:15:
> >
> > > Congrats! Very well deserved!
> > >
> > > Best,
> > > Jing
> > >
> > > On Mon, Mar 14, 2022 at 10:34 AM Piotr Nowojski 
> > > wrote:
> > >
> > > > Congratulations :)
> > > >
> > > > On Mon, Mar 14, 2022 at 09:59, Yun Tang  wrote:
> > > >
> > > > > Congratulations, Yuan!
> > > > >
> > > > > Best,
> > > > > Yun Tang
> > > > > 
> > > > > From: Zakelly Lan 
> > > > > Sent: Monday, March 14, 2022 16:55
> > > > > To: dev@flink.apache.org 
> > > > > Subject: Re: [ANNOUNCE] New PMC member: Yuan Mei
> > > > >
> > > > > Congratulations, Yuan!
> > > > >
> > > > > Best,
> > > > > Zakelly
> > > > >
> > > > > On Mon, Mar 14, 2022 at 4:49 PM Johannes Moser 
> > > > wrote:
> > > > >
> > > > > > Congrats Yuan.
> > > > > >
> > > > > > > On 14.03.2022, at 09:45, Arvid Heise  wrote:
> > > > > > >
> > > > > > > Congratulations and well deserved!
> > > > > > >
> > > > > > > On Mon, Mar 14, 2022 at 9:30 AM Matthias Pohl <
> map...@apache.org
> > >
> > > > > wrote:
> > > > > > >
> > > > > > >> Congratulations, Yuan.
> > > > > > >>
> > > > > > >> On Mon, Mar 14, 2022 at 9:25 AM Shuo Cheng <
> njucs...@gmail.com>
> > > > > wrote:
> > > > > > >>
> > > > > > >>> Congratulations, Yuan!
> > > > > > >>>
> > > > > > >>> On Mon, Mar 14, 2022 at 4:22 PM Anton Kalashnikov <
> > > > > kaa@yandex.com>
> > > > > > >>> wrote:
> > > > > > >>>
> > > > > >  Congratulations, Yuan!
> > > > > > 
> > > > > >  --
> > > > > > 
> > > > > >  Best regards,
> > > > > >  Anton Kalashnikov
> > > > > > 
> > > > > >  On 14.03.2022 09:13, Leonard Xu wrote:
> > > > > > > Congratulations Yuan!
> > > > > > >
> > > > > > > Best,
> > > > > > > Leonard
> > > > > > >
> > > > > > >> On Mar 14, 2022, at 4:09 PM, Yangze Guo  wrote:
> > > > > > >>
> > > > > > >> Congratulations!
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Yangze Guo
> > > > > > >>
> > > > > > >> On Mon, Mar 14, 2022 at 4:08 PM Martijn Visser <
> > > > > >  martijnvis...@apache.org> wrote:
> > > > > > >>> Congratulations Yuan!
> > > > > > >>>
> > > > > > >>> On Mon, 14 Mar 2022 at 09:02, Yu Li 
> > > wrote:
> > > > > > >>>
> > > > > >  Hi all!
> > > > > > 
> > > > > >  I'm very happy to announce that Yuan Mei has joined the
> > > Flink
> > > > > PMC!
> > > > > > 
> > > > > >  Yuan is helping the community a lot with creating and validating
> > > > > >  releases, contributing to FLIP discussions, and making good code
> > > > > >  contributions to the state backend and related components.
> > > > > > 
> > > > > >  Congratulations and welcome, Yuan!
> > > > > > 
> > > > > >  Best Regards,
> > > > > >  Yu (On behalf of the Apache Flink PMC)
> > > > > > 
> > > > > >  --
> > > > > > 
> > > > > >  Best regards,
> > > > > >  Anton Kalashnikov
> > > > > > 
> > > > > > 
> > > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
>
> --
>
> Konstantin Knauf
>
> https://twitter.com/snntrable
>
> https://github.com/knaufk
>


Re: [ANNOUNCE] New Apache Flink Committer - Ingo Bürk

2021-12-10 Thread Zhilong Hong
Congratulations, Ingo!

Best,
Zhilong

On Wed, Dec 8, 2021 at 10:18 PM godfrey he  wrote:

> Congratulations, Ingo!
>
> Best,
> Godfrey
>
> Roman Khachatryan  wrote on Mon, Dec 6, 2021 at 6:07 PM:
> >
> > Congratulations, Ingo!
> >
> > Regards,
> > Roman
> >
> >
> > On Mon, Dec 6, 2021 at 11:05 AM Yang Wang  wrote:
> > >
> > > Congratulations, Ingo!
> > >
> > > Best,
> > > Yang
> > >
> > > Sergey Nuyanzin  wrote on Mon, Dec 6, 2021 at 3:35 PM:
> > >
> > > > Congratulations, Ingo!
> > > >
> > > > On Mon, Dec 6, 2021 at 7:32 AM Leonard Xu  wrote:
> > > >
> > > > > Congratulations, Ingo! Well Deserved.
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > >
> > > > > > On Dec 3, 2021, at 11:24 PM, Ingo Bürk  wrote:
> > > > > >
> > > > > > Thank you everyone for the warm welcome!
> > > > > >
> > > > > >
> > > > > > Best
> > > > > > Ingo
> > > > > >
> > > > > > On Fri, Dec 3, 2021 at 11:47 AM Ryan Skraba
> > > >  > > > > >
> > > > > > wrote:
> > > > > >
> > > > > >> Congratulations Ingo!
> > > > > >>
> > > > > >> On Fri, Dec 3, 2021 at 8:17 AM Yun Tang 
> wrote:
> > > > > >>
> > > > > >>> Congratulations, Ingo!
> > > > > >>>
> > > > > >>> Best
> > > > > >>> Yun Tang
> > > > > >>> 
> > > > > >>> From: Yuepeng Pan 
> > > > > >>> Sent: Friday, December 3, 2021 14:14
> > > > > >>> To: dev@flink.apache.org 
> > > > > >>> Cc: Ingo Bürk 
> > > > > >>> Subject: Re:Re: [ANNOUNCE] New Apache Flink Committer - Ingo
> Bürk
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> Congratulations, Ingo!
> > > > > >>>
> > > > > >>>
> > > > > >>> Best,
> > > > > >>> Yuepeng Pan
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>>
> > > > > >>> At 2021-12-03 13:47:38, "Yun Gao"  >
> > > > > wrote:
> > > > >  Congratulations Ingo!
> > > > > 
> > > > >  Best,
> > > > >  Yun
> > > > > 
> > > > > 
> > > > > 
> --
> > > > >  From:刘建刚 
> > > > >  Send Time:2021 Dec. 3 (Fri.) 11:52
> > > > >  To:dev 
> > > > >  Cc:"Ingo Bürk" 
> > > > >  Subject:Re: [ANNOUNCE] New Apache Flink Committer - Ingo Bürk
> > > > > 
> > > > >  Congratulations!
> > > > > 
> > > > >  Best,
> > > > >  Liu Jiangang
> > > > > 
> > > > > Till Rohrmann  wrote on Thu, Dec 2, 2021 at 11:24 PM:
> > > > > 
> > > > > > Hi everyone,
> > > > > >
> > > > > > On behalf of the PMC, I'm very happy to announce Ingo Bürk
> as a new
> > > > > >>> Flink
> > > > > > committer.
> > > > > >
> > > > > > Ingo started contributing to Flink at the beginning of this
> > > > > >>> year. He
> > > > > > worked mostly on SQL components. He has authored many PRs and
> > > > helped
> > > > > >>> review
> > > > > > a lot of other PRs in this area. He actively reported issues
> and
> > > > > >> helped
> > > > > >>> our
> > > > > > users on the MLs. His most notable contributions were
> Support SQL
> > > > > 2016
> > > > > >>> JSON
> > > > > > functions in Flink SQL (FLIP-90), Register sources/sinks in
> Table
> > > > API
> > > > > > (FLIP-129) and various other contributions in the SQL area.
> > > > Moreover,
> > > > > >>> he is
> > > > > > one of the few people in our community who actually
> understands
> > > > > >> Flink's
> > > > > > frontend.
> > > > > >
> > > > > > Please join me in congratulating Ingo for becoming a Flink
> > > > committer!
> > > > > >
> > > > > > Cheers,
> > > > > > Till
> > > > > >
> > > > > >>>
> > > > > >>
> > > > >
> > > > >
> > > >
> > > > --
> > > > Best regards,
> > > > Sergey
> > > >
>


Re: [ANNOUNCE] New Apache Flink Committer - Matthias Pohl

2021-12-10 Thread Zhilong Hong
Congratulations, Matthias!

Best,
Zhilong

On Wed, Dec 8, 2021 at 10:18 PM godfrey he  wrote:

> Congratulations, Matthias!
>
> Best,
> Godfrey
>
> Roman Khachatryan  wrote on Mon, Dec 6, 2021 at 6:07 PM:
> >
> > Congratulations, Matthias!
> >
> > Regards,
> > Roman
> >
> >
> > On Mon, Dec 6, 2021 at 11:04 AM Yang Wang  wrote:
> > >
> > > Congratulations, Matthias!
> > >
> > > Best,
> > > Yang
> > >
> > > Sergey Nuyanzin  wrote on Mon, Dec 6, 2021 at 3:35 PM:
> > >
> > > > Congratulations, Matthias!
> > > >
> > > > On Mon, Dec 6, 2021 at 7:33 AM Leonard Xu  wrote:
> > > >
> > > > > Congratulations Matthias!
> > > > >
> > > > > Best,
> > > > > Leonard
> > > > > > On Dec 3, 2021, at 11:23 PM, Matthias Pohl  wrote:
> > > > > >
> > > > > > Thank you! I'm looking forward to continuing to work with you.
> > > > > >
> > > > > > On Fri, Dec 3, 2021 at 7:29 AM Jingsong Li <
> jingsongl...@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > >> Congratulations, Matthias!
> > > > > >>
> > > > > >> On Fri, Dec 3, 2021 at 2:13 PM Yuepeng Pan 
> wrote:
> > > > > >>>
> > > > > >>> Congratulations Matthias!
> > > > > >>>
> > > > > >>> Best,Yuepeng Pan.
> > > > > >>> At 2021-12-03 13:47:20, "Yun Gao" 
> wrote:
> > > > >  Congratulations Matthias!
> > > > > 
> > > > >  Best,
> > > > >  Yun
> > > > > 
> > > > > 
> > > > > 
> --
> > > > >  From:Jing Zhang 
> > > > >  Send Time:2021 Dec. 3 (Fri.) 13:45
> > > > >  To:dev 
> > > > >  Cc:Matthias Pohl 
> > > > >  Subject:Re: [ANNOUNCE] New Apache Flink Committer - Matthias
> Pohl
> > > > > 
> > > > >  Congratulations, Matthias!
> > > > > 
> > > > >  刘建刚  wrote on Fri, Dec 3, 2021 at 11:51:
> > > > > 
> > > > > > Congratulations!
> > > > > >
> > > > > > Best,
> > > > > > Liu Jiangang
> > > > > >
> > > > > > Till Rohrmann  wrote on Thu, Dec 2, 2021
> at 11:28 PM:
> > > > > >
> > > > > >> Hi everyone,
> > > > > >>
> > > > > >> On behalf of the PMC, I'm very happy to announce Matthias
> Pohl as
> > > > a
> > > > > >> new
> > > > > >> Flink committer.
> > > > > >>
> > > > > >> Matthias has worked on Flink since August last year. He
> helped
> > > > > >> review a
> > > > > > ton
> > > > > >> of PRs. He worked on a variety of things but most notably
> the
> > > > > >> tracking
> > > > > > and
> > > > > >> reporting of concurrent exceptions, fixing HA bugs and
> deprecating
> > > > > >> and
> > > > > >> removing our Mesos support. He actively reports issues
> helping
> > > > > >> Flink to
> > > > > >> improve and he is actively engaged in Flink's MLs.
> > > > > >>
> > > > > >> Please join me in congratulating Matthias for becoming a
> Flink
> > > > > >> committer!
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Till
> > > > > >>
> > > > > >
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> --
> > > > > >> Best, Jingsong Lee
> > > > >
> > > > >
> > > >
> > > > --
> > > > Best regards,
> > > > Sergey
> > > >
>


Re: [DISCUSS] FLIP-210: Change logging level dynamically at runtime

2022-01-11 Thread Zhilong Hong
Thank you for proposing this improvement, Wenhao. Changing the logging
level dynamically at runtime is very useful when users are trying to debug
their jobs. They can set the logging level to DEBUG and find out more
details in the logs.

1. I'm wondering whether we could add a REST API to query the current logging
level. Such an API would be useful for users who want to know the current
logging level, especially those who run their own job
management platform.

2. Would it be better if we added a field to specify the target
JobManager/TaskManager for the logconfig API? Currently, it seems the
modified logging level will be applied to all components in the cluster. If
we change the logging level to DEBUG, the overall size of the logs may grow
rapidly, especially in large-scale clusters, and may become a heavy burden
on disk usage. Adding a field to specify the target would minimize the
impact: users could change the logging level for only the TaskManager they
are focusing on. Furthermore, if users want to change the logging level for
all components, the target field can be set to "ALL".
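
For illustration only, a hypothetical sketch of what the two endpoints might
look like (the paths and payloads below are assumptions for discussion, not
part of the FLIP):

    GET /cluster/logconfig
    => {"logger": "org.apache.flink", "level": "INFO"}

    PUT /cluster/logconfig
    {"logger": "org.apache.flink", "level": "DEBUG", "target": "taskmanager-2"}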

On Tue, Jan 11, 2022 at 12:27 AM Konstantin Knauf  wrote:

> Thank you for starting the discussion. Being able to change the logging
> level at runtime is very valuable in my experience.
>
> Instead of introducing our own API (and eventually even persistence), could
> we just periodically reload the log4j or logback configuration from the
> environment/filesystem? I only quickly googled the topic and [1,2] suggest
> that this might be possible?
>
> [1] https://stackoverflow.com/a/16216956/6422562?
> [2] https://logback.qos.ch/manual/configuration.html#autoScan
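>
> For reference, a minimal logback sketch of the autoScan mechanism from [2]
> (assuming a logback.xml on the classpath; log4j2 offers a similar
> monitorInterval setting):
>
>   <configuration scan="true" scanPeriod="30 seconds">
>     <!-- appenders and loggers as usual; edits to this file are picked up
>          at runtime without restarting the process -->
>   </configuration>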
>
>
>
>
>
> On Mon, Jan 10, 2022 at 5:10 PM Wenhao Ji  wrote:
>
> > Hi everyone,
> >
> > Hope you enjoyed the Holiday Season.
> >
> > I would like to start the discussion on the improvement proposal
> > FLIP-210 [1], which aims to provide a way to change log levels at
> > runtime to simplify the detection of issues and bugs, as reported in
> > the ticket FLINK-16478 [2].
> > Firstly, thanks to Xingxing Di and xiaodao for their previous efforts. The
> > FLIP I drafted is largely influenced by their previous designs [3][4].
> > Although we have reached some agreements under the jira comments about
> > the scope of this feature, we still have the following questions
> > listed below ready to be discussed in this thread.
> >
> > ## Question 1
> >
> > > Creating a custom DSL and implementing it for several logging backends
> > sounds like quite a maintenance burden. Extensions to the DSL, and
> > supported backends, could become quite an effort. (by Chesnay Schepler)
> >
> > I tried to design the API of the logging backend to stay away from
> > implementation details, but I did not find any slf4j API that
> > allows changing the log level of a logger. So what I did
> > is to introduce another kind of abstraction on top of slf4j /
> > log4j / logback so that we will not depend on the logging provider's
> > API directly. This will make it convenient for us to adopt any other
> > logging providers. Please see the "Logging Abstraction" section.
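> >
> > To make this concrete, a minimal sketch of such an abstraction (the names
> > here are illustrative, not final; the log4j2 calls come from log4j-core's
> > Configurator, which supports changing levels at runtime):
> >
> > public interface LoggingBackend {
> >     void setLogLevel(String loggerName, String level);
> >     String getLogLevel(String loggerName);
> > }
> >
> > public class Log4j2Backend implements LoggingBackend {
> >     @Override
> >     public void setLogLevel(String loggerName, String level) {
> >         // Reconfigures the logger's level at runtime.
> >         org.apache.logging.log4j.core.config.Configurator.setLevel(
> >             loggerName, org.apache.logging.log4j.Level.toLevel(level));
> >     }
> >
> >     @Override
> >     public String getLogLevel(String loggerName) {
> >         return org.apache.logging.log4j.LogManager
> >             .getLogger(loggerName).getLevel().toString();
> >     }
> > }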
> >
> > ## Question 2
> >
> > > Do we know whether other systems support this kind of feature? If yes,
> > how do they solve it for different logging backends? (by Till Rohrmann)
> >
> > I investigated several Java frameworks including Spark, Storm, and
> > Spring Boot. Here is what I found.
> > Spark & Storm directly depend on the log4j implementations, which
> > means they do not support any other slf4j implementation at all. They
> > simply call the log4j api directly. (see SparkContext.scala#L381 [5],
> > Utils.scala#L2443 [6] in Spark, and LogConfigManager.java#L144 [7] in
> > Storm). They are pretty different from what Flink provides.
> > However, I found that Spring Boot has implemented what we are interested
> > in. Like Flink, Spring Boot also supports many slf4j
> > implementations. Users are not limited to log4j: they can
> > declare different logging frameworks by importing certain
> > dependencies. After that, Spring decides which one is active by
> > scanning its classpath and context. (see LoggingSystem.java#L164 [8]
> > and LoggersEndpoint.java#L99 [9])
> >
> > ## Question 3
> >
> > Besides the questions raised in the jira comments, I also found another
> > thing that has not been discussed. Considering this feature as an MVP,
> > do we need to introduce a HighAvailabilityService to store the log
> > settings so that they can be synced to newly-joined task managers and
> > also job manager followers for consistency? This issue is included in
> > the "Limitations" section in the flip.
> >
> > Finally, thank you for taking the time to join this discussion and
> > review this FLIP. I would appreciate any
> > comments or suggestions on this.
> >
> >
> > [1]:
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-210%3A+Change+logging+level+dynamically+at+runtime
> > [2]: 

Re: I want to contribute to Apache Flink

2022-01-17 Thread Zhilong Hong
Hi, Chengyun:

Thanks for your passion for contributing to the Flink community. You don't
need contributor permission to contribute your code to Flink. Find an
open issue you are interested in at
http://issues.apache.org/jira/browse/FLINK and comment with your ideas.
Issues with the label "starter" are a good place to start. You can refer to
https://flink.apache.org/contributing/how-to-contribute.html for more
information about how to contribute.

Cheers,
Zhilong

On Mon, Jan 17, 2022 at 9:59 PM chengyunzhang6 
wrote:

> Hi,
> I want to contribute to Apache Flink. Would you please give me the
> contributor permission? My JIRA full name is Chengyun Zhang.


Re: Re: Change of focus

2022-02-28 Thread Zhilong Hong
Thank you for everything, Till! I've learned a lot from you.

Good luck with your new adventure and the next chapter!

Best,
Zhilong

On Tue, Mar 1, 2022 at 12:08 PM Yun Tang  wrote:

> Thanks a lot for your efforts and kindness of mentoring contributors in
> Apache Flink community, Till!
>
> Good luck with your new adventure in your new life.
>
>
> Best
> Yun Tang
>
> 
> From: Yuan Mei 
> Sent: Tuesday, March 1, 2022 11:00
> To: dev 
> Subject: Re: Re: Change of focus
>
> Thanks Till for everything you've done for the community!
> Good luck with your new adventure and best wishes to your new life!
>
> Best Regards,
> Yuan
>
> On Tue, Mar 1, 2022 at 10:35 AM Zhu Zhu  wrote:
>
> > Thank you for all the efforts and good luck with the new adventure, Till!
> >
> > Thanks,
> > Zhu
> >
> > > Terry  wrote on Tue, Mar 1, 2022 at 10:26:
> >
> > > Thanks a lot for your efforts! Good Luck!
> > >
> > > > Jiangang Liu  wrote on Tue, Mar 1, 2022 at 10:18:
> > >
> > > > Thanks for the efforts and help in flink, Till. Good luck!
> > > >
> > > > Best
> > > > Liu Jiangang
> > > >
> > > > > Lijie Wang  wrote on Tue, Mar 1, 2022 at 09:53:
> > > >
> > > > > Thanks for all your efforts Till. Good luck !
> > > > >
> > > > > Best,
> > > > > Lijie
> > > > >
> > > > > > Yun Gao  wrote on Tue, Mar 1, 2022 at 01:15:
> > > > >
> > > > > > Many thanks Till for all the efforts! Good luck with the next
> > chapter~
> > > > > >
> > > > > > Best,
> > > > > > Yun
> > > > > >
> > > > > >
> --
> > > > > > Sender:Piotr Nowojski
> > > > > > Date:2022/02/28 22:10:46
> > > > > > Recipient:dev
> > > > > > Theme:Re: Change of focus
> > > > > >
> > > > > > Good luck Till and thanks for all of your efforts.
> > > > > >
> > > > > > Best,
> > > > > > Piotrek
> > > > > >
> > > > > > On Mon, Feb 28, 2022 at 15:06, Aitozi 
> > wrote:
> > > > > >
> > > > > > > Good luck with the next chapter, will miss you :)
> > > > > > >
> > > > > > > Best,
> > > > > > > Aitozi
> > > > > > >
> > > > > > > > Jark Wu  wrote on Mon, Feb 28, 2022 at 21:28:
> > > > > > >
> > > > > > > > Thank you Till for everything. It's been great to work with you.
> > > Good
> > > > > > luck!
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Jark
> > > > > > > >
> > > > > > > > On Mon, 28 Feb 2022 at 21:26, Márton Balassi <
> > > > > balassi.mar...@gmail.com
> > > > > > >
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thank you, Till. Good luck with the next chapter. :-)
> > > > > > > > >
> > > > > > > > > On Mon, Feb 28, 2022 at 1:49 PM Flavio Pompermaier <
> > > > > > > pomperma...@okkam.it
> > > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Good luck for your new adventure Till!
> > > > > > > > > >
> > > > > > > > > > On Mon, Feb 28, 2022 at 12:00 PM Till Rohrmann <
> > > > > > trohrm...@apache.org
> > > > > > > >
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi everyone,
> > > > > > > > > > >
> > > > > > > > > > > I wanted to let you know that I will be less active in
> > the
> > > > > > > community
> > > > > > > > > > > because I’ve decided to start a new chapter in my life.
> > > > Hence,
> > > > > > > please
> > > > > > > > > > don’t
> > > > > > > > > > > wonder if I might no longer be very responsive on mails
> > and
> > > > > JIRA
> > > > > > > > > issues.
> > > > > > > > > > >
> > > > > > > > > > > It is great being part of such a great community with
> so
> > > many
> > > > > > > amazing
> > > > > > > > > > > people. Over the past 7.5 years, I've learned a lot
> > thanks
> > > to
> > > > > you
> > > > > > > and
> > > > > > > > > > > together we have shaped how people think about stream
> > > > > processing
> > > > > > > > > > nowadays.
> > > > > > > > > > > This is something we can be very proud of. I am sure
> that
> > > the
> > > > > > > > community
> > > > > > > > > > > will continue innovating and setting the pace for what
> is
> > > > > > possible
> > > > > > > > with
> > > > > > > > > > > real time processing. I wish you all godspeed!
> > > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > > Till
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Martijn Visser

2022-03-04 Thread Zhilong Hong
Congratulations, Martijn! Well deserved!

Best,
Zhilong

On Sat, Mar 5, 2022 at 1:09 AM Piotr Nowojski  wrote:

> Congratulations!
>
> Piotrek
>
> On Fri, Mar 4, 2022 at 16:05, Aitozi  wrote:
>
> > Congratulations Martijn!
> >
> > Best,
> > Aitozi
> >
> > > Martijn Visser  wrote on Fri, Mar 4, 2022 at 17:10:
> >
> > > Thank you all!
> > >
> > > On Fri, 4 Mar 2022 at 10:00, Niels Basjes  wrote:
> > >
> > > > Congratulations Martijn!
> > > >
> > > > On Fri, Mar 4, 2022 at 9:43 AM Johannes Moser 
> > wrote:
> > > >
> > > > > Congratulations Martijn,
> > > > >
> > > > > Well deserved.
> > > > >
> > > > > > On 03.03.2022, at 16:49, Robert Metzger 
> > wrote:
> > > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > On behalf of the PMC, I'm very happy to announce Martijn Visser
> as
> > a
> > > > new
> > > > > > Flink committer.
> > > > > >
> > > > > > Martijn is a very active Flink community member, driving a lot of
> > > > efforts
> > > > > > on the dev@flink mailing list. He also pushes projects such as
> > > > replacing
> > > > > > Google Analytics with Matomo, so that we can generate our web
> > > analytics
> > > > > > within the Apache Software Foundation.
> > > > > >
> > > > > > Please join me in congratulating Martijn for becoming a Flink
> > > > committer!
> > > > > >
> > > > > > Cheers,
> > > > > > Robert
> > > > >
> > > > >
> > > >
> > > > --
> > > > Best regards / Met vriendelijke groeten,
> > > >
> > > > Niels Basjes
> > > >
> > >
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - David Morávek

2022-03-04 Thread Zhilong Hong
Congratulations, David!

Best,
Zhilong

On Sat, Mar 5, 2022 at 1:09 AM Piotr Nowojski  wrote:

> Congratulations :)
>
> On Fri, Mar 4, 2022 at 16:04, Aitozi  wrote:
>
> > Congratulations David!
> >
> > Ingo Bürk  wrote on Fri, Mar 4, 2022 at 22:56:
> >
> > > Congrats, David!
> > >
> > > On 04.03.22 12:34, Robert Metzger wrote:
> > > > Hi everyone,
> > > >
> > > > On behalf of the PMC, I'm very happy to announce David Morávek as a
> new
> > > > Flink committer.
> > > >
> > > > His first contributions to Flink date back to 2019. He has been
> > > > increasingly active with reviews and driving major initiatives in the
> > > > community. David brings valuable experience from being a committer in
> > the
> > > > Apache Beam project to Flink.
> > > >
> > > >
> > > > Please join me in congratulating David for becoming a Flink
> committer!
> > > >
> > > > Cheers,
> > > > Robert
> > > >
> > >
> >
>


[DISCUSS] FLINK-21110: Optimize Scheduler Performance for Large-Scale Jobs

2021-02-02 Thread Zhilong Hong
Hello, everyone:

I would like to start the discussion about FLINK-21110: Optimize Scheduler 
Performance for Large-Scale Jobs [1].

According to the results of the scheduler benchmarks we implemented in FLINK-20612 
[2], the bottlenecks of deploying and running a large-scale job in Flink are 
mainly located in the following procedures:

Procedure                                              Time complexity
Initializing ExecutionGraph                            O(N^2)
Building DefaultExecutionTopology                      O(N^2)
Initializing PipelinedRegionSchedulingStrategy         O(N^2)
Scheduling downstream tasks when a task finishes       O(N^2)
Calculating tasks to restart when a failover occurs    O(N^2)
Releasing result partitions                            O(N^3)

These procedures are all related to the complexity of the topology in the 
ExecutionGraph. Between two vertices connected with all-to-all edges, all 
the upstream IntermediateResultPartitions are connected to all downstream 
ExecutionVertices. The computation complexity of building and traversing all 
these edges will be O(N^2).

As for memory usage, currently we use ExecutionEdges to store the information 
of connections. For the all-to-all distribution type, there are O(N^2) 
ExecutionEdges. We tested a simple job with only two vertices. Their 
parallelisms are both 10k. Furthermore, they are connected with all-to-all edges. It 
takes 4.175 GiB (estimated via MXBean) to store the 100M ExecutionEdges.

In most large-scale jobs, there will be more than two vertices with large 
parallelisms, and they would cost a lot of time and memory to deploy the job.

As we can see, for two JobVertices connected with the all-to-all distribution 
type, all IntermediateResultPartitions produced by the upstream 
ExecutionVertices are isomorphic, which means that the downstream 
ExecutionVertices they connect to are exactly the same. The downstream 
ExecutionVertices belonging to the same JobVertex are also isomorphic, as the 
upstream ResultPartitions they connect to are the same, too.

Since every JobEdge has exactly one distribution type, we can divide the 
vertices and result partitions into groups according to the distribution type 
of the JobEdge.

For the all-to-all distribution type, since all downstream vertices are 
isomorphic, they belong to a single group, and all the upstream result 
partitions are connected to this group. Vice versa, all the upstream result 
partitions also belong to a single group, and all the downstream vertices are 
connected to this group. In the past, when we wanted to iterate over all the 
downstream vertices, we needed to loop over them N times, which leads to a 
complexity of O(N^2). Now, since all upstream result partitions are connected to 
one downstream group, we just need to loop over them once, with the complexity 
of O(N).

For the pointwise distribution type, because each result partition is connected 
to different downstream vertices, they should belong to different groups. Vice 
versa, all the vertices belong to different groups. Since one result partition 
group is connected to one vertex group pointwisely, the computation complexity 
of looping over them is still O(N).
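
To make the grouping idea concrete, here is a simplified sketch (the classes 
below are illustrative stand-ins, not the actual runtime implementation; 
ExecutionVertexID and IntermediateResultPartitionID are Flink's existing IDs, 
and java.util.List is assumed to be imported):

    // All-to-all case: instead of materializing N * N ExecutionEdges, every
    // upstream result partition references ONE shared group containing all N
    // consumer vertices, and every downstream vertex references ONE shared
    // group containing all N consumed partitions -- O(N) references in total.
    final class ConsumerVertexGroup {
        final List<ExecutionVertexID> vertices;     // all N downstream vertices
        ConsumerVertexGroup(List<ExecutionVertexID> vertices) {
            this.vertices = vertices;
        }
    }

    final class ConsumedPartitionGroup {
        final List<IntermediateResultPartitionID> partitions; // all N upstream partitions
        ConsumedPartitionGroup(List<IntermediateResultPartitionID> partitions) {
            this.partitions = partitions;
        }
    }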

After we group the result partitions and vertices, ExecutionEdge is no longer 
needed. For the test job we mentioned above, the optimization can effectively 
reduce the memory usage from 4.175 GiB to 12.076 MiB (estimated via MXBean) in 
our POC. The time cost is reduced from 62.090 seconds to 8.551 seconds (with 
10k parallelism).

The detailed design doc with illustrations is located at [3]. Please find more 
details in the links below.

Looking forward to your feedback.

[1] https://issues.apache.org/jira/browse/FLINK-21110
[2] https://issues.apache.org/jira/browse/FLINK-20612
[3] 
https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing



Re: [ANNOUNCE] New PMC member: Xintong Song

2021-06-20 Thread Zhilong Hong
Congratulations, Xintong!

Dawid Wysakowicz  wrote on Wed, Jun 16, 2021 at 5:23 PM:

> Hi all!
>
> I'm very happy to announce that Xintong Song has joined the Flink PMC!
>
> Congratulations and welcome Xintong!
>
> Best,
> Dawid
>


Re: [ANNOUNCE] New PMC member: Guowei Ma

2021-07-06 Thread Zhilong Hong
Congratulations, Guowei!

Best,
Zhilong

On Tue, Jul 6, 2021 at 10:01 PM Kurt Young  wrote:

> Hi all!
>
> I'm very happy to announce that Guowei Ma has joined the Flink PMC!
>
> Congratulations and welcome Guowei!
>
> Best,
> Kurt
>


Re: I want to contribute to Apache Flink

2021-07-06 Thread Zhilong Hong
Hi, Pang!

Thank you for your enthusiasm for contributing. You don't need any
permission to contribute code to Apache Flink. Feel free to find the
issues that you are interested in on JIRA (
https://issues.apache.org/jira/projects/FLINK/issues). The issues with the
label "starter" are a good choice to start with. You can comment with your
idea and request that a committer assign the ticket to you.

Best,
Zhilong

On Wed, Jul 7, 2021 at 11:56 AM pang pan  wrote:

> Hi Guys,
>
> I want to contribute to Apache Flink.
> Would you please give me the permission as a contributor?
> My JIRA ID is pangpan.
>


Re: [ANNOUNCE] New Apache Flink Committer - Yuan Mei

2021-07-07 Thread Zhilong Hong
Congratulations, Yuan!

Best,
Zhilong

On Thu, Jul 8, 2021 at 1:22 AM Yu Li  wrote:

> Hi all,
>
> On behalf of the PMC, I’m very happy to announce Yuan Mei as a new Flink
> committer.
>
> Yuan has been an active contributor for more than two years, with code
> contributions on multiple components including kafka connectors,
> checkpointing, state backends, etc. Besides, she has been actively involved
> in community activities such as helping manage releases, discussing
> questions on dev@list, supporting users and giving talks at conferences.
>
> Please join me in congratulating Yuan for becoming a Flink committer!
>
> Cheers,
> Yu
>


Re: [ANNOUNCE] New Apache Flink Committer - Leonard Xu

2021-11-14 Thread Zhilong Hong
Congratulations, Leonard!

On Mon, Nov 15, 2021 at 10:13 AM Qingsheng Ren  wrote:

> Congratulations Leonard!
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
> On Nov 12, 2021, 12:12 PM +0800, Jark Wu , wrote:
> > Hi everyone,
> >
> > On behalf of the PMC, I'm very happy to announce Leonard Xu as a new
> Flink
> > committer.
> >
> > Leonard has been a very active contributor for more than two years,
> authoring
> > 150+ PRs and reviewing many PRs, which is quite outstanding.
> > Leonard mainly works on Flink SQL parts and drives several important
> FLIPs,
> > e.g. FLIP-132 (temporal table join) and FLIP-162 (correct time
> behaviors).
> > He is also the maintainer of flink-cdc-connectors[1] project which helps
> a
> > lot for users building a real-time data warehouse and data lake.
> >
> > Please join me in congratulating Leonard for becoming a Flink committer!
> >
> > Cheers,
> > Jark Wu
> >
> > [1]: https://github.com/ververica/flink-cdc-connectors
>


Re: [ANNOUNCE] New Apache Flink Committer - Yangze Guo

2021-11-14 Thread Zhilong Hong
Congratulations, Yangze!

On Mon, Nov 15, 2021 at 10:13 AM Qingsheng Ren  wrote:

> Congratulations Yangze!
>
> --
> Best Regards,
>
> Qingsheng Ren
> Email: renqs...@gmail.com
> On Nov 12, 2021, 10:11 AM +0800, Xintong Song ,
> wrote:
> > Hi everyone,
> >
> > On behalf of the PMC, I'm very happy to announce Yangze Guo as a new
> Flink
> > committer.
> >
> > Yangze has been consistently contributing to this project for almost 3
> > years. His contributions are mainly in the resource management and
> > deployment areas, represented by the fine-grained resource management and
> > external resource framework. In addition to feature works, he's also
> active
> > in miscellaneous contributions, including PR reviews, document
> enhancement,
> > mailing list services and meetup/FF talks.
> >
> > Please join me in congratulating Yangze Guo for becoming a Flink
> committer!
> >
> > Thank you~
> >
> > Xintong Song
>


Re: [ANNOUNCE] New Apache Flink Committer - Fabian Paul

2021-11-15 Thread Zhilong Hong
Congratulations, Fabian!

Best regards,
Zhilong Hong

On Tue, Nov 16, 2021 at 10:19 AM Yangze Guo  wrote:

> Congrats & well deserved!
>
> Best,
> Yangze Guo
>
> On Tue, Nov 16, 2021 at 10:18 AM Jing Zhang  wrote:
> >
> > Congratulations Fabian!
> >
> > Best,
> > Jing Zhang
> >
> > > Yuepeng Pan  wrote on Tue, Nov 16, 2021 at 10:16 AM:
> >
> > > Congratulations Fabian!
> > >
> > > Best,
> > > Yuepeng Pan
> > >
> > > At 2021-11-15 21:17:13, "Arvid Heise"  wrote:
> > > >Hi everyone,
> > > >
> > > >On behalf of the PMC, I'm very happy to announce Fabian Paul as a new
> > > Flink
> > > >committer.
> > > >
> > > >Fabian Paul has been actively improving the connector ecosystem by
> > > >migrating Kafka and ElasticSearch to the Sink interface and is
> currently
> > > >driving FLIP-191 [1] to tackle the sink compaction issue. While he is
> > > >active on the project (authored 70 PRs and reviewed 60), it's also
> worth
> > > >highlighting that he has also been guiding external efforts, such as
> the
> > > >DeltaLake Flink connector or the Pinot sink in Bahir.
> > > >
> > > >Please join me in congratulating Fabian for becoming a Flink
> committer!
> > > >
> > > >Best,
> > > >
> > > >Arvid
> > > >
> > > >[1]
> > > >
> > >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction
> > >
>


Re: [ANNOUNCE] New Apache Flink Committer - Jing Zhang

2021-11-15 Thread Zhilong Hong
Congratulations, Jing!

Best regards,
Zhilong Hong

On Mon, Nov 15, 2021 at 9:41 PM Martijn Visser 
wrote:

> Congratulations Jing!
>
> On Mon, 15 Nov 2021 at 14:39, Timo Walther  wrote:
>
> > Hi everyone,
> >
> > On behalf of the PMC, I'm very happy to announce Jing Zhang as a new
> > Flink committer.
> >
> > Jing has been very active in the Flink community, especially in the Table/SQL
> > area, for quite some time: 81 PRs [1] in total. She is also active in
> > answering questions on the user mailing list. She is currently
> > contributing a lot around the new windowing table-valued functions [2].
> >
> > Please join me in congratulating Jing Zhang for becoming a Flink
> committer!
> >
> > Thanks,
> > Timo
> >
> > [1] https://github.com/apache/flink/pulls/beyond1920
> > [2] https://issues.apache.org/jira/browse/FLINK-23997
> >
>


Re: [ANNOUNCE] New Apache Flink Committer - Yingjie Cao

2021-11-16 Thread Zhilong Hong
Congratulations, Yingjie!


Best regards,
Zhilong

On Wed, Nov 17, 2021 at 2:13 PM Yuepeng Pan  wrote:

> Congratulations !
>
>
> Best,
> Yuepeng Pan.
>
>
> At 2021-11-17 12:55:29, "Guowei Ma"  wrote:
> >Hi everyone,
> >
> >On behalf of the PMC, I'm very happy to announce Yingjie Cao as a new
> Flink
> >committer.
> >
> >Yingjie has submitted 88 PRs since he joined the Flink community more
> >than 2 years ago. In general, his main contributions are concentrated in
> >Flink's Shuffle. Yingjie has done a lot of work in improving the
> >performance and stability of TM Blocking Shuffle [1][2]; in order to allow
> >Flink to use External/Remote Shuffle Service in batch production
> scenarios,
> >he also improved the Flink Shuffle architecture in 1.14 [3]. At the same
> >time, he has also done some related work in Streaming Shuffle, such as
> >Buffer Management[4] , Non-Blocking Network Output and some important bug
> >fixes.
> >
> >Please join me in congratulating Yingjie for becoming a Flink committer!
> >
> >[1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink
> >[2] https://flink.apache.org/2021/10/26/sort-shuffle-part1.html
> >[3]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-184%3A+Refine+ShuffleMaster+lifecycle+management+for+pluggable+shuffle+service+framework
> >[4] https://issues.apache.org/jira/browse/FLINK-16428
> >
> >Best,
> >Guowei
>


Re: [ANNOUNCE] New Apache Flink Committers: Qingsheng Ren, Shengkai Fang

2022-06-20 Thread Zhilong Hong
Congratulations, Qingsheng and Shengkai!

Best,
Zhilong.

On Mon, Jun 20, 2022 at 4:00 PM Lijie Wang  wrote:

> Congratulations, Qingsheng and Shengkai.
>
> Best,
> Lijie
>
> > Paul Lam  wrote on Mon, Jun 20, 2022 at 15:58:
>
> > Congrats, Qingsheng and Shengkai!
> >
> > Best,
> > Paul Lam
> >
> > > On Jun 20, 2022, at 15:57, Martijn Visser  wrote:
> > >
> > > Congratulations to both of you, this is very much deserved!
> > >
> > > On Mon, Jun 20, 2022 at 09:53, Jark Wu  wrote:
> > >
> > >> Hi everyone,
> > >>
> > >> On behalf of the PMC, I'm very happy to announce two new Flink
> > committers:
> > >> Qingsheng Ren and Shengkai Fang.
> > >>
> > >> Qingsheng is the core contributor and maintainer of the Kafka
> connector.
> > >> He continuously improved the existing connectors, debugged many
> > connector
> > >> testability issues, and worked on the connector testing framework.
> > >> Recently,
> > >> he has been driving the work on FLIP-221 (caching lookup connector), which
> is
> > >> crucial for SQL connectors.
> > >>
> > >> Shengkai has been continuously contributing to the Flink project for
> two
> > >> years.
> > >> He mainly works on Flink SQL parts and drives several important FLIPs,
> > >> e.g. FLIP-149 (upsert-kafka), FLIP-163 (SQL CLI Improvements),
> > >> FLIP-91 (SQL Gateway), and FLIP-223 (HiveServer2 Endpoint).
> > >> He is very active and helps many users on the mailing list.
> > >>
> > >> Please join me in welcoming them as committers!
> > >>
> > >> Cheers,
> > >> Jark Wu
> > >>
> >
> >
>


[jira] [Created] (FLINK-14065) Log metric name when the metric fails on registration/unregistration

2019-09-11 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-14065:


 Summary: Log metric name when the metric fails on 
registration/unregistration
 Key: FLINK-14065
 URL: https://issues.apache.org/jira/browse/FLINK-14065
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Metrics
Affects Versions: 1.9.0
Reporter: Zhilong Hong
 Fix For: 1.10.0


When MetricGroup registers Metrics in MetricRegistryImpl, sometimes the 
registration fails due to exceptions. However, currently it only logs {{"Error 
while registering metric"}} with no further information, which makes it hard for 
users to troubleshoot which metric failed and why.

Also, the warning messages for registration and unregistration are both "{{Error 
while registering metric}}". This will confuse users (although users 
can locate the correct place according to the call stack).

So I propose logging the metric name when a metric fails on 
registration/unregistration.
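
A minimal sketch of the proposed warning messages (the variable names are 
illustrative; slf4j prints the stack trace when the last argument is a 
Throwable):
{code:java}
LOG.warn("Error while registering metric '{}'.", metricName, e);    // registration
LOG.warn("Error while unregistering metric '{}'.", metricName, e);  // unregistration
{code}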



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (FLINK-31945) Table of contents in the blogs of the project website is missing some titles

2023-04-26 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-31945:


 Summary: Table of contents in the blogs of the project website is 
missing some titles
 Key: FLINK-31945
 URL: https://issues.apache.org/jira/browse/FLINK-31945
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Reporter: Zhilong Hong


The ToC in the blog pages of the project website doesn't have all the section 
titles. The section titles at the first level are missing.

Solution: Add the following configuration item in config.toml.
{noformat}
[markup.tableOfContents]
  startLevel = 0{noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-32051) Fix broken documentation links in Flink blogs

2023-05-10 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-32051:


 Summary: Fix broken documentation links in Flink blogs
 Key: FLINK-32051
 URL: https://issues.apache.org/jira/browse/FLINK-32051
 Project: Flink
  Issue Type: Improvement
  Components: Project Website
Reporter: Zhilong Hong


Currently, the links to the documentation in the blogs are broken. We need to 
add a slash (/) at the end of the param {{DocsBaseUrl}} in config.toml, like this:

 
{noformat}
[params]
  DocsBaseUrl = "//nightlies.apache.org/flink/"
{noformat}
Also, the links in this 
[post|https://flink.apache.org/2022/01/04/how-we-improved-scheduler-performance-for-large-scale-jobs-part-two/]
 are not rendered correctly. We need to add a newline after the {{{}{}}}.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-20547) Batch job fails due to the exception in network stack

2020-12-09 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-20547:


 Summary: Batch job fails due to the exception in network stack
 Key: FLINK-20547
 URL: https://issues.apache.org/jira/browse/FLINK-20547
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Attachments: inconsistent.tar.gz

I ran a simple batch job with only two job vertices: a source and a sink. Their 
parallelisms are both 8000. They are connected via all-to-all blocking 
edges. While the sink tasks are running, an exception is raised:

 
{code:java}
2020-12-09 18:43:48,981 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Sink: Sink 1 
(1595/8000) (08bd4214d6e0dc144e9654f1faaa3b28) switched from RUNNING to FAILED 
on [masked container name] @ [masked address] (dataPort=47872).
java.io.IOException: java.lang.IllegalStateException: Inconsistent 
availability: expected true
at 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:232)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel.getNextBuffer(RecoveredInputChannel.java:165)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at java.lang.Thread.run(Thread.java:834) ~[?:1.8.0_102]
Caused by: java.lang.IllegalStateException: Inconsistent availability: expected 
true
at 
org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) 
~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.checkConsistentAvailability(LocalBufferPool.java:434)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:564)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:509)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.tryRedistributeBuffers(NetworkBufferPool.java:438)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:166)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:60)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at 
org.apache.flink.runtime.io.network.partition.consumer.BufferManager.requestExclusiveBuffers(BufferManager.java:131)
 ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
at

[jira] [Created] (FLINK-20612) Add benchmarks for scheduler

2020-12-15 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-20612:


 Summary: Add benchmarks for scheduler
 Key: FLINK-20612
 URL: https://issues.apache.org/jira/browse/FLINK-20612
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong


With Flink 1.12, we failed to run large-scale jobs on our cluster. When we were 
trying to run the jobs, we met exceptions like out-of-heap-memory errors, 
TaskManager heartbeat timeouts, etc. We increased the size of the heap memory 
and extended the heartbeat timeout, but the jobs still failed. After 
troubleshooting, we found that there are some performance bottlenecks in the 
JobMaster. These bottlenecks are highly related to the complexity of the 
topology.

We implemented several benchmarks on these bottlenecks based on 
flink-benchmark. The topology of the benchmarks is a simple graph, which 
consists of only two vertices: one source vertex and one sink vertex. They are 
both connected with all-to-all blocking edges. The parallelisms of the vertices 
are both 8k. The execution mode is batch. The results of the benchmarks are 
illustrated below:

Table 1: The results of the benchmarks on the bottlenecks in the JobMaster
|*Procedure*|*Time spent*|
|Build topology|19970.44 ms|
|Init scheduling strategy|41668.338 ms|
|Deploy tasks|15102.850 ms|
|Calculate failover region to restart|12080.271 ms|

We'd like to propose the benchmarks for procedures in the runtime module. There 
are three main benefits:
 # They help us to understand the current status of task deployment performance 
and locate where the bottleneck is.
 # We can use the benchmarks to evaluate the optimization in the future.
 # As we run the benchmarks daily, they will help us to trace how the 
performance changes and locate the commit that introduced a performance 
regression, if there is any.

In the first version of the benchmarks, we mainly focus on the procedures we 
mentioned above. The methods corresponding to these procedures are:
 # Building topology: {{ExecutionGraph#attachJobGraph}}
 # Initializing scheduling strategies: 
{{PipelinedRegionSchedulingStrategy#init}}
 # Deploying tasks: {{Execution#deploy}}
 # Calculating failover regions: 
{{RestartPipelinedRegionFailoverStrategy#getTasksNeedingRestart}}

The topology of the benchmarks consists of two vertices: source -> sink. They are 
connected with all-to-all edges. Each result partition type ({{PIPELINED}} and 
{{BLOCKING}}) should be considered separately.
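
For illustration, a minimal JMH-style sketch of the "building topology" 
benchmark (the two create* helpers below are hypothetical placeholders that 
would be wired to test utilities; only {{ExecutionGraph#attachJobGraph}} is 
the actual method under measurement):
{code:java}
import java.util.concurrent.TimeUnit;

import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;

import org.openjdk.jmh.annotations.*;

@State(Scope.Thread)
@BenchmarkMode(Mode.SingleShotTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class BuildTopologyBenchmark {

    JobGraph jobGraph;
    ExecutionGraph executionGraph;

    @Setup(Level.Invocation)
    public void setUp() throws Exception {
        // Hypothetical helpers: source -> sink, 8k parallelism each,
        // connected with all-to-all BLOCKING edges.
        jobGraph = createJobGraph();
        executionGraph = createExecutionGraph();
    }

    @Benchmark
    public void buildTopology() throws Exception {
        executionGraph.attachJobGraph(
                jobGraph.getVerticesSortedTopologicallyFromSources());
    }

    private JobGraph createJobGraph() {
        throw new UnsupportedOperationException("wire up a test JobGraph here");
    }

    private ExecutionGraph createExecutionGraph() {
        throw new UnsupportedOperationException(
                "wire up a test ExecutionGraph here");
    }
}
{code}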



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-27287) FileExecutionGraphInfoStoreTest unstable with "Could not start rest endpoint on any port in port range 8081"

2022-04-18 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-27287:


 Summary: FileExecutionGraphInfoStoreTest unstable with "Could not 
start rest endpoint on any port in port range 8081"
 Key: FLINK-27287
 URL: https://issues.apache.org/jira/browse/FLINK-27287
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Zhilong Hong
 Fix For: 1.16.0, 1.15.1


In our CI we met the exception below in {{FileExecutionGraphInfoStoreTest}} and 
{{MemoryExecutionGraphInfoStoreITCase}}:

{code:java}
org.apache.flink.util.FlinkException: Could not create the 
DispatcherResourceManagerComponent.
at 
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:285)
at 
org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils$PersistingMiniCluster.createDispatcherResourceManagerComponents(ExecutionGraphInfoStoreTestUtils.java:227)
at 
org.apache.flink.runtime.minicluster.MiniCluster.setupDispatcherResourceManagerComponents(MiniCluster.java:489)
at 
org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:433)
at 
org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStoreTest.testPutSuspendedJobOnClusterShutdown(FileExecutionGraphInfoStoreTest.java:328)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
at 
org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
at 
org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
at 
org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at 
org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at 
org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at 
org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at 
org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at 
org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformPro

[jira] [Created] (FLINK-25612) Update the outdated illustration of ExecutionState in the documentation

2022-01-11 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-25612:


 Summary: Update the outdated illustration of ExecutionState in the 
documentation
 Key: FLINK-25612
 URL: https://issues.apache.org/jira/browse/FLINK-25612
 Project: Flink
  Issue Type: Technical Debt
  Components: Documentation
Affects Versions: 1.14.2, 1.13.5, 1.15.0
Reporter: Zhilong Hong
 Fix For: 1.15.0, 1.13.6, 1.14.3
 Attachments: current-illustration-2.jpg, new-illustration.jpg

Currently, the illustration of {{ExecutionState}} located on the page "Jobs and 
Scheduling" 
([https://nightlies.apache.org/flink/flink-docs-master/docs/internals/job_scheduling/])
 is outdated. It doesn't include the INITIALIZING state, which was introduced in 
FLINK-17102.

 

Current illustration:

!current-illustration-2.jpg|width=400!

New illustration:

!new-illustration.jpg|width=400!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (FLINK-21110) Optimize Scheduler Performance for Large-Scale Jobs

2021-01-24 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21110:


 Summary: Optimize Scheduler Performance for Large-Scale Jobs
 Key: FLINK-21110
 URL: https://issues.apache.org/jira/browse/FLINK-21110
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.13.0
 Attachments: Illustration of Group.jpg

According to the results of the scheduler benchmarks we implemented in 
[FLINK-20612|https://issues.apache.org/jira/browse/FLINK-20612], the bottlenecks 
of deploying and running a large-scale job in Flink are mainly located in the 
following procedures:
|Procedure|Time complexity|
|Initializing ExecutionGraph|O(N^2^)|
|Building DefaultExecutionTopology|O(N^2^)|
|Initializing PipelinedRegionSchedulingStrategy|O(N^2^)|
|Scheduling downstream tasks when a task finishes|O(N^2^)|
|Calculating tasks to restart when a failover occurs|O(N^2^)|
|Releasing result partitions|O(N^3^)|

These procedures are all related to the complexity of the topology in the 
ExecutionGraph. Between two vertices connected with all-to-all edges, all 
the upstream IntermediateResultPartitions are connected to all downstream 
ExecutionVertices. The computation complexity of building and traversing all 
these edges will be O(N^2^). 

As for memory usage, currently we use ExecutionEdges to store the information 
of connections. For the all-to-all distribution type, there are O(N^2^) 
ExecutionEdges. We tested a simple job with only two vertices. Their 
parallelisms are both 10k. Furthermore, they are connected with all-to-all edges. It 
takes 4.175 GiB (estimated via MXBean) to store the 100M ExecutionEdges.

In most large-scale jobs, there will be more than two vertices with large 
parallelisms, and they would cost a lot of time and memory to deploy the job.

As we can see, for two JobVertices connected with the all-to-all distribution 
type, all IntermediateResultPartitions produced by the upstream 
ExecutionVertices are isomorphic, which means that the downstream 
ExecutionVertices they connect to are exactly the same. The downstream 
ExecutionVertices belonging to the same JobVertex are also isomorphic, as the 
upstream ResultPartitions they connect to are the same, too.

Since every JobEdge has exactly one distribution type, we can divide the 
vertices and result partitions into groups according to the distribution type 
of the JobEdge. 

For the all-to-all distribution type, since all downstream vertices are 
isomorphic, they belong to a single group, and all the upstream result 
partitions are connected to this group. Vice versa, all the upstream result 
partitions also belong to a single group, and all the downstream vertices are 
connected to this group. In the past, when we wanted to iterate over all the 
downstream vertices, we needed to loop over them N times, which leads to a 
complexity of O(N^2^). Now, since all upstream result partitions are connected 
to one downstream group, we just need to loop over them once, with the 
complexity of O(N).

For the pointwise distribution type, because each result partition is connected 
to different downstream vertices, they should belong to different groups. Vice 
versa, all the vertices belong to different groups. Since one result partition 
group is connected to one vertex group pointwisely, the computation complexity 
of looping over them is still O(N).

!Illustration of Group.jpg!

After we group the result partitions and vertices, ExecutionEdge is no longer 
needed. For the test job we mentioned above, the optimization can effectively 
reduce the memory usage from 4.175 GiB to 12.076 MiB (estimated via MXBean) in 
our POC. The time cost is reduced from 62.090 seconds to 8.551 seconds (with 
10k parallelism).

The detailed design doc will be attached once finished.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21201) Creating BoundedBlockingSubpartition blocks TaskManager’s main thread

2021-01-29 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21201:


 Summary: Creating BoundedBlockingSubpartition blocks TaskManager’s 
main thread
 Key: FLINK-21201
 URL: https://issues.apache.org/jira/browse/FLINK-21201
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.12.1
Reporter: Zhilong Hong
 Attachments: jobmanager.log.tar.gz, taskmanager.log.tar.gz

When we try to run batch jobs with 8k parallelism, it takes a long time 
to deploy the vertices. After investigation, we found that creating 
BoundedBlockingSubpartition blocks the TaskManager’s main thread during the 
procedure of {{submitTask}}.

When JobMaster invokes {{submitTask}} and sends an RPC call to the TaskManager, 
the TaskManager will receive the RPC call and execute the {{submitTask}} method 
in its main thread. In the {{submitTask}} method, the TaskExecutor will create 
a Task instance and try to start it. During the creation, the TaskExecutor will 
create the ResultPartition and its ResultSubpartitions. 

For the batch job, the type of ResultSubpartitions is the 
BoundedBlockingSubpartition with the FileChannelBoundedData. The 
BoundedBlockingSubpartition will create a file on the local disk, which is an 
IO operation and could take a long time. 

In our test, it takes up to 28 seconds to create 8k 
BoundedBlockingSubpartitions. This procedure blocks the main thread of the 
TaskManager and can lead to heartbeat timeouts and slow task deployment. In my 
opinion, the IO operation should be executed in the IO executor rather than the 
main thread. 
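
As a hedged sketch of this direction (assuming an IO thread pool is available 
on the TaskManager; all names below are illustrative), the blocking file 
creation could be offloaded from the main thread like this:

{code:java}
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadIoSketch {
    public static void main(String[] args) {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(4); // stands in for the TM's IO pool

        // Instead of creating the spill file synchronously in the main thread,
        // run the blocking IO on the IO executor and continue asynchronously.
        CompletableFuture<File> fileFuture = CompletableFuture.supplyAsync(() -> {
            try {
                return File.createTempFile("subpartition-data-", ".bin"); // blocking disk IO
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }, ioExecutor);

        fileFuture.thenAccept(f -> System.out.println("created " + f)).join();
        ioExecutor.shutdown();
    }
}
{code}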

The logs of the JobManager and TaskManager are attached below. A typical task is 
Source 0: #898.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21326) Optimize building topology when initializing ExecutionGraph

2021-02-08 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21326:


 Summary: Optimize building topology when initializing 
ExecutionGraph
 Key: FLINK-21326
 URL: https://issues.apache.org/jira/browse/FLINK-21326
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.13.0


The main idea of optimizing the procedure of building the topology is to put all 
the vertices that consume the same result partitions into one group, and all 
the result partitions that have the same consumer vertices into another group. 
The corresponding data structures are {{ConsumedPartitionGroup}} and 
{{ConsumerVertexGroup}}. {{EdgeManager}} is used to store the relationship 
between the groups. The procedure of creating {{ExecutionEdge}}s is replaced 
with building the {{EdgeManager}}.

With these improvements, the complexity of building topology in ExecutionGraph 
decreases from O(N^2) to O(N). 

Furthermore, {{ExecutionEdge}} and all its related calls are replaced with 
{{ConsumedPartitionGroup}} and {{ConsumerVertexGroup}}.
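
A simplified, hypothetical sketch of the idea behind {{EdgeManager}} (not the 
actual Flink class): instead of materializing one edge object per connection, 
both sides reference shared groups.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Simplified sketch of the EdgeManager idea: shared groups instead of per-edge objects. */
public class EdgeManagerSketch {
    // partition id -> the single group of consumer vertex ids (shared for all-to-all)
    private final Map<String, List<String>> consumerVertexGroups = new HashMap<>();
    // vertex id -> the single group of consumed partition ids
    private final Map<String, List<String>> consumedPartitionGroups = new HashMap<>();

    void connectAllToAll(List<String> partitions, List<String> vertices) {
        // One shared group per side: O(N) memory instead of O(N^2) edge objects.
        for (String p : partitions) consumerVertexGroups.put(p, vertices);
        for (String v : vertices) consumedPartitionGroups.put(v, partitions);
    }

    public static void main(String[] args) {
        EdgeManagerSketch em = new EdgeManagerSketch();
        em.connectAllToAll(List.of("p0", "p1"), List.of("v0", "v1"));
        System.out.println(em.consumerVertexGroups.get("p0")); // [v0, v1]
    }
}
{code}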

The detailed design doc is located at: 
https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21328) Optimize the initialization of DefaultExecutionTopology

2021-02-08 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21328:


 Summary: Optimize the initialization of DefaultExecutionTopology
 Key: FLINK-21328
 URL: https://issues.apache.org/jira/browse/FLINK-21328
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.13.0


Based on FLINK-21326, the {{consumedResults}} in {{DefaultExecutionVertex}} and 
{{consumers}} in {{DefaultResultPartition}} can be replaced with 
{{ConsumedPartitionGroup}} and {{ConsumerVertexGroup}} in {{EdgeManager}}.
 # The method {{DefaultExecutionTopology#connectVerticesToConsumedPartitions}} 
could be removed.
 # All the related usages should be fixed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21330) Optimize the initialization of PipelinedRegionSchedulingStrategy

2021-02-08 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21330:


 Summary: Optimize the initialization of 
PipelinedRegionSchedulingStrategy
 Key: FLINK-21330
 URL: https://issues.apache.org/jira/browse/FLINK-21330
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.13.0


{{PipelinedRegionSchedulingStrategy}} is used for task scheduling. Its 
initialization is located at {{PipelinedRegionSchedulingStrategy#init}}. The 
initialization can be divided into two parts:
 # Calculating consumed result partitions of SchedulingPipelinedRegions
 # Calculating the consumer pipelined region of SchedulingResultPartition

Based on FLINK-21328, the {{consumedResults}} of 
{{DefaultSchedulingPipelinedRegion}} can be replaced with 
{{ConsumedPartitionGroup}}.

Then we can optimize the procedures we mentioned above. After the optimization, 
the time complexity decreases from O(N^2) to O(N).

The related usage of {{getConsumedResults}} should be replaced, too.

The detailed design doc is located at: 
[https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit#heading=h.a1mz4yjpry6m]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21331) Optimize calculating tasks to restart in RestartPipelinedRegionFailoverStrategy

2021-02-08 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21331:


 Summary: Optimize calculating tasks to restart in 
RestartPipelinedRegionFailoverStrategy
 Key: FLINK-21331
 URL: https://issues.apache.org/jira/browse/FLINK-21331
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.13.0


RestartPipelinedRegionFailoverStrategy is used to calculate the tasks to 
restart when a task failure occurs. It contains two parts: firstly calculate 
the regions to restart; then add all the tasks in these regions to the 
restarting queue.

The bottleneck is mainly in the first part. This part traverses all the 
upstream and downstream regions of the failed region to determine whether they 
should be restarted or not.

For the current failed region, if one of its consumed result partitions is not 
available, the owner of that partition, i.e., the upstream region, should 
restart. Also, since the failed region needs to restart, its result partitions 
won't be available, so all the downstream regions need to restart, too.

1. Calculating the upstream regions that should restart

The current implementation is:
{code:java}
for each SchedulingExecutionVertex in the current visited SchedulingPipelinedRegion:
  for each consumed SchedulingResultPartition of the SchedulingExecutionVertex:
    if the result partition is not available:
      add the producer region to the restart queue
{code}
Based on FLINK-21328, the consumed result partitions of a vertex are already 
grouped. Here we can use a HashSet to record the visited result partition 
groups. Vertices connected with all-to-all edges will then only need to 
traverse the group once. This decreases the time complexity from O(N^2) to O(N).
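
A minimal sketch of the visited-group trick (names are illustrative; an 
identity-based set matches the "visited group" intent):

{code:java}
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

public class VisitedGroupSketch {
    public static void main(String[] args) {
        // All vertices of the failed all-to-all region share one consumed partition group instance.
        List<String> sharedGroup = List.of("p0", "p1", "p2");
        List<List<String>> consumedGroupsPerVertex = List.of(sharedGroup, sharedGroup, sharedGroup);

        Set<List<String>> visited = Collections.newSetFromMap(new IdentityHashMap<>());
        for (List<String> group : consumedGroupsPerVertex) {
            if (!visited.add(group)) continue; // the shared group is processed only once
            for (String partition : group) {
                // check availability / collect producer regions once per group, not once per vertex
                System.out.println("checking " + partition);
            }
        }
    }
}
{code}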

2. Calculating the downstream regions that should restart

The current implementation is:
{code:java}
for each SchedulingExecutionVertex in the current visited SchedulingPipelinedRegion:
  for each produced SchedulingResultPartition of the SchedulingExecutionVertex:
    for each consumer SchedulingExecutionVertex of the produced SchedulingResultPartition:
      if the region containing the consumer SchedulingExecutionVertex is not visited:
        add the region to the restart queue
{code}
Since the count of the produced result partitions of a vertex equals the count 
of output JobEdges, the time complexity of this procedure is actually O(N^2). 
As the consumer vertices of a result partition are already grouped, we can use 
a HashSet to record the visited ConsumerVertexGroup. The time complexity 
decreases from O(N^2) to O(N).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21332) Optimize releasing result partitions in RegionPartitionReleaseStrategy

2021-02-08 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21332:


 Summary: Optimize releasing result partitions in 
RegionPartitionReleaseStrategy
 Key: FLINK-21332
 URL: https://issues.apache.org/jira/browse/FLINK-21332
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.13.0


RegionPartitionReleaseStrategy is responsible for releasing result partitions 
when all the downstream tasks finish.

The current implementation is:
{code:java}
for each consumed SchedulingResultPartition of the current finished SchedulingPipelinedRegion:
  for each consumer SchedulingPipelinedRegion of the SchedulingResultPartition:
    if all the regions are finished:
      release the partitions
{code}
The time complexity of releasing a result partition is O(N^2). However, 
considering that during the entire stage, all the result partitions need to be 
released, the time complexity is actually O(N^3).

After the optimization of DefaultSchedulingTopology, the consumed result 
partitions are grouped. Since the result partitions in one group are 
isomorphic, we can just cache the finished status of the result partition 
groups and the corresponding pipelined regions.

The optimized implementation is:
{code:java}
for each ConsumedPartitionGroup of the current finished SchedulingPipelinedRegion:
  if all consumer SchedulingPipelinedRegions of the ConsumedPartitionGroup are finished:
    set the ConsumedPartitionGroup to be fully consumed
    for each result partition in the ConsumedPartitionGroup:
      if all the ConsumedPartitionGroups it belongs to are fully consumed:
        release the result partition
{code}
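
Below is a hedged Java sketch of the cached bookkeeping (the data structures 
are hypothetical): a counter of unfinished consumer regions per group lets each 
finish event be handled without re-scanning all regions.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionReleaseSketch {
    public static void main(String[] args) {
        // group -> number of consumer regions that have not finished yet
        Map<String, Integer> unfinishedConsumers = new HashMap<>(Map.of("group-A", 2));
        Map<String, List<String>> partitionsOfGroup = Map.of("group-A", List.of("p0", "p1"));
        Map<String, List<String>> groupsOfPartition =
                Map.of("p0", List.of("group-A"), "p1", List.of("group-A"));

        for (String finishedRegion : List.of("region-1", "region-2")) {
            int remaining = unfinishedConsumers.merge("group-A", -1, Integer::sum);
            if (remaining > 0) continue; // group not fully consumed yet
            // Group fully consumed: release partitions whose groups are all fully consumed.
            for (String partition : partitionsOfGroup.get("group-A")) {
                boolean releasable = groupsOfPartition.get(partition).stream()
                        .allMatch(g -> unfinishedConsumers.get(g) == 0);
                if (releasable) System.out.println(finishedRegion + " triggers release of " + partition);
            }
        }
    }
}
{code}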



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21524) Replace scheduler benchmarks with wrapper classes

2021-02-28 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21524:


 Summary: Replace scheduler benchmarks with wrapper classes
 Key: FLINK-21524
 URL: https://issues.apache.org/jira/browse/FLINK-21524
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


Due to FLINK-21514, we find that when someone modifies interfaces and methods 
used by scheduler benchmarks, the compilation and execution of flink-benchmark 
may break. 

 

To improve the stability of the scheduler benchmarks, we decided to replace 
them with wrapper/executor classes and move the current implementations to the 
flink repository. Also, we'll implement several unit tests based on the 
implementations. In this way, when someone modifies interfaces and methods used 
by the scheduler benchmarks, the unit tests fail, and they must fix the broken 
benchmarks. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21525) Move scheduler benchmarks to Flink and add unit tests

2021-02-28 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21525:


 Summary: Move scheduler benchmarks to Flink and add unit tests
 Key: FLINK-21525
 URL: https://issues.apache.org/jira/browse/FLINK-21525
 Project: Flink
  Issue Type: Sub-task
  Components: Benchmarks, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


To improve the stability of scheduler benchmarks, we'll move the implementation 
of scheduler benchmarks to the Flink repository and add several unit tests. 
When someone modifies the interfaces/methods used by scheduler benchmarks, the 
unit tests will fail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21526) Replace scheduler benchmarks with wrapper classes

2021-02-28 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21526:


 Summary: Replace scheduler benchmarks with wrapper classes
 Key: FLINK-21526
 URL: https://issues.apache.org/jira/browse/FLINK-21526
 Project: Flink
  Issue Type: Sub-task
  Components: Benchmarks, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


After moving the implementations of scheduler benchmarks to the Flink 
repository, we can replace scheduler benchmarks in flink-benchmark with wrapper 
classes.

This makes sure that the compilation and execution of flink-benchmark will not 
fail because of modifications to the interfaces/methods used by the scheduler 
benchmarks.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21731) Add a benchmark for DefaultScheduler#startScheduling

2021-03-10 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21731:


 Summary: Add a benchmark for DefaultScheduler#startScheduling
 Key: FLINK-21731
 URL: https://issues.apache.org/jira/browse/FLINK-21731
 Project: Flink
  Issue Type: Improvement
  Components: Benchmarks, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


We notice that {{DefaultScheduler#allocateSlotsAndDeploy}} is not covered in 
the current scheduler benchmarks. While trying to implement a benchmark for 
this procedure, we realized it's better to implement one that covers the entire 
{{DefaultScheduler#startScheduling}} procedure instead.

This avoids missing any parts that may affect the performance of scheduling. 
Also, in this way we don't need to add benchmarks for every scheduling-related 
sub-procedure, which would make the benchmarks heavy and hard to maintain. We 
can instead focus on the performance-sensitive procedures, as the existing 
benchmarks do.

The new benchmark item is implemented based on 
{{DefaultSchedulerBatchSchedulingTest}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21915) Optimize Execution#finishPartitionsAndUpdateConsumers

2021-03-22 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21915:


 Summary: Optimize Execution#finishPartitionsAndUpdateConsumers
 Key: FLINK-21915
 URL: https://issues.apache.org/jira/browse/FLINK-21915
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


Based on the scheduler benchmark {{PartitionReleaseInBatchJobBenchmark}} 
introduced in FLINK-20612, we find that there's another procedure that has 
O(N^2) computation complexity: 
{{Execution#finishPartitionsAndUpdateConsumers}}. 

Once an execution is finished, it will finish all its BLOCKING partitions and 
update the partition info to all consumer vertices. The procedure can be 
illustrated as the following pseudo code:
{code:java}
for all Execution in ExecutionGraph:
  for all produced IntermediateResultPartition of the Execution:
    for all consumer ExecutionVertex of the IntermediateResultPartition:
      update or cache partition info{code}
This procedure has O(N^2) complexity in total.

Based on FLINK-21326, the consumed partitions are grouped if they are connected 
to the same consumer vertices. Therefore, we can update partition info of the 
entire ConsumedPartitionGroup in batch, rather than one by one. This will 
decrease the complexity from O(N^2) to O(N).
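
A tiny sketch of the batched update (names are illustrative): since all 
partitions of the result share one consumer vertex group, each consumer can 
receive all partition infos in a single call.

{code:java}
import java.util.List;

public class BatchUpdateSketch {
    public static void main(String[] args) {
        List<String> finishedPartitions = List.of("p0", "p1", "p2");
        List<String> consumerVertexGroup = List.of("v0", "v1", "v2"); // shared by all partitions

        // One batched update per consumer instead of one call per (partition, consumer) pair.
        for (String consumer : consumerVertexGroup) {
            System.out.println("send " + finishedPartitions + " to " + consumer + " in one batch");
        }
    }
}
{code}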



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21920) Optimize DefaultScheduler#allocateSlots

2021-03-22 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21920:


 Summary: Optimize DefaultScheduler#allocateSlots
 Key: FLINK-21920
 URL: https://issues.apache.org/jira/browse/FLINK-21920
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


Based on the scheduler benchmark introduced in FLINK-21731, we find that 
several procedures related to {{DefaultScheduler#allocateSlots}} have O(N^2) 
complexity.

The first one is 
{{ExecutionSlotSharingGroupBuilder#tryFindAvailableProducerExecutionSlotSharingGroupFor}}.
 The original implementation is:
{code:java}
for all SchedulingExecutionVertex in the DefaultScheduler:
  for all consumed SchedulingResultPartition of the SchedulingExecutionVertex:
    get the result partition's producer vertex and determine whether the
    ExecutionSlotSharingGroup where the producer vertex is located is available
    for the current vertex{code}
This procedure has O(N^2) complexity.

It's obvious that the result partitions in the same ConsumedPartitionGroup have 
the same producer vertex. So we can just iterate over the 
ConsumedPartitionGroups instead of all the consumed partitions. This will 
decrease the complexity from O(N^2) to O(N).

 

The second one is 
{{ExecutionGraphToInputsLocationsRetrieverAdapter#getConsumedResultPartitionsProducers}}.
 The original implementation is:
{code:java}
for all SchedulingExecutionVertex in the DefaultScheduler:
  for all ConsumedPartitionGroup of the SchedulingExecutionVertex:
    for all IntermediateResultPartition in the ConsumedPartitionGroup:
      get the producer of the IntermediateResultPartition {code}
This procedure has O(N^2) complexity.

We can see that for each SchedulingExecutionVertex, the producers of its 
ConsumedPartitionGroup is calculated separately. For 
SchedulingExecutionVertices in the same ConsumerVertexGroup, they have the same 
ConsumedPartitionGroup. Thus, we don't need to calculate the producers over and 
over again. We can use a local cache to cache the producers. This will decrease 
the complexity from O(N^2) to O(N).
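
A hedged sketch of the local cache (types are illustrative; keying by the group 
instance means all vertices of one ConsumerVertexGroup hit the same entry):

{code:java}
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class ProducerCacheSketch {
    public static void main(String[] args) {
        // The same group instance is shared by all consumers in one ConsumerVertexGroup.
        List<String> sharedGroup = List.of("p0", "p1");
        Map<List<String>, List<String>> producersByGroup = new IdentityHashMap<>();

        for (String vertex : List.of("v0", "v1", "v2")) {
            // computeIfAbsent: producers are resolved once per group, not once per vertex
            List<String> producers = producersByGroup.computeIfAbsent(sharedGroup, g -> {
                List<String> result = new ArrayList<>();
                for (String p : g) result.add("producer-of-" + p);
                return result;
            });
            System.out.println(vertex + " -> " + producers);
        }
    }
}
{code}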

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-21975) Remove hamcrest dependency from SchedulerBenchmarkBase

2021-03-25 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-21975:


 Summary: Remove hamcrest dependency from SchedulerBenchmarkBase
 Key: FLINK-21975
 URL: https://issues.apache.org/jira/browse/FLINK-21975
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


While adding the BenchmarkExecutor for the benchmarks introduced in 
FLINK-21731, we found that a dependency on hamcrest was introduced by mistake. 
Since there's no hamcrest dependency in flink-benchmark, this will break the 
benchmark. Thus, we need to remove this dependency.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges

2021-03-29 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-22017:


 Summary: Regions may never be scheduled when there are 
cross-region blocking edges
 Key: FLINK-22017
 URL: https://issues.apache.org/jira/browse/FLINK-22017
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Attachments: Illustration.jpg

For the topology with cross-region blocking edges, there are regions that may 
never be scheduled. The case is illustrated in the figure below.

!Illustration.jpg!

Let's denote the vertices as vL_N, where L is the layer and N is the index 
within the layer. It's clear that the edge connecting v2_2 and v3_2 crosses 
region 1 and region 2. Since region 1 has no blocking edges connected to other 
regions, it will be scheduled first. When v2_2 finishes, 
PipelinedRegionSchedulingStrategy will trigger {{onExecutionStateChange}} for 
it.

As expected, region 2 should then be scheduled, since all its consumed 
partitions are consumable. But in fact region 2 won't be scheduled, because the 
result partition of v2_2 is not tagged as consumable. Whether it is consumable 
or not is determined by its IntermediateDataSet.

However, an IntermediateDataSet is consumable if and only if all the producers 
of its IntermediateResultPartitions are finished. This IntermediateDataSet will 
never be consumable, since v2_3 is not scheduled. All in all, this forms a 
deadlock: region 2 will never be scheduled because it has not been scheduled.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22037) Remove the redundant blocking queue from DeployingTasksBenchmarkBase

2021-03-30 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-22037:


 Summary: Remove the redundant blocking queue from 
DeployingTasksBenchmarkBase
 Key: FLINK-22037
 URL: https://issues.apache.org/jira/browse/FLINK-22037
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks, Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


We find that the {{BlockingQueue}} used to preserve the 
{{TaskDeploymentDescriptors}} is never used afterwards. Since 
{{TaskDeploymentDescriptors}} cost massive heap memory, it may introduce 
unnecessary garbage collection and make the results of the deployment-related 
benchmarks unstable. So we think it's better to remove the redundant blocking 
queue in {{DeployingTasksBenchmarkBase}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22077) Wrong way to calculate cross-region ConsumedPartitionGroups in PipelinedRegionSchedulingStrategy

2021-03-31 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-22077:


 Summary: Wrong way to calculate cross-region 
ConsumedPartitionGroups in PipelinedRegionSchedulingStrategy
 Key: FLINK-22077
 URL: https://issues.apache.org/jira/browse/FLINK-22077
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


h3. Introduction

We implemented a wrong way to calculate cross-region ConsumedPartitionGroups in 
{{PipelinedRegionSchedulingStrategy}}. It slows down the procedure of 
{{onExecutionStateChange}}, increasing the complexity from O(N) to O(N^2). 
Also, the semantics of cross-region are wrong.
h3. Details
h3. Details

In {{PipelinedRegionSchedulingStrategy#startScheduling}}, as expected, we need 
to schedule all regions with no external blocking edges, i.e., the source 
regions. To decrease the complexity, we choose to schedule all the regions that 
have no external BLOCKING ConsumedPartitionGroups.

However, for the case illustrated in FLINK-22017, region 2 has a 
ConsumedPartitionGroup with both internal and external blocking 
IntermediateResultPartitions. If we choose one partition to represent the 
entire ConsumedPartitionGroup, we may choose the internal one and thereby mark 
the entire group as internal. Region 2 will then be scheduled.

As region 1 is not finished, region 2 cannot transition to running. A deadlock 
may happen if there are not enough resources for both regions.

To make it right, we introduced cross-region ConsumedPartitionGroups in 
FLINK-21330. The regions that have ConsumedPartitionGroups with both internal 
and external blocking IntermediateResultPartitions are recorded. When we call 
{{startScheduling}}, these ConsumedPartitionGroups are treated as external, and 
region 2 will not be scheduled.

But we have to admit that this implementation of cross-region is wrong. It 
treats all ConsumedPartitionGroups that have multiple producer regions as 
cross-region groups, which is not the logic we described above; the semantics 
are wrong. Moreover, all ALL-TO-ALL BLOCKING ConsumedPartitionGroups are 
treated as cross-region groups, since their producers are in different regions 
(each producer has its own region). This increases the complexity from O(N) to 
O(N^2) for ALL-TO-ALL BLOCKING edges.
h3. Solution

To correctly calculate the cross-region ConsumedPartitionGroups, we can first 
calculate the producer regions of all ConsumedPartitionGroups, and then iterate 
over all the regions and their ConsumedPartitionGroups. If a 
ConsumedPartitionGroup has two or more producer regions, and these regions 
contain the current region, it is a cross-region ConsumedPartitionGroup. This 
matches the intended semantics and makes sure that ALL-TO-ALL BLOCKING 
ConsumedPartitionGroups are not treated as cross-region groups. The fix also 
brings the complexity back from O(N^2) to O(N). I think it's necessary to add 
this bug-fix to release 1.13.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22241) Record AllocationID instead of JobID when marking slot active failed

2021-04-12 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-22241:


 Summary: Record AllocationID instead of JobID when marking slot 
active failed
 Key: FLINK-22241
 URL: https://issues.apache.org/jira/browse/FLINK-22241
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Task
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


For {{TaskExecutor#handleAcceptedSlotOffers}}, at present when a slot fails to 
be marked as active, the log prints the JobID instead of the AllocationID:
{code:java}
final String message = "Could not mark slot " + jobId + " active.";
{code}
I think it's better to print the AllocationID here like this:
{code:java}
final String message = "Could not mark slot " + acceptedSlot.getAllocationId() 
+ " active.";
{code}
or
{code:java}
final String message = String.format("Could not mark slot %s active for job 
%s.", acceptedSlot.getAllocationId(), jobId);
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22284) Null address will be logged when channel is closed in NettyPartitionRequestClient

2021-04-14 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-22284:


 Summary: Null address will be logged when channel is closed in 
NettyPartitionRequestClient
 Key: FLINK-22284
 URL: https://issues.apache.org/jira/browse/FLINK-22284
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.13.0


In NettyPartitionRequestClient#requestSubpartition, when the channel is closed, 
it throws a LocalTransportException with the error message "Sending the 
partition request to 'null' failed.". The message is confusing, since we can't 
tell where the remote peer connected to this channel is located, and we can't 
track down that TaskExecutor to find out what happened.

Also, I'm wondering whether we should use TransportException instead of 
LocalTransportException here, because it's a little confusing to see a 
LocalTransportException thrown when a remote channel is closed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22643) Too many TCP connections among TaskManagers for large scale jobs

2021-05-12 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-22643:


 Summary: Too many TCP connections among TaskManagers for large 
scale jobs
 Key: FLINK-22643
 URL: https://issues.apache.org/jira/browse/FLINK-22643
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Affects Versions: 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.14.0


For large-scale jobs, there will be too many TCP connections among 
TaskManagers. Let's take an example.

For a streaming job with 20 JobVertices, each with a parallelism of 500, we 
divide the vertices into 5 slot sharing groups. Each TaskManager has 5 slots, 
so there will be 400 TaskManagers in this job. Let's assume the job runs on a 
cluster with 20 machines.

If all the job edges are all-to-all edges, there will be 19 * 20 * 399 * 2 = 
303,240 TCP connections on each machine. If we run several jobs on this 
cluster, the number of TCP connections may exceed the Linux maximum, which is 
1,048,576. This will stop the TaskManagers from creating new TCP connections 
and cause task failovers.

As we run our production jobs on a K8s cluster, the jobs always fail over due 
to network-related exceptions, such as {{Sending the partition request to 
'null' failed}}.

We think we can decrease the number of connections by letting tasks reuse the 
same connection. We implemented a POC that makes all tasks on the same 
TaskManager reuse one TCP connection. For the example job mentioned above, the 
number of connections decreases from 303,240 to 15,960. With the POC, the 
frequency of network-related exceptions in our production jobs drops 
significantly.

The POC is illustrated in: 
https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22767) Optimize the initialization of ExecutionSlotSharingGroupBuilder

2021-05-25 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-22767:


 Summary: Optimize the initialization of 
ExecutionSlotSharingGroupBuilder
 Key: FLINK-22767
 URL: https://issues.apache.org/jira/browse/FLINK-22767
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong


Based on the scheduler benchmark introduced in FLINK-21731, we find that during 
the initialization of ExecutionSlotSharingGroupBuilder, there's a procedure 
that has O(N^2) complexity: 
{{ExecutionSlotSharingGroupBuilder#tryFindAvailableProducerExecutionSlotSharingGroupFor}}.
 This initialization happens during the initialization of 
LocalInputPreferredSlotSharingStrategy. 

The original implementation is: 
{code:java}
for all SchedulingExecutionVertex in the DefaultScheduler:
  for all consumed SchedulingResultPartition of the SchedulingExecutionVertex:
    get the result partition's producer vertex and determine whether the
    ExecutionSlotSharingGroup where the producer vertex is located is available
    for the current vertex{code}
This procedure has O(N^2) complexity.

It's obvious that the result partitions in the same ConsumedPartitionGroup have 
the same producer vertex. So we can just iterate over the 
ConsumedPartitionGroups instead of all the consumed partitions. This will 
decrease the complexity from O(N^2) to O(N).

The optimization of this procedure will speed up the initialization of 
DefaultScheduler. It will accelerate the submission of a new job, especially 
for OLAP jobs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22773) Optimize the construction of pipelined regions

2021-05-25 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-22773:


 Summary: Optimize the construction of pipelined regions
 Key: FLINK-22773
 URL: https://issues.apache.org/jira/browse/FLINK-22773
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong


During the initialization of DefaultExecutionTopology, pipelined regions will 
be computed for scheduling. Currently the complexity of this procedure is 
O(N^2):
 
{code:java}
for all vertices in the topology:
  for all consumed results of the vertex:
    if the consumed result is reconnectable:
      merge the current region with its producer region
{code}

One possible solution is mentioned in FLINK-17330.

If we can optimize this procedure from O(N^2) to O(N), it will speed up the 
initialization of SchedulerNG, and accelerate the submission of a new job, 
especially for OLAP jobs.
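
As a hedged illustration only (the actual approach is the one discussed in 
FLINK-17330), a disjoint-set (union-find) structure is one classic way to merge 
vertices into regions in near-linear time:

{code:java}
/** Illustrative union-find (disjoint-set) sketch; not the actual Flink implementation. */
public class UnionFindSketch {
    private final int[] parent;

    UnionFindSketch(int n) {
        parent = new int[n];
        for (int i = 0; i < n; i++) parent[i] = i; // every vertex starts in its own region
    }

    int find(int x) {
        return parent[x] == x ? x : (parent[x] = find(parent[x])); // path compression
    }

    void union(int a, int b) {
        parent[find(a)] = find(b); // merge the two regions
    }

    public static void main(String[] args) {
        UnionFindSketch regions = new UnionFindSketch(4);
        regions.union(0, 1); // v0 and v1 must be in the same pipelined region
        regions.union(1, 2);
        System.out.println(regions.find(2) == regions.find(0)); // true: one merged region
    }
}
{code}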




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-22863) ArrayIndexOutOfBoundsException may happen when building rescale edges

2021-06-03 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-22863:


 Summary: ArrayIndexOutOfBoundsException may happen when building 
rescale edges
 Key: FLINK-22863
 URL: https://issues.apache.org/jira/browse/FLINK-22863
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.13.1, 1.13.0
Reporter: Zhilong Hong
 Fix For: 1.14.0, 1.13.2
 Attachments: image-2021-06-03-15-06-09-301.png

For the EdgeManagerBuildUtil introduced in FLINK-21326, we find that the 
construction of rescale edges may throw an ArrayIndexOutOfBoundsException like 
this:

!image-2021-06-03-15-06-09-301.png|width=938,height=200!

It is mainly caused by the precision of {{double}} in Java.

In EdgeManagerBuildUtil#connectPointwise, when the upstream parallelism is 
smaller than the downstream parallelism, we calculate the indices of the 
downstream vertices connected to each upstream partition like this:
{code:java}
int start = (int) (Math.ceil(partitionNum * factor)); 
int end = (int) (Math.ceil((partitionNum + 1) * factor));
{code}
The index range is [{{start}}, {{end}}). 

In some cases, the value of {{end}} may exceed the downstream parallelism and 
cause the ArrayIndexOutOfBoundsException.

Let's take an example. The upstream parallelism is 7 and the downstream 
parallelism is 29. For the last upstream partition (whose {{partitionNum}} is 
6), {{(partitionNum + 1) * factor}} evaluates to a value slightly larger than 
29, which is caused by the precision of {{double}}. Then {{end}} = 
{{Math.ceil((partitionNum + 1) * factor)}} evaluates to 30, and the 
ArrayIndexOutOfBoundsException is thrown here.

To solve this issue, we need to add an extra check for the boundary condition 
like this:
{code:java}
int end = Math.min(targetCount, (int) (Math.ceil((partitionNum + 1) * factor)));
{code}
This affects release-1.13.0 and release-1.13.1.
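
A small self-contained demo of the guarded computation (mirroring the snippets 
above): printing the ranges shows that every index stays below the downstream 
parallelism.

{code:java}
public class PointwiseIndexSketch {
    public static void main(String[] args) {
        int sourceCount = 7;
        int targetCount = 29;
        double factor = ((double) targetCount) / sourceCount;

        for (int partitionNum = 0; partitionNum < sourceCount; partitionNum++) {
            int start = (int) Math.ceil(partitionNum * factor);
            // Without Math.min, floating-point rounding can push `end` past targetCount.
            int end = Math.min(targetCount, (int) Math.ceil((partitionNum + 1) * factor));
            System.out.println("partition " + partitionNum + " -> targets [" + start + ", " + end + ")");
        }
    }
}
{code}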



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23005) Optimize the deployment of tasks

2021-06-15 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-23005:


 Summary: Optimize the deployment of tasks
 Key: FLINK-23005
 URL: https://issues.apache.org/jira/browse/FLINK-23005
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.14.0


h3. Introduction

The optimizations introduced in FLINK-21110 so far have improved the 
performance of job initialization, failover and partition releasing. However, 
task deployment is still slow. For a job with two vertices, each with 8k 
parallelism and connected by an all-to-all edge, it takes 32.611s to deploy all 
the tasks and make them transition to running. If the parallelisms are 16k, it 
may take more than 2 minutes.

As the creation of TaskDeploymentDescriptors runs in the main thread of the 
jobmanager, the jobmanager cannot deal with other akka messages, such as 
heartbeats and task status updates, for more than two minutes.

 

All in all, there are currently two issues in the deployment of tasks for 
large-scale jobs:
 # It takes a long time to deploy tasks, especially for all-to-all edges.
 # Heartbeat timeouts may happen during or after task deployment. For a 
streaming job, this causes a failover of the entire region. The job may never 
transition to running, since another heartbeat timeout may occur during the 
deployment of the new tasks.

h3. Proposal

Task deployment involves the following procedures:
 # Jobmanager creates TaskDeploymentDescriptor for each task in the main thread
 # TaskDeploymentDescriptor is serialized in the future executor
 # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
 # TaskExecutors create a new task thread and execute it

The optimization contains two parts:

*1. Cache the compressed serialized value of ShuffleDescriptors*

ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
IntermediateResultPartitions that a task consumes. For the downstream vertices 
of an all-to-all edge with _N_ parallelism, we need to calculate _N_ 
ShuffleDescriptors _N_ times. However, these vertices share the same 
ShuffleDescriptors, since they all consume the same 
IntermediateResultPartitions. We don't need to calculate the ShuffleDescriptors 
for each downstream vertex individually; we can just cache them. This will 
decrease the overall complexity of calculating TaskDeploymentDescriptors from 
O(N^2) to O(N).

Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ times, 
so we can cache the serialized value of the ShuffleDescriptors instead of the 
original objects. To decrease the size of akka messages and reduce replicated 
data over the network, these serialized values can be compressed.
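
Below is a hedged sketch of the cache (the structure is hypothetical; JDK 
serialization and {{Deflater}} are used purely for illustration): the 
descriptors are serialized and compressed once per IntermediateResult, and 
every downstream task reuses the same bytes.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.Deflater;

public class ShuffleDescriptorCacheSketch {
    // IntermediateResult id -> compressed serialized descriptors
    private static final Map<String, byte[]> CACHE = new HashMap<>();

    static byte[] getOrCompute(String intermediateResultId, Serializable descriptors) {
        return CACHE.computeIfAbsent(intermediateResultId, id -> {
            try {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                    oos.writeObject(descriptors); // serialize once for all consumers
                }
                byte[] serialized = bos.toByteArray();
                Deflater deflater = new Deflater();
                deflater.setInput(serialized);
                deflater.finish();
                ByteArrayOutputStream compressed = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                while (!deflater.finished()) {
                    compressed.write(buf, 0, deflater.deflate(buf)); // shrink the RPC payload
                }
                return compressed.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
    }

    public static void main(String[] args) {
        byte[] first = getOrCompute("result-1", (Serializable) List.of("p0", "p1"));
        byte[] again = getOrCompute("result-1", (Serializable) List.of("p0", "p1")); // cache hit
        System.out.println(first == again); // true: serialized and compressed only once
    }
}
{code}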

*2. Distribute the ShuffleDescriptors via blob server*

For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
serialized value is more than 700 Kilobytes. After the compression, it would be 
200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is more 
than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
become a heavy burden for the garbage collector to deal with.

In the TaskDeploymentDescriptor, JobInformation and TaskInformation are 
distributed via the blob server if their sizes exceed a certain threshold 
(defined by {{blob.offload.minsize}}). TaskExecutors request the information 
from the blob server once they begin to process the TaskDeploymentDescriptor. 
This makes sure the jobmanager doesn't need to keep all the copies in heap 
memory until all the TaskDeploymentDescriptors are sent; there will be only one 
copy on the blob server. Like the JobInformation, we can distribute the cached 
ShuffleDescriptors via the blob server if their overall size exceeds the 
threshold.
h3. Summary

In summary, the optimization of task deployment is to introduce a cache for the 
TaskDeploymentDescriptor. We cache the compressed serialized value of 
ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
value would be distributed via the blob server.
h3. Comparison

We implemented a POC and conducted an experiment to compare the performance of 
our optimization. We choose the streaming job in the experiment because no task 
will be running until all tasks are deployed. This avoids other disturbing 
factors. The job contains two vertices: a source and a sink. They are connected 
with an all-to-all edge.

The results illustrated below are the time interval between the timestamp of 
the first task that transitions to _deploying_ and the timestamp of the last 
task that transitions to _running_:
||Parallelism||Before||After ||
|8000*8000|32.611s|6.480

[jira] [Created] (FLINK-23153) Benchmark not compiling

2021-06-24 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-23153:


 Summary: Benchmark not compiling
 Key: FLINK-23153
 URL: https://issues.apache.org/jira/browse/FLINK-23153
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.14.0
Reporter: Zhilong Hong
 Fix For: 1.14.0


In FLINK-23085, FutureUtils is moved from flink-runtime to flink-core. The 
reference in flink-benchmark should also be changed. One known reference is 
located at: org/apache/flink/benchmark/operators/RecordSource.java.

The Travis CI build is broken at the moment: 
https://travis-ci.com/github/apache/flink-benchmarks/builds/230813827#L2026



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23172) Links of restart strategy in configuration page is broken

2021-06-28 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-23172:


 Summary: Links of restart strategy in configuration page is broken
 Key: FLINK-23172
 URL: https://issues.apache.org/jira/browse/FLINK-23172
 Project: Flink
  Issue Type: Technical Debt
  Components: Documentation
Affects Versions: 1.14.0
Reporter: Zhilong Hong
 Fix For: 1.14.0


The links in the Fault Tolerance section of [the configuration 
page|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/]
 are broken. Currently, the links refer to 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy,
 which doesn't exist and leads to a 404 error. The correct link is 
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23218) Distribute the ShuffleDescriptors via blob server

2021-07-02 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-23218:


 Summary: Distribute the ShuffleDescriptors via blob server
 Key: FLINK-23218
 URL: https://issues.apache.org/jira/browse/FLINK-23218
 Project: Flink
  Issue Type: Sub-task
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.14.0


h3. Introduction

The optimizations introduced in FLINK-21110 so far have improved the 
performance of job initialization, failover and partition releasing. However, 
task deployment is still slow. For a job with two vertices, each with 8k 
parallelism and connected by an all-to-all edge, it takes 32.611s to deploy all 
the tasks and make them transition to running. If the parallelisms are 16k, it 
may take more than 2 minutes.

As the creation of TaskDeploymentDescriptors runs in the main thread of the 
jobmanager, the jobmanager cannot deal with other akka messages, such as 
heartbeats and task status updates, for more than two minutes.

 

All in all, there are currently two issues in the deployment of tasks for 
large-scale jobs:
 # It takes a long time to deploy tasks, especially for all-to-all edges.
 # Heartbeat timeouts may happen during or after task deployment. For a 
streaming job, this causes a failover of the entire region. The job may never 
transition to running, since another heartbeat timeout may occur during the 
deployment of the new tasks.

h3. Proposal

Task deployment involves the following procedures:
 # Jobmanager creates TaskDeploymentDescriptor for each task in the main thread
 # TaskDeploymentDescriptor is serialized in the future executor
 # Akka transports TaskDeploymentDescriptors to TaskExecutors via RPC call
 # TaskExecutors create a new task thread and execute it

The optimization contains two parts:

*1. Cache the compressed serialized value of ShuffleDescriptors*

ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the 
IntermediateResultPartitions that a task consumes. For the downstream vertices 
of an all-to-all edge with _N_ parallelism, we need to calculate _N_ 
ShuffleDescriptors _N_ times. However, these vertices share the same 
ShuffleDescriptors, since they all consume the same 
IntermediateResultPartitions. We don't need to calculate the ShuffleDescriptors 
for each downstream vertex individually; we can just cache them. This will 
decrease the overall complexity of calculating TaskDeploymentDescriptors from 
O(N^2) to O(N).

Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ times, 
so we can cache the serialized value of the ShuffleDescriptors instead of the 
original objects. To decrease the size of akka messages and reduce the 
transmission of replicated data over the network, these serialized values can 
be compressed.

*2. Distribute the ShuffleDescriptors via blob server*

For ShuffleDescriptors of vertices with 8k parallelism, the size of their 
serialized value is more than 700 Kilobytes. After the compression, it would be 
200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is more 
than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the 
TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors would 
become a heavy burden for the garbage collector to deal with.

In the TaskDeploymentDescriptor, JobInformation and TaskInformation are 
distributed via the blob server if their sizes exceed a certain threshold 
(defined by {{blob.offload.minsize}}). TaskExecutors request the information 
from the blob server once they begin to process the TaskDeploymentDescriptor. 
This makes sure the jobmanager doesn't need to keep all the copies in heap 
memory until all the TaskDeploymentDescriptors are sent; there will be only one 
copy on the blob server. Like the JobInformation, we can distribute the cached 
ShuffleDescriptors via the blob server if their overall size exceeds the 
threshold.
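
A minimal sketch of the offloading decision (all names are hypothetical, not 
Flink's actual API):

{code:java}
/** All names here are hypothetical; this only sketches the offloading decision. */
public class BlobOffloadSketch {

    /** Either the raw bytes (small) or a key referencing the blob server (large). */
    static class MaybeOffloaded {
        final byte[] inlineBytes; // non-null if shipped inside the RPC message
        final String blobKey;     // non-null if offloaded to the blob server

        MaybeOffloaded(byte[] inlineBytes, String blobKey) {
            this.inlineBytes = inlineBytes;
            this.blobKey = blobKey;
        }
    }

    static MaybeOffloaded offloadIfLarge(byte[] compressedDescriptors, long offloadMinSize) {
        if (compressedDescriptors.length < offloadMinSize) {
            return new MaybeOffloaded(compressedDescriptors, null); // small: ship inline
        }
        // Large: upload once; every TaskExecutor downloads from the blob server on demand.
        return new MaybeOffloaded(null, uploadToBlobServer(compressedDescriptors));
    }

    static String uploadToBlobServer(byte[] bytes) {
        return "blob-key-" + bytes.length; // stand-in for the real blob server client
    }

    public static void main(String[] args) {
        MaybeOffloaded small = offloadIfLarge(new byte[512], 1024);
        MaybeOffloaded large = offloadIfLarge(new byte[200_000], 1024);
        System.out.println((small.blobKey == null) + " " + large.blobKey);
    }
}
{code}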
h3. Summary

In summary, the optimization of task deployment is to introduce a cache for the 
TaskDeploymentDescriptor. We cache the compressed serialized value of 
ShuffleDescriptors. If the size of the value exceeds a certain threshold, the 
value would be distributed via the blob server.
h3. Comparison

We implemented a POC and conducted an experiment to compare the performance of 
our optimization. We choose the streaming job in the experiment because no task 
will be running until all tasks are deployed. This avoids other disturbing 
factors. The job contains two vertices: a source and a sink. They are connected 
with an all-to-all edge.

The results illustrated below are the time interval between the timestamp of 
the first task that transitions to _deploying_ and the timestamp of the last 
task that transitions to _running_:
||Parallelism

[jira] [Created] (FLINK-23354) Limit the size of blob cache on TaskExecutor

2021-07-12 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-23354:


 Summary: Limit the size of blob cache on TaskExecutor
 Key: FLINK-23354
 URL: https://issues.apache.org/jira/browse/FLINK-23354
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.14.0


Currently, a TaskExecutor uses the BlobCache to cache blobs transported from 
the JobManager. The caches are local files stored on the TaskExecutor, and they 
are not cleaned up until one hour after the related job finishes. At present, 
JobInformation and TaskInformation are transported via blobs. If a lot of jobs 
are submitted, the blob cache will occupy a large amount of disk space. In 
FLINK-23218, we are going to distribute the cached ShuffleDescriptors via 
blobs. When a large number of failovers happen, a lot of cache files will be 
stored on the local disk. In extreme cases, the blobs could blow up the disk 
space.

So we need to add a size limit for the blob cache on the TaskExecutor, as 
described in the comments of FLINK-23218. The main idea is to add a size limit 
and delete blobs in LRU order once the limit is exceeded. Before a blob item is 
cached, the TaskExecutor will first check the overall size of the cache. If it 
exceeds the limit, blobs will be deleted in LRU order until it no longer does. 
If a deleted blob is needed afterwards, it will be downloaded from the blob 
server again.
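
A minimal sketch of size-limited LRU eviction (illustrative, tracking only blob 
sizes; an access-ordered {{LinkedHashMap}} yields the LRU iteration order):

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal sketch of size-limited LRU eviction for cached blobs (sizes in bytes). */
public class BlobCacheLruSketch {
    private final long sizeLimit;
    private long currentSize;
    // accessOrder=true: iteration starts with the least recently used entry
    private final LinkedHashMap<String, Long> blobSizes = new LinkedHashMap<>(16, 0.75f, true);

    BlobCacheLruSketch(long sizeLimit) {
        this.sizeLimit = sizeLimit;
    }

    void put(String blobKey, long size) {
        // Evict least recently used blobs until the new blob fits under the limit.
        Iterator<Map.Entry<String, Long>> it = blobSizes.entrySet().iterator();
        while (currentSize + size > sizeLimit && it.hasNext()) {
            Map.Entry<String, Long> lru = it.next();
            it.remove(); // an evicted blob is re-downloaded from the blob server on next use
            currentSize -= lru.getValue();
            System.out.println("evicted " + lru.getKey());
        }
        blobSizes.put(blobKey, size);
        currentSize += size;
    }

    public static void main(String[] args) {
        BlobCacheLruSketch cache = new BlobCacheLruSketch(100);
        cache.put("a", 60);
        cache.put("b", 50); // evicts "a" to stay under the 100-byte limit
    }
}
{code}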

The default size limit of the blob cache on the TaskExecutor will be 10 GiB.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23491) Scheduler benchmark not running

2021-07-25 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-23491:


 Summary: Scheduler benchmark not running
 Key: FLINK-23491
 URL: https://issues.apache.org/jira/browse/FLINK-23491
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks, Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Zhilong Hong
 Fix For: 1.14.0


As shown in 
[codespeed|http://codespeed.dak8s.net:8000/timeline/#/?exe=5&env=2&revs=1000&equid=off&quarts=on&extr=on&ben=grid],
 we find that the scheduler benchmarks have not been running from July 22nd to 
July 25th. The last valid commit revision is fce9c1d8. I tried to run the 
scheduler benchmarks locally and found that they work well.

I've checked the log and found that the scheduler benchmark is not running 
because:
{code:bash}
java.lang.NoClassDefFoundError: 
org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl
at 
org.apache.flink.scheduler.benchmark.e2e.CreateSchedulerBenchmarkExecutor.setup(CreateSchedulerBenchmarkExecutor.java:51)
at 
org.apache.flink.scheduler.benchmark.e2e.generated.CreateSchedulerBenchmarkExecutor_createScheduler_jmhTest._jmh_tryInit_f_createschedulerbenchmarkexecutor0_0(CreateSchedulerBenchmarkExecutor_createScheduler_jmhTest.java:342)
at 
org.apache.flink.scheduler.benchmark.e2e.generated.CreateSchedulerBenchmarkExecutor_createScheduler_jmhTest.createScheduler_SingleShotTime(CreateSchedulerBenchmarkExecutor_createScheduler_jmhTest.java:294)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:453)
at 
org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:437)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 15 more{code}
It seems that the benchmark is still using the {{SlotPoolImpl}} removed in 
FLINK-22477. But it's weird that {{SlotPoolImpl}} was removed on July 13th, 
while the benchmark only stopped running on July 22nd. I think maybe it's 
related to JMH?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23599) Remove JobVertex#connectIdInput

2021-08-03 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-23599:


 Summary: Remove JobVertex#connectIdInput
 Key: FLINK-23599
 URL: https://issues.apache.org/jira/browse/FLINK-23599
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Zhilong Hong
 Fix For: 1.14.0


{{JobVertex#connectIdInput}} is no longer used in production. It's only used in 
the unit tests {{testAttachViaIds}} and {{testCannotConnectMissingId}} located 
in {{DefaultExecutionGraphConstructionTest}}, and these two test cases are 
designed solely to test this method. Therefore, the method and its test cases 
can be removed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23833) Cache of ShuffleDescriptors should be individually cleaned up

2021-08-17 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-23833:


 Summary: Cache of ShuffleDescriptors should be individually 
cleaned up
 Key: FLINK-23833
 URL: https://issues.apache.org/jira/browse/FLINK-23833
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Zhilong Hong
 Fix For: 1.14.0


In FLINK-23005, we introduced a cache of the compressed serialized value of 
ShuffleDescriptors to improve the performance of deployment. To make sure the 
cache doesn't stay around too long and become a burden for GC, it is cleaned up 
when the partition is released or reset for a new execution. In the 
implementation, the cache of the entire IntermediateResult is cleaned up, 
because a partition used to be released only when the entire IntermediateResult 
was released.

However, after FLINK-22017, a BLOCKING result partition is allowed to become 
consumable individually. This also means that a result partition doesn't need 
to wait for the other result partitions and can be released individually. After 
this change, the following scene may happen: when a result partition is 
finished, the cache of the IntermediateResult on the blob server is deleted, 
while other result partitions corresponding to this IntermediateResult are just 
being deployed to TaskExecutors. When the TaskExecutors then try to download 
the TDD from the blob server, they will find the blob deleted and get stuck.

This bug only happens for jobs with POINTWISE BLOCKING edges, and only when 
{{blob.offload.minsize}} is set to an extremely small value, since the size of 
the ShuffleDescriptors of POINTWISE BLOCKING edges is usually small. To solve 
this issue, we just need to clean up the cache of ShuffleDescriptors 
individually.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-23879) Benchmark not compiling

2021-08-19 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-23879:


 Summary: Benchmark not compiling
 Key: FLINK-23879
 URL: https://issues.apache.org/jira/browse/FLINK-23879
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Reporter: Zhilong Hong
 Fix For: 1.14.0


The benchmark has not been compiling since Aug. 16th, 2021. The error is:
{noformat}
[2021-08-19T23:18:36.242Z] [ERROR] 
/home/jenkins/workspace/flink-scheduler-benchmarks/flink-benchmarks/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java:47:54:
 error: package org.apache.flink.streaming.runtime.streamstatus does not exist
[2021-08-19T23:18:36.242Z] [ERROR] 
/home/jenkins/workspace/flink-scheduler-benchmarks/flink-benchmarks/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java:350:40:
 error: cannot find symbol{noformat}
It seems to have been introduced by FLINK-23767, in which {{StreamStatus}} was 
replaced with {{WatermarkStatus}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24044) Errors are output when compiling flink-runtime-web

2021-08-29 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-24044:


 Summary: Errors are output when compiling flink-runtime-web
 Key: FLINK-24044
 URL: https://issues.apache.org/jira/browse/FLINK-24044
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Web Frontend
Affects Versions: 1.14.0
Reporter: Zhilong Hong


When compiling the module {{flink-runtime-web}}, the terminal would be filled 
with errors as below:
{code:bash}
[ERROR] - Generating browser application bundles (phase: setup)... 
[ERROR] 
[ERROR] Compiling @angular/core : es2015 as esm2015 
[ERROR] 
[ERROR] Compiling @angular/common : es2015 as esm2015 
[ERROR] 
[ERROR] Compiling @angular/platform-browser : es2015 as esm2015 
[ERROR] 
[ERROR] Compiling @angular/platform-browser-dynamic : es2015 as esm2015 
[ERROR] 
[ERROR] Compiling @angular/router : es2015 as esm2015
...
{code}
 
 Although it doesn't break the compilation, maybe we should fix this.

I'm not familiar with the module flink-runtime-web or Angular. I found 
something that may be useful: 

[https://github.com/angular/angular/issues/36513]

[https://github.com/angular/angular/issues/31853]

[https://stackoverflow.com/questions/57220392/even-though-i-am-using-target-esm2015-ivy-is-compiles-as-a-esm5-module/57220445#57220445]

[https://stackoverflow.com/questions/61304548/simple-angular-9-project-compiles-whole-angular-material-es2015-as-esm2015]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24295) Too many requestPartitionState may jam the JobManager during task deployment

2021-09-15 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-24295:


 Summary: Too many requestPartitionState may jam the JobManager 
during task deployment
 Key: FLINK-24295
 URL: https://issues.apache.org/jira/browse/FLINK-24295
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Coordination
Affects Versions: 1.14.0
Reporter: Zhilong Hong


After the phase 2 optimizations we've done in FLINK-21110, task deployment has 
become much faster. However, we find that during task deployment, there may be 
so many {{requestPartitionState}} RPC calls from TaskManagers that they jam the 
JobManager.

Why would there be so many {{requestPartitionState}} RPC calls? After the 
optimization, the JobManager can submit tasks to TaskManagers quickly. If the 
JobManager issues {{submitTask}} calls faster than the TaskManagers can process 
them, some TaskManagers may deploy tasks faster than others.

When a downstream task is deployed, it tries to request partitions from its 
upstream tasks, which may be located on a remote TaskManager. If an upstream 
task is not deployed yet, the downstream task requests the partition state from 
the JobManager. In the worst case, the computation and memory complexity is 
O(N^2).

In our test with a streaming job, which has two vertices with 8,000 parallelism 
connected by all-to-all edges, there will be 32,000,000 
{{requestPartitionState}} RPC calls to the JobManager in the worst case. Each 
RPC call requires 1 KiB of space in the heap memory of the JobManager, so the 
overall space cost of {{requestPartitionState}} will be 32 GiB, which is a 
heavy burden for GC to deal with.

In our test, the heap memory of the JobManager is 8 GiB. During task 
deployment, the JobManager runs into more and more full GCs and gets stuck, 
since it is busy with full GCs and has no time to deal with the incoming RPC 
calls. The log is attached below.

The worst thing is that there is no log output for this RPC call. When users 
find the JobManager getting slower or stuck, they won't be able to find out 
why.

Why did this case rarely happen before? Before the optimization, it took a long 
time to calculate the TaskDeploymentDescriptors and send them to TaskManagers, 
so in most cases the JobManager called {{submitTask}} more slowly than the 
TaskManagers could process it. Since tasks are deployed in topological order, 
the upstream tasks were deployed before the downstream tasks, and this case 
rarely happened.

In my opinion, the solution to this issue needs more discussion. According to 
the discussion in the pull request 
([https://github.com/apache/flink/pull/6680]), it's not safe to remove this RPC 
call, because we cannot guarantee that the assumption that an upstream task 
failure always fails the downstream consumers holds in all cases.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24300) MultipleInputOperator is running much more slowly in TPCDS

2021-09-15 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-24300:


 Summary: MultipleInputOperator is running much more slowly in TPCDS
 Key: FLINK-24300
 URL: https://issues.apache.org/jira/browse/FLINK-24300
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Network
Affects Versions: 1.14.0, 1.15.0
Reporter: Zhilong Hong
 Attachments: 64570e4c56955713ca599fd1d7ae7be891a314c6.png, 
detail-of-the-job.png, e3010c16947ed8da2ecb7d89a3aa08dacecc524a.png, jstack.txt

When running TPCDS with release 1.14, we found that a job with a 
MultipleInputOperator runs much more slowly than before. With a binary search 
among the commits, we found that the issue may have been introduced by 
FLINK-23408.

At the commit 64570e4c56955713ca599fd1d7ae7be891a314c6, the job runs normally 
in TPCDS, as the image below illustrates:

!64570e4c56955713ca599fd1d7ae7be891a314c6.png|width=600!

At the commit e3010c16947ed8da2ecb7d89a3aa08dacecc524a, the job q2.sql gets 
stuck for a very long time (more than half an hour), as the image below 
illustrates:

!e3010c16947ed8da2ecb7d89a3aa08dacecc524a.png|width=600!

The detail of the job is illustrated below:

!detail-of-the-job.png|width=600!

The job uses a {{MultipleInputOperator}} with one normal input and two chained 
FileSources. It has finished reading the normal input and starts to read the 
chained sources. Each chained source has one source data fetcher.

We captured the jstack of the stuck tasks and attach it below. From 
[^jstack.txt] we can see that the main thread is blocked waiting for a lock, 
and the lock is held by a source data fetcher. The source data fetcher is 
still running, but its stack keeps showing {{CompletableFuture.cleanStack}}.

This issue happens in a batch job. However, judging from where it gets 
blocked, it seems to affect streaming jobs as well.

For reference, the TPCDS code we are running is located at 
[https://github.com/ververica/flink-sql-benchmark/tree/dev].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24316) Refactor IntermediateDataSet to have only one consumer

2021-09-17 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-24316:


 Summary: Refactor IntermediateDataSet to have only one consumer
 Key: FLINK-24316
 URL: https://issues.apache.org/jira/browse/FLINK-24316
 Project: Flink
  Issue Type: Technical Debt
  Components: Runtime / Coordination
Reporter: Zhilong Hong
 Fix For: 1.15.0


Currently, IntermediateDataSet assumes that an IntermediateDataSet can be 
consumed by multiple consumers. However, this assumption has never become 
reality. An upstream vertex that is connected to multiple downstream vertices 
generates multiple IntermediateDataSets, with each consumer corresponding to 
one IntermediateDataSet.

Furthermore, there are several checks in the code to make sure that an 
IntermediateDataSet has only one consumer, e.g. in 
{{Execution#getPartitionMaxParallelism}} and 
{{SsgNetworkMemoryCalculationUtils#getMaxSubpartitionNums}}. These checks make 
the logic complicated, and it's hard to guarantee consistency, because we 
cannot make sure that all future calls to {{getConsumers}} perform this check.

Since multiple consumers for IntermediateDataSet may not become reality for a 
long time, we think it's better to refactor IntermediateDataSet to have only 
one consumer, as mentioned in the discussion in 
[https://github.com/apache/flink/pull/16856].

If we are going to support multiple consumers for IntermediateDataSet in the 
future, we can just bring it back and refactor all the usages.

As IntermediateDataSet changes, all classes related to it should change as 
well, including IntermediateResult, IntermediateResultPartition, 
DefaultResultPartition, etc. All the related sanity checks need to be removed, 
too.
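
As a rough illustration, here is a minimal sketch of the refactoring direction 
(simplified stand-ins, not the actual Flink classes): the consumer list and 
the scattered size-one checks collapse into a single consumer field, with the 
invariant enforced once, at the point where the consumer is attached.

{code:java}
public class IntermediateDataSet {
    // Simplified stand-in for org.apache.flink.runtime.jobgraph.JobEdge.
    public static class JobEdge {}

    private JobEdge consumer; // at most one consumer

    public void addConsumer(JobEdge edge) {
        // The invariant lives here once, instead of in every caller.
        if (consumer != null) {
            throw new IllegalStateException(
                    "An IntermediateDataSet can only have one consumer.");
        }
        consumer = edge;
    }

    // Callers such as Execution#getPartitionMaxParallelism no longer need
    // to check getConsumers().size() == 1 before reading the first element.
    public JobEdge getConsumer() {
        return consumer;
    }
}
{code}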



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-24491) ExecutionGraphInfo may not be archived when the dispatcher terminates

2021-10-09 Thread Zhilong Hong (Jira)
Zhilong Hong created FLINK-24491:


 Summary: ExecutionGraphInfo may not be archived when the 
dispatcher terminates
 Key: FLINK-24491
 URL: https://issues.apache.org/jira/browse/FLINK-24491
 Project: Flink
  Issue Type: Bug
  Components: Runtime / Configuration
Affects Versions: 1.13.2, 1.14.0, 1.15.0
Reporter: Zhilong Hong
 Fix For: 1.13.3, 1.15.0, 1.14.1


When a job finishes, its JobManagerRunnerResult is processed in the callback 
of {{Dispatcher#runJob}}. In the callback, the ExecutionGraphInfo is archived 
asynchronously by the HistoryServerArchivist. However, the CompletableFuture 
returned by the archiving is ignored, so the job may be removed before the 
archiving is finished. For a batch job running in per-job/application mode, 
the dispatcher terminates itself once the job is finished. In this case, the 
ExecutionGraphInfo may not be archived when the dispatcher terminates.

If the ExecutionGraphInfo is lost, users cannot tell whether the batch job 
finished normally. They have to refer to the logs for the result.

The session mode is not affected, since the dispatcher doesn't terminate 
itself once a job is finished, so the HistoryServerArchivist has enough time 
to archive the ExecutionGraphInfo.
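
A minimal, self-contained sketch of the fix direction (the names below are 
illustrative stand-ins, not the actual Dispatcher API): instead of dropping 
the archiving future, the dispatcher chains its termination after it.

{code:java}
import java.util.concurrent.CompletableFuture;

public class ArchiveBeforeShutdown {
    public static void main(String[] args) {
        CompletableFuture<Void> archiveFuture = archiveExecutionGraphInfo();

        archiveFuture
                .exceptionally(failure -> {
                    // An archiving failure should be logged, but must not
                    // block the shutdown forever.
                    System.err.println("Archiving failed: " + failure);
                    return null;
                })
                // Only terminate after the archiving has completed, so
                // per-job/application clusters do not lose the result.
                .thenRun(ArchiveBeforeShutdown::terminateDispatcher)
                .join();
    }

    // Stand-in for HistoryServerArchivist#archiveExecutionGraph.
    private static CompletableFuture<Void> archiveExecutionGraphInfo() {
        return CompletableFuture.runAsync(
                () -> System.out.println("ExecutionGraphInfo archived"));
    }

    private static void terminateDispatcher() {
        System.out.println("Dispatcher terminates only after archiving");
    }
}
{code}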



--
This message was sent by Atlassian Jira
(v8.3.4#803005)