Re: [ANNOUNCE] New PMC member: Yuan Mei
Congratulations, Yuan! Best, Zhilong On Mon, Mar 14, 2022 at 7:22 PM Konstantin Knauf wrote: > Congratulations, Yuan! > > On Mon, Mar 14, 2022 at 11:29 AM Jing Zhang wrote: > > > Congratulations, Yuan! > > > > Best, > > Jing Zhang > > > > Jing Ge 于2022年3月14日周一 18:15写道: > > > > > Congrats! Very well deserved! > > > > > > Best, > > > Jing > > > > > > On Mon, Mar 14, 2022 at 10:34 AM Piotr Nowojski > > > wrote: > > > > > > > Congratulations :) > > > > > > > > pon., 14 mar 2022 o 09:59 Yun Tang napisał(a): > > > > > > > > > Congratulations, Yuan! > > > > > > > > > > Best, > > > > > Yun Tang > > > > > > > > > > From: Zakelly Lan > > > > > Sent: Monday, March 14, 2022 16:55 > > > > > To: dev@flink.apache.org > > > > > Subject: Re: [ANNOUNCE] New PMC member: Yuan Mei > > > > > > > > > > Congratulations, Yuan! > > > > > > > > > > Best, > > > > > Zakelly > > > > > > > > > > On Mon, Mar 14, 2022 at 4:49 PM Johannes Moser > > > > wrote: > > > > > > > > > > > Congrats Yuan. > > > > > > > > > > > > > On 14.03.2022, at 09:45, Arvid Heise wrote: > > > > > > > > > > > > > > Congratulations and well deserved! > > > > > > > > > > > > > > On Mon, Mar 14, 2022 at 9:30 AM Matthias Pohl < > map...@apache.org > > > > > > > > wrote: > > > > > > > > > > > > > >> Congratulations, Yuan. > > > > > > >> > > > > > > >> On Mon, Mar 14, 2022 at 9:25 AM Shuo Cheng < > njucs...@gmail.com> > > > > > wrote: > > > > > > >> > > > > > > >>> Congratulations, Yuan! > > > > > > >>> > > > > > > >>> On Mon, Mar 14, 2022 at 4:22 PM Anton Kalashnikov < > > > > > kaa@yandex.com> > > > > > > >>> wrote: > > > > > > >>> > > > > > > Congratulations, Yuan! > > > > > > > > > > > > -- > > > > > > > > > > > > Best regards, > > > > > > Anton Kalashnikov > > > > > > > > > > > > 14.03.2022 09:13, Leonard Xu пишет: > > > > > > > Congratulations Yuan! > > > > > > > > > > > > > > Best, > > > > > > > Leonard > > > > > > > > > > > > > >> 2022年3月14日 下午4:09,Yangze Guo 写道: > > > > > > >> > > > > > > >> Congratulations! > > > > > > >> > > > > > > >> Best, > > > > > > >> Yangze Guo > > > > > > >> > > > > > > >> On Mon, Mar 14, 2022 at 4:08 PM Martijn Visser < > > > > > > martijnvis...@apache.org> wrote: > > > > > > >>> Congratulations Yuan! > > > > > > >>> > > > > > > >>> On Mon, 14 Mar 2022 at 09:02, Yu Li > > > wrote: > > > > > > >>> > > > > > > Hi all! > > > > > > > > > > > > I'm very happy to announce that Yuan Mei has joined the > > > Flink > > > > > PMC! > > > > > > > > > > > > Yuan is helping the community a lot with creating and > > > > validating > > > > > > releases, > > > > > > contributing to FLIP discussions and good code > > contributions > > > > to > > > > > > >> the > > > > > > state backend and related components. > > > > > > > > > > > > Congratulations and welcome, Yuan! > > > > > > > > > > > > Best Regards, > > > > > > Yu (On behalf of the Apache Flink PMC) > > > > > > > > > > > > -- > > > > > > > > > > > > Best regards, > > > > > > Anton Kalashnikov > > > > > > > > > > > > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > Konstantin Knauf > > https://twitter.com/snntrable > > https://github.com/knaufk >
Re: [ANNOUNCE] New Apache Flink Committer - Ingo Bürk
Congratulations, Ingo! Best, Zhilong On Wed, Dec 8, 2021 at 10:18 PM godfrey he wrote: > Congratulations, Ingo! > > Best, > Godfrey > > Roman Khachatryan 于2021年12月6日周一 下午6:07写道: > > > > Congratulations, Ingo! > > > > Regards, > > Roman > > > > > > On Mon, Dec 6, 2021 at 11:05 AM Yang Wang wrote: > > > > > > Congratulations, Ingo! > > > > > > Best, > > > Yang > > > > > > Sergey Nuyanzin 于2021年12月6日周一 下午3:35写道: > > > > > > > Congratulations, Ingo! > > > > > > > > On Mon, Dec 6, 2021 at 7:32 AM Leonard Xu wrote: > > > > > > > > > Congratulations, Ingo! Well Deserved. > > > > > > > > > > Best, > > > > > Leonard > > > > > > > > > > > 2021年12月3日 下午11:24,Ingo Bürk 写道: > > > > > > > > > > > > Thank you everyone for the warm welcome! > > > > > > > > > > > > > > > > > > Best > > > > > > Ingo > > > > > > > > > > > > On Fri, Dec 3, 2021 at 11:47 AM Ryan Skraba > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > >> Congratulations Ingo! > > > > > >> > > > > > >> On Fri, Dec 3, 2021 at 8:17 AM Yun Tang > wrote: > > > > > >> > > > > > >>> Congratulations, Ingo! > > > > > >>> > > > > > >>> Best > > > > > >>> Yun Tang > > > > > >>> > > > > > >>> From: Yuepeng Pan > > > > > >>> Sent: Friday, December 3, 2021 14:14 > > > > > >>> To: dev@flink.apache.org > > > > > >>> Cc: Ingo Bürk > > > > > >>> Subject: Re:Re: [ANNOUNCE] New Apache Flink Committer - Ingo > Bürk > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> Congratulations, Ingo! > > > > > >>> > > > > > >>> > > > > > >>> Best, > > > > > >>> Yuepeng Pan > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> > > > > > >>> At 2021-12-03 13:47:38, "Yun Gao" > > > > > > wrote: > > > > > Congratulations Ingo! > > > > > > > > > > Best, > > > > > Yun > > > > > > > > > > > > > > > > -- > > > > > From:刘建刚 > > > > > Send Time:2021 Dec. 3 (Fri.) 11:52 > > > > > To:dev > > > > > Cc:"Ingo Bürk" > > > > > Subject:Re: [ANNOUNCE] New Apache Flink Committer - Ingo Bürk > > > > > > > > > > Congratulations! > > > > > > > > > > Best, > > > > > Liu Jiangang > > > > > > > > > > Till Rohrmann 于2021年12月2日周四 下午11:24写道: > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > On behalf of the PMC, I'm very happy to announce Ingo Bürk > as a new > > > > > >>> Flink > > > > > > committer. > > > > > > > > > > > > Ingo has started contributing to Flink since the beginning > of this > > > > > >>> year. He > > > > > > worked mostly on SQL components. He has authored many PRs and > > > > helped > > > > > >>> review > > > > > > a lot of other PRs in this area. He actively reported issues > and > > > > > >> helped > > > > > >>> our > > > > > > users on the MLs. His most notable contributions were > Support SQL > > > > > 2016 > > > > > >>> JSON > > > > > > functions in Flink SQL (FLIP-90), Register sources/sinks in > Table > > > > API > > > > > > (FLIP-129) and various other contributions in the SQL area. > > > > Moreover, > > > > > >>> he is > > > > > > one of the few people in our community who actually > understands > > > > > >> Flink's > > > > > > frontend. > > > > > > > > > > > > Please join me in congratulating Ingo for becoming a Flink > > > > committer! > > > > > > > > > > > > Cheers, > > > > > > Till > > > > > > > > > > > >>> > > > > > >> > > > > > > > > > > > > > > > > > > -- > > > > Best regards, > > > > Sergey > > > > >
Re: [ANNOUNCE] New Apache Flink Committer - Matthias Pohl
Congratulations, Matthias! Best, Zhilong On Wed, Dec 8, 2021 at 10:18 PM godfrey he wrote: > Congratulations, Matthias! > > Best, > Godfrey > > Roman Khachatryan 于2021年12月6日周一 下午6:07写道: > > > > Congratulations, Matthias! > > > > Regards, > > Roman > > > > > > On Mon, Dec 6, 2021 at 11:04 AM Yang Wang wrote: > > > > > > Congratulations, Matthias! > > > > > > Best, > > > Yang > > > > > > Sergey Nuyanzin 于2021年12月6日周一 下午3:35写道: > > > > > > > Congratulations, Matthias! > > > > > > > > On Mon, Dec 6, 2021 at 7:33 AM Leonard Xu wrote: > > > > > > > > > Congratulations Matthias! > > > > > > > > > > Best, > > > > > Leonard > > > > > > 2021年12月3日 下午11:23,Matthias Pohl 写道: > > > > > > > > > > > > Thank you! I'm looking forward to continue working with you. > > > > > > > > > > > > On Fri, Dec 3, 2021 at 7:29 AM Jingsong Li < > jingsongl...@gmail.com> > > > > > wrote: > > > > > > > > > > > >> Congratulations, Matthias! > > > > > >> > > > > > >> On Fri, Dec 3, 2021 at 2:13 PM Yuepeng Pan > wrote: > > > > > >>> > > > > > >>> Congratulations Matthias! > > > > > >>> > > > > > >>> Best,Yuepeng Pan. > > > > > >>> 在 2021-12-03 13:47:20,"Yun Gao" > 写道: > > > > > Congratulations Matthias! > > > > > > > > > > Best, > > > > > Yun > > > > > > > > > > > > > > > > -- > > > > > From:Jing Zhang > > > > > Send Time:2021 Dec. 3 (Fri.) 13:45 > > > > > To:dev > > > > > Cc:Matthias Pohl > > > > > Subject:Re: [ANNOUNCE] New Apache Flink Committer - Matthias > Pohl > > > > > > > > > > Congratulations, Matthias! > > > > > > > > > > 刘建刚 于2021年12月3日周五 11:51写道: > > > > > > > > > > > Congratulations! > > > > > > > > > > > > Best, > > > > > > Liu Jiangang > > > > > > > > > > > > Till Rohrmann 于2021年12月2日周四 > 下午11:28写道: > > > > > > > > > > > >> Hi everyone, > > > > > >> > > > > > >> On behalf of the PMC, I'm very happy to announce Matthias > Pohl as > > > > a > > > > > >> new > > > > > >> Flink committer. > > > > > >> > > > > > >> Matthias has worked on Flink since August last year. He > helped > > > > > >> review a > > > > > > ton > > > > > >> of PRs. He worked on a variety of things but most notably > the > > > > > >> tracking > > > > > > and > > > > > >> reporting of concurrent exceptions, fixing HA bugs and > deprecating > > > > > >> and > > > > > >> removing our Mesos support. He actively reports issues > helping > > > > > >> Flink to > > > > > >> improve and he is actively engaged in Flink's MLs. > > > > > >> > > > > > >> Please join me in congratulating Matthias for becoming a > Flink > > > > > >> committer! > > > > > >> > > > > > >> Cheers, > > > > > >> Till > > > > > >> > > > > > > > > > > > >> > > > > > >> > > > > > >> > > > > > >> -- > > > > > >> Best, Jingsong Lee > > > > > > > > > > > > > > > > > > -- > > > > Best regards, > > > > Sergey > > > > >
Re: [DISCUSS] FLIP-210: Change logging level dynamically at runtime
Thank you for proposing this improvement, Wenhao. Changing the logging level dynamically at runtime is very useful when users are trying to debug their jobs: they can raise the logging level to DEBUG and find more details in the logs.

1. I'm wondering if we could add a REST API to query the current logging level? Such an API would be useful for users who want to know the current status of the logging level, especially for those who have their own job management platform.

2. Would it be better if we added a field to specify the target JobManager/TaskManager for the logconfig API? Currently, it seems the modified logging level is applied to all components in the cluster. If we change the logging level to DEBUG, the overall size of the logs may increase rapidly, especially in large-scale clusters, and may become a heavy burden on disk usage. Adding a target field would minimize the impact: users could change the logging level for only the TaskManagers they are focusing on. Furthermore, if users want to change the logging level for all components, the target field could be set to "ALL".

On Tue, Jan 11, 2022 at 12:27 AM Konstantin Knauf wrote:
> Thank you for starting the discussion. Being able to change the logging
> level at runtime is very valuable in my experience.
>
> Instead of introducing our own API (and eventually even persistence),
> could we just periodically reload the log4j or logback configuration from
> the environment/filesystem? I only quickly googled the topic and [1,2]
> suggest that this might be possible?
>
> [1] https://stackoverflow.com/a/16216956/6422562?
> [2] https://logback.qos.ch/manual/configuration.html#autoScan
>
> On Mon, Jan 10, 2022 at 5:10 PM Wenhao Ji wrote:
> > Hi everyone,
> >
> > Hope you enjoyed the Holiday Season.
> >
> > I would like to start the discussion on the improvement proposal
> > FLIP-210 [1], which aims to provide a way to change log levels at
> > runtime to simplify the detection of issues and bugs, as reported in
> > the ticket FLINK-16478 [2].
> > Firstly, thanks Xingxing Di and xiaodao for their previous effort. The
> > FLIP I drafted is largely influenced by their previous designs [3][4].
> > Although we have reached some agreements under the jira comments about
> > the scope of this feature, we still have the following questions
> > listed below ready to be discussed in this thread.
> >
> > ## Question 1
> >
> > > Creating a custom DSL and implementing it for several logging
> > > backends sounds like quite a maintenance burden. Extensions to the
> > > DSL, and supported backends, could become quite an effort. (by
> > > Chesnay Schepler)
> >
> > I tried to design the API of the logging backend to stay away from the
> > details of the implementations, but I did not find any slf4j-specific
> > API that is available to change the log level of a logger. So what I
> > did is to introduce another kind of abstraction on top of slf4j /
> > log4j / logback so that we will not depend on the logging provider's
> > API directly. It will be convenient for us to adopt any other logging
> > providers. Please see the "Logging Abstraction" section.
> >
> > ## Question 2
> >
> > > Do we know whether other systems support this kind of feature? If
> > > yes, how do they solve it for different logging backends? (by Till
> > > Rohrmann)
> >
> > I investigated several Java frameworks including Spark, Storm, and
> > Spring Boot. Here is what I found.
> > Spark & Storm directly depend on the log4j implementations, which
> > means they do not support any other slf4j implementation at all. They
> > simply call the log4j API directly (see SparkContext.scala#L381 [5]
> > and Utils.scala#L2443 [6] in Spark, and LogConfigManager.java#L144 [7]
> > in Storm). They are pretty different from what Flink provides.
> > However, I found that Spring Boot has implemented what we are
> > interested in. Just like Flink, Spring Boot supports many slf4j
> > implementations. Users are not limited to log4j; they have the ability
> > to declare different logging frameworks by importing certain
> > dependencies. After that, Spring will decide the activated one by
> > scanning its classpath and context (see LoggingSystem.java#L164 [8]
> > and LoggersEndpoint.java#L99 [9]).
> >
> > ## Question 3
> >
> > Besides the questions raised in the jira comments, I also found
> > another thing that has not been discussed. Considering this feature as
> > an MVP, do we need to introduce a HighAvailabilityService to store the
> > log settings so that they can be synced to newly-joined task managers
> > and also job manager followers for consistency? This issue is included
> > in the "Limitations" section of the FLIP.
> >
> > Finally, thanks for your time joining this discussion and reviewing
> > this FLIP. I would appreciate any comments or suggestions on it.
> >
> > [1]:
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-210%3A+Change+logging+level+dynamically+at+runtime
> > [2]:
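For readers who want to see what the backend-specific calls look like: below is a minimal sketch of the kind of logging abstraction described under Question 1, with log4j2 as one example backend. The interface and class names (LoggingBackend, Log4j2Backend) are hypothetical illustrations, not the FLIP's actual API.

{code:java}
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.config.Configurator;

/** Backend-agnostic hook that a REST handler could call. */
interface LoggingBackend {
    void setLevel(String loggerName, String level);
}

/** One possible implementation for the log4j2 backend. */
class Log4j2Backend implements LoggingBackend {
    @Override
    public void setLevel(String loggerName, String level) {
        // Configurator.setLevel changes the effective level at runtime
        // without touching the configuration file.
        Configurator.setLevel(loggerName, Level.valueOf(level));
    }
}
{code}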
Re: I want to contribute to Apache Flink
Hi, Chengyun: Thanks for your passion for contributing to the Flink community. You don't need contributor permission to contribute your code to Flink. Find an open issue you are interested in at http://issues.apache.org/jira/browse/FLINK and comment with your ideas; issues with the label "starter" are a good place to start. You can refer to https://flink.apache.org/contributing/how-to-contribute.html for more information about how to contribute. Cheers, Zhilong On Mon, Jan 17, 2022 at 9:59 PM chengyunzhang6 wrote: > Hi, > I want to contribute to Apache Flink. Would you please give me the > contributor permission? My JIRA full name is Chengyun Zhang.
Re: Re: Change of focus
Thank you for everything, Till! I've learned a lot from you. Good luck with your new adventure and the next chapter! Best, Zhilong On Tue, Mar 1, 2022 at 12:08 PM Yun Tang wrote: > Thanks a lot for your efforts and kindness of mentoring contributors in > Apache Flink community, Till! > > Good luck with your new adventure in your new life. > > > Best > Yun Tang > > > From: Yuan Mei > Sent: Tuesday, March 1, 2022 11:00 > To: dev > Subject: Re: Re: Change of focus > > Thanks Till for everything you've done for the community! > Good luck with your new adventure and best wishes to your new life! > > Best Regards, > Yuan > > On Tue, Mar 1, 2022 at 10:35 AM Zhu Zhu wrote: > > > Thank you for all the efforts and good luck for the new adventure, Till! > > > > Thanks, > > Zhu > > > > Terry 于2022年3月1日周二 10:26写道: > > > > > Thanks a lot for your efforts! Good Luck! > > > > > > Jiangang Liu 于2022年3月1日周二 10:18写道: > > > > > > > Thanks for the efforts and help in flink, Till. Good luck! > > > > > > > > Best > > > > Liu Jiangang > > > > > > > > Lijie Wang 于2022年3月1日周二 09:53写道: > > > > > > > > > Thanks for all your efforts Till. Good luck ! > > > > > > > > > > Best, > > > > > Lijie > > > > > > > > > > Yun Gao 于2022年3月1日周二 01:15写道: > > > > > > > > > > > Very thanks Till for all the efforts! Good luck for the next > > chapter~ > > > > > > > > > > > > Best, > > > > > > Yun > > > > > > > > > > > > > -- > > > > > > Sender:Piotr Nowojski > > > > > > Date:2022/02/28 22:10:46 > > > > > > Recipient:dev > > > > > > Theme:Re: Change of focus > > > > > > > > > > > > Good luck Till and thanks for all of your efforts. > > > > > > > > > > > > Best, > > > > > > Piotrek > > > > > > > > > > > > pon., 28 lut 2022 o 15:06 Aitozi > > napisał(a): > > > > > > > > > > > > > Good luck with the next chapter, will miss you :) > > > > > > > > > > > > > > Best, > > > > > > > Aitozi > > > > > > > > > > > > > > Jark Wu 于2022年2月28日周一 21:28写道: > > > > > > > > > > > > > > > Thank you Till for every things. It's great to work with you. > > > Good > > > > > > luck! > > > > > > > > > > > > > > > > Best, > > > > > > > > Jark > > > > > > > > > > > > > > > > On Mon, 28 Feb 2022 at 21:26, Márton Balassi < > > > > > balassi.mar...@gmail.com > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Thank you, Till. Good luck with the next chapter. :-) > > > > > > > > > > > > > > > > > > On Mon, Feb 28, 2022 at 1:49 PM Flavio Pompermaier < > > > > > > > pomperma...@okkam.it > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Good luck for your new adventure Till! > > > > > > > > > > > > > > > > > > > > On Mon, Feb 28, 2022 at 12:00 PM Till Rohrmann < > > > > > > trohrm...@apache.org > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > > > > > > > > > > > I wanted to let you know that I will be less active in > > the > > > > > > > community > > > > > > > > > > > because I’ve decided to start a new chapter in my life. > > > > Hence, > > > > > > > please > > > > > > > > > > don’t > > > > > > > > > > > wonder if I might no longer be very responsive on mails > > and > > > > > JIRA > > > > > > > > > issues. > > > > > > > > > > > > > > > > > > > > > > It is great being part of such a great community with > so > > > many > > > > > > > amazing > > > > > > > > > > > people. 
Over the past 7,5 years, I’ve learned a lot > > thanks > > > to > > > > > you > > > > > > > and > > > > > > > > > > > together we have shaped how people think about stream > > > > > processing > > > > > > > > > > nowadays. > > > > > > > > > > > This is something we can be very proud of. I am sure > that > > > the > > > > > > > > community > > > > > > > > > > > will continue innovating and setting the pace for what > is > > > > > > possible > > > > > > > > with > > > > > > > > > > > real time processing. I wish you all godspeed! > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > Till > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >
Re: [ANNOUNCE] New Apache Flink Committer - Martijn Visser
Congratulations, Martijn! Well deserved! Best, Zhilong On Sat, Mar 5, 2022 at 1:09 AM Piotr Nowojski wrote: > Congratulations! > > Piotrek > > On Fri, 4 Mar 2022 at 16:05, Aitozi wrote: > > > Congratulations Martjin! > > > > Best, > > Aitozi > > > > Martijn Visser wrote on Fri, Mar 4, 2022 at 17:10: > > > > > Thank you all! > > > > > > On Fri, 4 Mar 2022 at 10:00, Niels Basjes wrote: > > > > > > > Congratulations Martjin! > > > > > > > > On Fri, Mar 4, 2022 at 9:43 AM Johannes Moser > > wrote: > > > > > > > > > Congratulations Martijn, > > > > > > > > > > Well deserved. > > > > > > > > > > > On 03.03.2022, at 16:49, Robert Metzger > > wrote: > > > > > > > > > > > > Hi everyone, > > > > > > > > > > > > On behalf of the PMC, I'm very happy to announce Martijn Visser > as > > a > > > > new > > > > > > Flink committer. > > > > > > > > > > > > Martijn is a very active Flink community member, driving a lot of > > > > efforts > > > > > > on the dev@flink mailing list. He also pushes projects such as > > > > replacing > > > > > > Google Analytics with Matomo, so that we can generate our web > > > analytics > > > > > > within the Apache Software Foundation. > > > > > > > > > > > > Please join me in congratulating Martijn for becoming a Flink > > > > committer! > > > > > > > > > > > > Cheers, > > > > > > Robert > > > > > > > > > > > > -- > > > > Best regards / Met vriendelijke groeten, > > > > Niels Basjes > > > > > > >
Re: [ANNOUNCE] New Apache Flink Committer - David Morávek
Congratulations, David! Best, Zhilong On Sat, Mar 5, 2022 at 1:09 AM Piotr Nowojski wrote: > Congratulations :) > > pt., 4 mar 2022 o 16:04 Aitozi napisał(a): > > > Congratulations David! > > > > Ingo Bürk 于2022年3月4日周五 22:56写道: > > > > > Congrats, David! > > > > > > On 04.03.22 12:34, Robert Metzger wrote: > > > > Hi everyone, > > > > > > > > On behalf of the PMC, I'm very happy to announce David Morávek as a > new > > > > Flink committer. > > > > > > > > His first contributions to Flink date back to 2019. He has been > > > > increasingly active with reviews and driving major initiatives in the > > > > community. David brings valuable experience from being a committer in > > the > > > > Apache Beam project to Flink. > > > > > > > > > > > > Please join me in congratulating David for becoming a Flink > committer! > > > > > > > > Cheers, > > > > Robert > > > > > > > > > >
[DISCUSS] FLINK-21110: Optimize Scheduler Performance for Large-Scale Jobs
Hello, everyone:

I would like to start the discussion about FLINK-21110: Optimize Scheduler Performance for Large-Scale Jobs [1].

According to the results of the scheduler benchmarks we implemented in FLINK-20612 [2], the bottleneck of deploying and running a large-scale job in Flink is mainly in the following procedures:

Procedure                                             Time complexity
Initializing ExecutionGraph                           O(N^2)
Building DefaultExecutionTopology                     O(N^2)
Initializing PipelinedRegionSchedulingStrategy        O(N^2)
Scheduling downstream tasks when a task finishes      O(N^2)
Calculating tasks to restart when a failover occurs   O(N^2)
Releasing result partitions                           O(N^3)

These procedures are all related to the complexity of the topology in the ExecutionGraph. Between two vertices connected with all-to-all edges, all the upstream IntermediateResultPartitions are connected to all downstream ExecutionVertices, so the computation complexity of building and traversing all these edges is O(N^2).

As for memory usage, currently we use ExecutionEdges to store the information of connections. For the all-to-all distribution type, there are O(N^2) ExecutionEdges. We tested a simple job with only two vertices, each with a parallelism of 10k, connected with all-to-all edges. It takes 4.175 GiB (estimated via MXBean) to store the 100M ExecutionEdges. In most large-scale jobs, there are more than two vertices with large parallelisms, and they would cost a lot of time and memory to deploy.

As we can see, for two JobVertices connected with the all-to-all distribution type, all IntermediateResultPartitions produced by the upstream ExecutionVertices are isomorphic, which means that the downstream ExecutionVertices they connect to are exactly the same. The downstream ExecutionVertices belonging to the same JobVertex are also isomorphic, as the upstream ResultPartitions they connect to are the same, too. Since every JobEdge has exactly one distribution type, we can divide the vertices and result partitions into groups according to the distribution type of the JobEdge.

For the all-to-all distribution type, since all downstream vertices are isomorphic, they belong to a single group, and all the upstream result partitions are connected to this group. Vice versa, all the upstream result partitions also belong to a single group, and all the downstream vertices are connected to this group. In the past, when we wanted to iterate over all the downstream vertices, we needed to loop over them N times, which leads to a complexity of O(N^2). Now, since all upstream result partitions are connected to one downstream group, we just need to loop over them once, with a complexity of O(N).

For the pointwise distribution type, because each result partition is connected to different downstream vertices, the partitions belong to different groups; vice versa, all the vertices belong to different groups. Since one result partition group is connected to one vertex group pointwise, the computation complexity of looping over them is still O(N).

After we group the result partitions and vertices, ExecutionEdge is no longer needed. For the test job mentioned above, this optimization effectively reduces the memory usage from 4.175 GiB to 12.076 MiB (estimated via MXBean) in our POC. The time cost is reduced from 62.090 seconds to 8.551 seconds (with 10k parallelism).

The detailed design doc with illustrations is located at [3]. Please find more details in the links below. Looking forward to your feedback.

[1] https://issues.apache.org/jira/browse/FLINK-21110
[2] https://issues.apache.org/jira/browse/FLINK-20612
[3] https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
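To make the grouping concrete, here is a simplified, self-contained sketch; the class and variable names are illustrative and are not Flink's actual implementation. For an all-to-all JobEdge, every upstream partition references one shared group of consumers, so the connection information costs O(N) memory instead of O(N^2) ExecutionEdges:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** One shared list of consumer vertices, referenced by every upstream partition. */
class ConsumerVertexGroup {
    final List<Integer> vertices;

    ConsumerVertexGroup(List<Integer> vertices) {
        this.vertices = vertices;
    }
}

public class GroupingSketch {
    public static void main(String[] args) {
        int parallelism = 10_000;
        List<Integer> downstreamVertices =
                IntStream.range(0, parallelism).boxed().collect(Collectors.toList());

        // All-to-all case: all partitions share the same group object, so we
        // store one list of N vertices plus N references instead of N * N edges.
        ConsumerVertexGroup sharedGroup = new ConsumerVertexGroup(downstreamVertices);
        List<ConsumerVertexGroup> consumersOfPartition = new ArrayList<>();
        for (int partition = 0; partition < parallelism; partition++) {
            consumersOfPartition.add(sharedGroup); // a reference, not a copy
        }

        // Iterating all consumers of all partitions is now O(N): visit the
        // shared group once instead of once per upstream partition.
        System.out.println(
                "consumers per partition: " + consumersOfPartition.get(0).vertices.size());
    }
}
{code}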
Re: [ANNOUNCE] New PMC member: Xintong Song
Congratulations, Xintong! Dawid Wysakowicz 于2021年6月16日周三 下午5:23写道: > Hi all! > > I'm very happy to announce that Xintong Song has joined the Flink PMC! > > Congratulations and welcome Xintong! > > Best, > Dawid >
Re: [ANNOUNCE] New PMC member: Guowei Ma
Congratulations, Guowei! Best, Zhilong On Tue, Jul 6, 2021 at 10:01 PM Kurt Young wrote: > Hi all! > > I'm very happy to announce that Guowei Ma has joined the Flink PMC! > > Congratulations and welcome Guowei! > > Best, > Kurt >
Re: I want to contribute to Apache Flink
Hi, Pang! Thank you for your enthusiasm for contribution. You don't need any permission to contribute code to Apache Flink. Feel free to find issues that you are interested in on JIRA (https://issues.apache.org/jira/projects/FLINK/issues). Issues with the label "starter" are a good choice to start with. You can comment with your idea and request that a committer assign the ticket to you. Best, Zhilong On Wed, Jul 7, 2021 at 11:56 AM pang pan wrote: > Hi Guys, > > I want to contribute to Apache Flink. > Would you please give me the permission as a contributor? > My JIRA ID is pangpan. >
Re: [ANNOUNCE] New Apache Flink Committer - Yuan Mei
Congratulations, Yuan! Best, Zhilong On Thu, Jul 8, 2021 at 1:22 AM Yu Li wrote: > Hi all, > > On behalf of the PMC, I’m very happy to announce Yuan Mei as a new Flink > committer. > > Yuan has been an active contributor for more than two years, with code > contributions on multiple components including kafka connectors, > checkpointing, state backends, etc. Besides, she has been actively involved > in community activities such as helping manage releases, discussing > questions on dev@list, supporting users and giving talks at conferences. > > Please join me in congratulating Yuan for becoming a Flink committer! > > Cheers, > Yu >
Re: [ANNOUNCE] New Apache Flink Committer - Leonard Xu
Congratulations, Leonard! On Mon, Nov 15, 2021 at 10:13 AM Qingsheng Ren wrote: > Congratulations Leonard! > > -- > Best Regards, > > Qingsheng Ren > Email: renqs...@gmail.com > On Nov 12, 2021, 12:12 PM +0800, Jark Wu , wrote: > > Hi everyone, > > > > On behalf of the PMC, I'm very happy to announce Leonard Xu as a new > Flink > > committer. > > > > Leonard has been a very active contributor for more than two year, > authored > > 150+ PRs and reviewed many PRs which is quite outstanding. > > Leonard mainly works on Flink SQL parts and drives several important > FLIPs, > > e.g. FLIP-132 (temporal table join) and FLIP-162 (correct time > behaviors). > > He is also the maintainer of flink-cdc-connectors[1] project which helps > a > > lot for users building a real-time data warehouse and data lake. > > > > Please join me in congratulating Leonard for becoming a Flink committer! > > > > Cheers, > > Jark Wu > > > > [1]: https://github.com/ververica/flink-cdc-connectors >
Re: [ANNOUNCE] New Apache Flink Committer - Yangze Guo
Congratulations, Yangze! On Mon, Nov 15, 2021 at 10:13 AM Qingsheng Ren wrote: > Congratulations Yangze! > > -- > Best Regards, > > Qingsheng Ren > Email: renqs...@gmail.com > On Nov 12, 2021, 10:11 AM +0800, Xintong Song , > wrote: > > Hi everyone, > > > > On behalf of the PMC, I'm very happy to announce Yangze Guo as a new > Flink > > committer. > > > > Yangze has been consistently contributing to this project for almost 3 > > years. His contributions are mainly in the resource management and > > deployment areas, represented by the fine-grained resource management and > > external resource framework. In addition to feature works, he's also > active > > in miscellaneous contributions, including PR reviews, document > enhancement, > > mailing list services and meetup/FF talks. > > > > Please join me in congratulating Yangze Guo for becoming a Flink > committer! > > > > Thank you~ > > > > Xintong Song >
Re: [ANNOUNCE] New Apache Flink Committer - Fabian Paul
Congratulations, Fabian! Best regards, Zhilong Hong On Tue, Nov 16, 2021 at 10:19 AM Yangze Guo wrote: > Congrats & well deserved! > > Best, > Yangze Guo > > On Tue, Nov 16, 2021 at 10:18 AM Jing Zhang wrote: > > > > Congratulations Fabian! > > > > Best, > > Jing Zhang > > > > Yuepeng Pan 于2021年11月16日周二 上午10:16写道: > > > > > Congratulations Fabian! > > > > > > Best, > > > Yuepeng Pan > > > > > > At 2021-11-15 21:17:13, "Arvid Heise" wrote: > > > >Hi everyone, > > > > > > > >On behalf of the PMC, I'm very happy to announce Fabian Paul as a new > > > Flink > > > >committer. > > > > > > > >Fabian Paul has been actively improving the connector ecosystem by > > > >migrating Kafka and ElasticSearch to the Sink interface and is > currently > > > >driving FLIP-191 [1] to tackle the sink compaction issue. While he is > > > >active on the project (authored 70 PRs and reviewed 60), it's also > worth > > > >highlighting that he has also been guiding external efforts, such as > the > > > >DeltaLake Flink connector or the Pinot sink in Bahir. > > > > > > > >Please join me in congratulating Fabian for becoming a Flink > committer! > > > > > > > >Best, > > > > > > > >Arvid > > > > > > > >[1] > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction > > > >
Re: [ANNOUNCE] New Apache Flink Committer - Jing Zhang
Congratulations, Jing! Best regards, Zhilong Hong On Mon, Nov 15, 2021 at 9:41 PM Martijn Visser wrote: > Congratulations Jing! > > On Mon, 15 Nov 2021 at 14:39, Timo Walther wrote: > > > Hi everyone, > > > > On behalf of the PMC, I'm very happy to announce Jing Zhang as a new > > Flink committer. > > > > Jing has been very active in the Flink community esp. in the Table/SQL > > area for quite some time: 81 PRs [1] in total and is also active on > > answering questions on the user mailing list. She is currently > > contributing a lot around the new windowing table-valued functions [2]. > > > > Please join me in congratulating Jing Zhang for becoming a Flink > committer! > > > > Thanks, > > Timo > > > > [1] https://github.com/apache/flink/pulls/beyond1920 > > [2] https://issues.apache.org/jira/browse/FLINK-23997 > > >
Re: [ANNOUNCE] New Apache Flink Committer - Yingjie Cao
Congratulations, Yingjie! Best regards, Zhilong On Wed, Nov 17, 2021 at 2:13 PM Yuepeng Pan wrote: > Congratulations ! > > > Best, > Yuepeng Pan. > > > At 2021-11-17 12:55:29, "Guowei Ma" wrote: > >Hi everyone, > > > >On behalf of the PMC, I'm very happy to announce Yingjie Cao as a new > Flink > >committer. > > > >Yingjie has submitted 88 PRs since he joined the Flink community for more > >than 2 years. In general, his main contributions are concentrated in > >Flink's Shuffle. Yingjie has done a lot of work in promoting the > >performance and stability of TM Blocking Shuffle[1][2];In order to allow > >Flink to use External/Remote Shuffle Service in batch production > scenarios, > >he also improves the Flink Shuffle architecture in 1.14 [3]. At the same > >time, he has also done some related work in Streaming Shuffle, such as > >Buffer Management[4] , Non-Blocking Network Output and some important bug > >fixes. > > > >Please join me in congratulating Yingjie for becoming a Flink committer! > > > >[1] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A+Introduce+Sort-Based+Blocking+Shuffle+to+Flink > >[2] https://flink.apache.org/2021/10/26/sort-shuffle-part1.html > >[3] > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-184%3A+Refine+ShuffleMaster+lifecycle+management+for+pluggable+shuffle+service+framework > >[4] https://issues.apache.org/jira/browse/FLINK-16428 > > > >Best, > >Guowei >
Re: [ANNOUNCE] New Apache Flink Committers: Qingsheng Ren, Shengkai Fang
Congratulations, Qingsheng and ShengKai! Best, Zhilong. On Mon, Jun 20, 2022 at 4:00 PM Lijie Wang wrote: > Congratulations, Qingsheng and ShengKai. > > Best, > Lijie > > Paul Lam 于2022年6月20日周一 15:58写道: > > > Congrats, Qingsheng and Shengkai! > > > > Best, > > Paul Lam > > > > > 2022年6月20日 15:57,Martijn Visser 写道: > > > > > > Congratulations to both of you, this is very much deserved! > > > > > > Op ma 20 jun. 2022 om 09:53 schreef Jark Wu : > > > > > >> Hi everyone, > > >> > > >> On behalf of the PMC, I'm very happy to announce two new Flink > > committers: > > >> Qingsheng Ren and Shengkai Fang. > > >> > > >> Qingsheng is the core contributor and maintainer of the Kafka > connector. > > >> He continuously improved the existing connectors, debugged many > > connector > > >> testability issues, and worked on the connector testing framework. > > >> Recently, > > >> he is driving the work of FLIP-221 (caching lookup connector), which > is > > >> crucial for SQL connectors. > > >> > > >> Shengkai has been continuously contributing to the Flink project for > two > > >> years. > > >> He mainly works on Flink SQL parts and drives several important FLIPs, > > >> e.g. FLIP-149 (upsert-kafka), FLIP-163 (SQL CLI Improvements), > > >> FLIP-91 (SQL Gateway), and FLIP-223 (HiveServer2 Endpoint). > > >> He is very active and helps many users on the mailing list. > > >> > > >> Please join me in welcoming them as committers! > > >> > > >> Cheers, > > >> Jark Wu > > >> > > > > >
[jira] [Created] (FLINK-14065) Log metric name when the metric fails on registration/unregistration
Zhilong Hong created FLINK-14065:

Summary: Log metric name when the metric fails on registration/unregistration
Key: FLINK-14065
URL: https://issues.apache.org/jira/browse/FLINK-14065
Project: Flink
Issue Type: Improvement
Components: Runtime / Metrics
Affects Versions: 1.9.0
Reporter: Zhilong Hong
Fix For: 1.10.0

When a MetricGroup registers metrics in MetricRegistryImpl, the registration sometimes fails due to exceptions. However, currently only {{"Error while registering metric"}} is logged, with no further information, which makes it inconvenient for users to troubleshoot which metric failed and why. Moreover, the warning logged on registration and on unregistration is the same "{{Error while registering metric}}" message, which can confuse users (although they can locate the correct place from the call stack). So I propose to log the metric name when a metric fails on registration/unregistration.
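A sketch of the proposed logging (illustrative fragments; the actual change would live in {{MetricRegistryImpl}}). Including the metric name and using distinct messages makes the two failure sites distinguishable:

{code:java}
// On registration:
try {
    reporter.notifyOfAddedMetric(metric, metricName, group);
} catch (Exception e) {
    LOG.warn("Error while registering metric: {}.", metricName, e);
}

// On unregistration:
try {
    reporter.notifyOfRemovedMetric(metric, metricName, group);
} catch (Exception e) {
    LOG.warn("Error while unregistering metric: {}.", metricName, e);
}
{code}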
[jira] [Created] (FLINK-31945) Table of contents in the blogs of the project website is missing some titles
Zhilong Hong created FLINK-31945:

Summary: Table of contents in the blogs of the project website is missing some titles
Key: FLINK-31945
URL: https://issues.apache.org/jira/browse/FLINK-31945
Project: Flink
Issue Type: Improvement
Components: Project Website
Reporter: Zhilong Hong

The ToC on the blog pages of the project website doesn't contain all the section titles; the first-level section titles are missing.

Solution: add the following configuration item to config.toml:
{noformat}
[markup.tableOfContents]
startLevel = 0
{noformat}
[jira] [Created] (FLINK-32051) Fix broken documentation links in Flink blogs
Zhilong Hong created FLINK-32051:

Summary: Fix broken documentation links in Flink blogs
Key: FLINK-32051
URL: https://issues.apache.org/jira/browse/FLINK-32051
Project: Flink
Issue Type: Improvement
Components: Project Website
Reporter: Zhilong Hong

Currently, the links to the documentation in the blogs are broken. We need to add a slash (/) at the end of the param {{DocsBaseUrl}} in config.toml, like this:
{noformat}
[params]
DocsBaseUrl = "//nightlies.apache.org/flink/"
{noformat}

Also, the links in this [post|https://flink.apache.org/2022/01/04/how-we-improved-scheduler-performance-for-large-scale-jobs-part-two/] are not rendered correctly. We need to add a newline after the {{{}{}}}.
[jira] [Created] (FLINK-20547) Batch job fails due to the exception in network stack
Zhilong Hong created FLINK-20547:

Summary: Batch job fails due to the exception in network stack
Key: FLINK-20547
URL: https://issues.apache.org/jira/browse/FLINK-20547
Project: Flink
Issue Type: Bug
Components: Runtime / Network
Affects Versions: 1.13.0
Reporter: Zhilong Hong
Attachments: inconsistent.tar.gz

I ran a simple batch job with only two job vertices: a source and a sink. The parallelisms of both are 8000, and they are connected via all-to-all blocking edges. While the sink tasks were running, the following exception was raised:

{code:java}
2020-12-09 18:43:48,981 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: Sink 1 (1595/8000) (08bd4214d6e0dc144e9654f1faaa3b28) switched from RUNNING to FAILED on [masked container name] @ [masked address] (dataPort=47872).
java.io.IOException: java.lang.IllegalStateException: Inconsistent availability: expected true
    at org.apache.flink.runtime.io.network.partition.consumer.InputChannel.checkError(InputChannel.java:232) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel.getNextBuffer(RecoveredInputChannel.java:165) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.waitAndGetNextData(SingleInputGate.java:626) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:603) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNext(SingleInputGate.java:591) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.InputGateWithMetrics.pollNext(InputGateWithMetrics.java:109) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:142) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:157) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at java.lang.Thread.run(Thread.java:834) ~[?:1.8.0_102]
Caused by: java.lang.IllegalStateException: Inconsistent availability: expected true
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:198) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.checkConsistentAvailability(LocalBufferPool.java:434) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:564) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:509) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.tryRedistributeBuffers(NetworkBufferPool.java:438) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:166) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.requestMemorySegments(NetworkBufferPool.java:60) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.io.network.partition.consumer.BufferManager.requestExclusiveBuffers(BufferManager.java:131) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at
{code}
[jira] [Created] (FLINK-20612) Add benchmarks for scheduler
Zhilong Hong created FLINK-20612:

Summary: Add benchmarks for scheduler
Key: FLINK-20612
URL: https://issues.apache.org/jira/browse/FLINK-20612
Project: Flink
Issue Type: Improvement
Components: Runtime / Coordination
Affects Versions: 1.13.0
Reporter: Zhilong Hong

With Flink 1.12, we failed to run large-scale jobs on our cluster. When we tried to run them, we met exceptions such as out-of-heap-memory errors and TaskManager heartbeat timeouts. Even after we increased the size of the heap memory and extended the heartbeat timeout, the jobs still failed. After troubleshooting, we found several performance bottlenecks in the JobMaster. These bottlenecks are highly related to the complexity of the topology. We implemented several benchmarks on these bottlenecks based on flink-benchmark. The topology of the benchmarks is a simple graph consisting of only two vertices: one source vertex and one sink vertex, connected with all-to-all blocking edges. The parallelisms of the vertices are both 8k. The execution mode is batch. The results of the benchmarks are illustrated below:

Table 1: The result of benchmarks on bottlenecks in the JobMaster
| |*Time spent*|
|Build topology|19970.44 ms|
|Init scheduling strategy|41668.338 ms|
|Deploy tasks|15102.850 ms|
|Calculate failover region to restart|12080.271 ms|

We'd like to propose benchmarks for these procedures in the runtime module. There are three main benefits:
# They help us understand the current status of task deployment performance and locate the bottlenecks.
# We can use the benchmarks to evaluate optimizations in the future.
# As we run the benchmarks daily, they will help us trace how the performance changes and locate the commit that introduces a performance regression, if there is any.

In the first version of the benchmarks, we mainly focus on the procedures mentioned above. The methods corresponding to these procedures are:
# Building topology: {{ExecutionGraph#attachJobGraph}}
# Initializing scheduling strategies: {{PipelinedRegionSchedulingStrategy#init}}
# Deploying tasks: {{Execution#deploy}}
# Calculating failover regions: {{RestartPipelinedRegionFailoverStrategy#getTasksNeedingRestart}}

The topology of the benchmarks consists of two vertices: source -> sink, connected with all-to-all edges. The result partition types ({{PIPELINED}} and {{BLOCKING}}) should be considered separately.
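For illustration, here is a runnable toy benchmark in the same JMH style used by flink-benchmarks (the class and method names are ours, not the benchmarks that were actually merged). It contrasts per-edge materialization, which is what makes the procedures above O(N^2), with a shared consumer group:

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class EdgeMaterializationBenchmark {

    @Param({"1000", "8000"})
    public int parallelism;

    @Benchmark
    public List<int[]> allToAllEdges() {
        // One edge object per (partition, consumer) pair: N * N entries.
        List<int[]> edges = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            for (int c = 0; c < parallelism; c++) {
                edges.add(new int[] {p, c});
            }
        }
        return edges;
    }

    @Benchmark
    public List<Integer> sharedConsumerGroup() {
        // One shared consumer list referenced by all partitions: N entries.
        List<Integer> group = new ArrayList<>(parallelism);
        for (int c = 0; c < parallelism; c++) {
            group.add(c);
        }
        return group;
    }
}
{code}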
[jira] [Created] (FLINK-27287) FileExecutionGraphInfoStoreTest unstable with "Could not start rest endpoint on any port in port range 8081"
Zhilong Hong created FLINK-27287:

Summary: FileExecutionGraphInfoStoreTest unstable with "Could not start rest endpoint on any port in port range 8081"
Key: FLINK-27287
URL: https://issues.apache.org/jira/browse/FLINK-27287
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 1.15.0
Reporter: Zhilong Hong
Fix For: 1.16.0, 1.15.1

In our CI, we met the exception below in {{FileExecutionGraphInfoStoreTest}} and {{MemoryExecutionGraphInfoStoreITCase}}:

{code:java}
org.apache.flink.util.FlinkException: Could not create the DispatcherResourceManagerComponent.
    at org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory.create(DefaultDispatcherResourceManagerComponentFactory.java:285)
    at org.apache.flink.runtime.dispatcher.ExecutionGraphInfoStoreTestUtils$PersistingMiniCluster.createDispatcherResourceManagerComponents(ExecutionGraphInfoStoreTestUtils.java:227)
    at org.apache.flink.runtime.minicluster.MiniCluster.setupDispatcherResourceManagerComponents(MiniCluster.java:489)
    at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:433)
    at org.apache.flink.runtime.dispatcher.FileExecutionGraphInfoStoreTest.testPutSuspendedJobOnClusterShutdown(FileExecutionGraphInfoStoreTest.java:328)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
    at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:61)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
    at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
    at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:54)
    at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:115)
    at org.junit.vintage.engine.execution.RunnerExecutor.execute(RunnerExecutor.java:42)
    at org.junit.vintage.engine.VintageTestEngine.executeAllChildren(VintageTestEngine.java:80)
    at org.junit.vintage.engine.VintageTestEngine.execute(VintageTestEngine.java:72)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
    at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
    at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
    at org.apache.maven.surefire.junitplatform.JUnitPlatformProvider.execute(JUnitPlatformPro
{code}
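Not the committed fix, but the usual remedy for this class of instability (a sketch, assuming the test can tolerate an ephemeral port): bind the MiniCluster REST endpoint to port "0" instead of the default 8081, so concurrent test runs cannot collide.

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;

class MiniClusterPortFix {
    /** Returns a configuration whose REST endpoint binds to an ephemeral port. */
    static Configuration restOnEphemeralPort() {
        Configuration configuration = new Configuration();
        configuration.setString(RestOptions.BIND_PORT, "0");
        return configuration;
    }
}
{code}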
[jira] [Created] (FLINK-25612) Update the outdated illustration of ExecutionState in the documentation
Zhilong Hong created FLINK-25612:

Summary: Update the outdated illustration of ExecutionState in the documentation
Key: FLINK-25612
URL: https://issues.apache.org/jira/browse/FLINK-25612
Project: Flink
Issue Type: Technical Debt
Components: Documentation
Affects Versions: 1.14.2, 1.13.5, 1.15.0
Reporter: Zhilong Hong
Fix For: 1.15.0, 1.13.6, 1.14.3
Attachments: current-illustration-2.jpg, new-illustration.jpg

Currently, the illustration of {{ExecutionState}} on the page "Jobs and Scheduling" ([https://nightlies.apache.org/flink/flink-docs-master/docs/internals/job_scheduling/]) is outdated: it doesn't include the INITIALIZING state, which was introduced in FLINK-17102.

Current illustration: !current-illustration-2.jpg|width=400!

New illustration: !new-illustration.jpg|width=400!
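For reference, the happy path through the updated state machine (our summary of the change from FLINK-17102, not taken from the ticket) is:

{noformat}
CREATED -> SCHEDULED -> DEPLOYING -> INITIALIZING -> RUNNING -> FINISHED
{noformat}

INITIALIZING covers the time a task spends in operator initialization (e.g., restoring state) after deployment and before it starts processing records.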
[jira] [Created] (FLINK-21110) Optimize Scheduler Performance for Large-Scale Jobs
Zhilong Hong created FLINK-21110:

Summary: Optimize Scheduler Performance for Large-Scale Jobs
Key: FLINK-21110
URL: https://issues.apache.org/jira/browse/FLINK-21110
Project: Flink
Issue Type: Improvement
Components: Runtime / Coordination
Reporter: Zhilong Hong
Fix For: 1.13.0
Attachments: Illustration of Group.jpg

According to the result of the scheduler benchmarks we implemented in [FLINK-20612|https://issues.apache.org/jira/browse/FLINK-20612], the bottleneck of deploying and running a large-scale job in Flink is mainly focused on the following procedures:

|Procedure|Time complexity|
|Initializing ExecutionGraph|O(N^2^)|
|Building DefaultExecutionTopology|O(N^2^)|
|Initializing PipelinedRegionSchedulingStrategy|O(N^2^)|
|Scheduling downstream tasks when a task finishes|O(N^2^)|
|Calculating tasks to restart when a failover occurs|O(N^2^)|
|Releasing result partitions|O(N^3^)|

These procedures are all related to the complexity of the topology in the ExecutionGraph. Between two vertices connected with all-to-all edges, all the upstream IntermediateResultPartitions are connected to all downstream ExecutionVertices, so the computation complexity of building and traversing all these edges is O(N^2^).

As for memory usage, currently we use ExecutionEdges to store the information of connections. For the all-to-all distribution type, there are O(N^2^) ExecutionEdges. We tested a simple job with only two vertices, each with a parallelism of 10k, connected with all-to-all edges. It takes 4.175 GiB (estimated via MXBean) to store the 100M ExecutionEdges. In most large-scale jobs, there are more than two vertices with large parallelisms, and they would cost a lot of time and memory to deploy.

As we can see, for two JobVertices connected with the all-to-all distribution type, all IntermediateResultPartitions produced by the upstream ExecutionVertices are isomorphic, which means that the downstream ExecutionVertices they connect to are exactly the same. The downstream ExecutionVertices belonging to the same JobVertex are also isomorphic, as the upstream ResultPartitions they connect to are the same, too. Since every JobEdge has exactly one distribution type, we can divide the vertices and result partitions into groups according to the distribution type of the JobEdge.

For the all-to-all distribution type, since all downstream vertices are isomorphic, they belong to a single group, and all the upstream result partitions are connected to this group. Vice versa, all the upstream result partitions also belong to a single group, and all the downstream vertices are connected to this group. In the past, when we wanted to iterate over all the downstream vertices, we needed to loop over them N times, which leads to a complexity of O(N^2^). Now, since all upstream result partitions are connected to one downstream group, we just need to loop over them once, with a complexity of O(N).

For the pointwise distribution type, because each result partition is connected to different downstream vertices, the partitions belong to different groups; vice versa, all the vertices belong to different groups. Since one result partition group is connected to one vertex group pointwise, the computation complexity of looping over them is still O(N).

!Illustration of Group.jpg!

After we group the result partitions and vertices, ExecutionEdge is no longer needed. For the test job we mentioned above, the optimization effectively reduces the memory usage from 4.175 GiB to 12.076 MiB (estimated via MXBean) in our POC. The time cost is reduced from 62.090 seconds to 8.551 seconds (with 10k parallelism).

The detailed design doc will be attached once finished.
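As a rough cross-check of those numbers (back-of-envelope arithmetic added here for illustration, not part of the original report):

{noformat}
10,000 partitions x 10,000 consumers = 10^8 ExecutionEdges
4.175 GiB / 10^8 edges ≈ 45 bytes per edge
{noformat}

After grouping, only the ~2 x 10^4 vertices and partitions plus a handful of shared group objects need to be stored, which is why the footprint drops to the megabyte range.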
[jira] [Created] (FLINK-21201) Creating BoundedBlockingSubpartition blocks TaskManager’s main thread
Zhilong Hong created FLINK-21201:

Summary: Creating BoundedBlockingSubpartition blocks TaskManager’s main thread
Key: FLINK-21201
URL: https://issues.apache.org/jira/browse/FLINK-21201
Project: Flink
Issue Type: Bug
Components: Runtime / Network
Affects Versions: 1.12.1
Reporter: Zhilong Hong
Attachments: jobmanager.log.tar.gz, taskmanager.log.tar.gz

When we tried to run batch jobs with 8k parallelism, it took a long time to deploy the vertices. After investigation, we found that creating BoundedBlockingSubpartitions blocks the TaskManager's main thread during the {{submitTask}} procedure.

When the JobMaster invokes {{submitTask}} and sends an RPC call to the TaskManager, the TaskManager receives the RPC call and executes the {{submitTask}} method in its main thread. In the {{submitTask}} method, the TaskExecutor creates a Task instance and tries to start it. During the creation, the TaskExecutor creates the ResultPartition and its ResultSubpartitions. For a batch job, the type of the ResultSubpartitions is BoundedBlockingSubpartition with FileChannelBoundedData. The BoundedBlockingSubpartition creates a file on the local disk, which is an IO operation and can take a long time. In our test, it took up to 28 seconds to create 8k BoundedBlockingSubpartitions. This procedure blocks the main thread of the TaskManager and can lead to heartbeat timeouts and slow task deployment.

In my opinion, the IO operation should be executed on the IO executor rather than in the main thread.

The logs of the JobManager and TaskManager are attached. A typical task is Source 0: #898.
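A sketch of the direction suggested above (illustrative only, not the committed fix): perform the file creation on the IO executor so that {{submitTask}} returns quickly and the main thread stays responsive for heartbeats.

{code:java}
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class SubpartitionFileSetup {
    /** Creates the spill file of a BoundedBlockingSubpartition off the main thread. */
    static CompletableFuture<Void> createSpillFileAsync(Path file, Executor ioExecutor) {
        return CompletableFuture.runAsync(
                () -> {
                    try {
                        Files.createFile(file); // blocking disk IO happens here
                    } catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                },
                ioExecutor);
    }
}
{code}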
[jira] [Created] (FLINK-21326) Optimize building topology when initializing ExecutionGraph
Zhilong Hong created FLINK-21326:

Summary: Optimize building topology when initializing ExecutionGraph
Key: FLINK-21326
URL: https://issues.apache.org/jira/browse/FLINK-21326
Project: Flink
Issue Type: Sub-task
Components: Runtime / Coordination
Reporter: Zhilong Hong
Fix For: 1.13.0

The main idea of optimizing the procedure of building the topology is to put all the vertices that consume the same result partitions into one group, and all the result partitions that have the same consumer vertices into one group. The corresponding data structures are {{ConsumedPartitionGroup}} and {{ConsumerVertexGroup}}. {{EdgeManager}} is used to store the relationship between the groups. The procedure of creating {{ExecutionEdge}}s is replaced with building the {{EdgeManager}}.

With these improvements, the complexity of building the topology in the ExecutionGraph decreases from O(N^2) to O(N). Furthermore, {{ExecutionEdge}} and all its related calls are replaced with {{ConsumedPartitionGroup}} and {{ConsumerVertexGroup}}.

The detailed design doc is located at: https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit?usp=sharing
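A condensed sketch of the idea (field and ID types simplified to strings; the real {{EdgeManager}} works with Flink's ID classes): both sides of a JobEdge share group objects instead of holding per-edge links.

{code:java}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ConsumerVertexGroup {
    final List<String> vertices = new ArrayList<>();
}

class ConsumedPartitionGroup {
    final List<String> partitions = new ArrayList<>();
}

class EdgeManagerSketch {
    // partition -> the shared groups of vertices that consume it
    private final Map<String, List<ConsumerVertexGroup>> partitionConsumers = new HashMap<>();
    // vertex -> the shared groups of partitions it consumes
    private final Map<String, List<ConsumedPartitionGroup>> vertexConsumedPartitions = new HashMap<>();

    void connectPartitionWithConsumerVertexGroup(String partitionId, ConsumerVertexGroup group) {
        partitionConsumers.computeIfAbsent(partitionId, k -> new ArrayList<>()).add(group);
    }

    void connectVertexWithConsumedPartitionGroup(String vertexId, ConsumedPartitionGroup group) {
        vertexConsumedPartitions.computeIfAbsent(vertexId, k -> new ArrayList<>()).add(group);
    }
}
{code}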
[jira] [Created] (FLINK-21328) Optimize the initialization of DefaultExecutionTopology
Zhilong Hong created FLINK-21328: Summary: Optimize the initialization of DefaultExecutionTopology Key: FLINK-21328 URL: https://issues.apache.org/jira/browse/FLINK-21328 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhilong Hong Fix For: 1.13.0

Based on FLINK-21326, the {{consumedResults}} in {{DefaultExecutionVertex}} and {{consumers}} in {{DefaultResultPartition}} can be replaced with {{ConsumedPartitionGroup}} and {{ConsumerVertexGroup}} in {{EdgeManager}}.
# The method {{DefaultExecutionTopology#connectVerticesToConsumedPartitions}} could be removed.
# All the related usages should be fixed.
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21330) Optimize the initialization of PipelinedRegionSchedulingStrategy
Zhilong Hong created FLINK-21330: Summary: Optimize the initialization of PipelinedRegionSchedulingStrategy Key: FLINK-21330 URL: https://issues.apache.org/jira/browse/FLINK-21330 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhilong Hong Fix For: 1.13.0

{{PipelinedRegionSchedulingStrategy}} is used for task scheduling. Its initialization is located in {{PipelinedRegionSchedulingStrategy#init}} and can be divided into two parts:
# Calculating the consumed result partitions of SchedulingPipelinedRegions
# Calculating the consumer pipelined regions of SchedulingResultPartitions

Based on FLINK-21328, the {{consumedResults}} of {{DefaultSchedulingPipelinedRegion}} can be replaced with {{ConsumedPartitionGroup}}. Then we can optimize the procedures mentioned above. After the optimization, the time complexity decreases from O(N^2) to O(N). The related usages of {{getConsumedResults}} should be replaced, too. The detailed design doc is located at: [https://docs.google.com/document/d/1OjGAyJ9Z6KsxcMtBHr6vbbrwP9xye7CdCtrLvf8dFYw/edit#heading=h.a1mz4yjpry6m] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21331) Optimize calculating tasks to restart in RestartPipelinedRegionFailoverStrategy
Zhilong Hong created FLINK-21331: Summary: Optimize calculating tasks to restart in RestartPipelinedRegionFailoverStrategy Key: FLINK-21331 URL: https://issues.apache.org/jira/browse/FLINK-21331 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhilong Hong Fix For: 1.13.0

RestartPipelinedRegionFailoverStrategy is used to calculate the tasks to restart when a task failure occurs. It contains two parts: first, calculate the regions to restart; then, add all the tasks in these regions to the restarting queue. The bottleneck is mainly in the first part. This part traverses all the upstream and downstream regions of the failed region to determine whether they should be restarted or not. For the current failed region, if a consumed result partition is not available, its owner, i.e., the upstream region, should restart. Also, since the failed region needs to restart, its result partitions won't be available, so all the downstream regions need to restart, too.

1. Calculating the upstream regions that should restart

The current implementation is:
{code:java}
for each SchedulingExecutionVertex in the currently visited SchedulingPipelinedRegion:
  for each consumed SchedulingResultPartition of the SchedulingExecutionVertex:
    if the result partition is not available:
      add the producer region to the restart queue
{code}
Based on FLINK-21328, the consumed result partitions of a vertex are already grouped. Here we can use a HashSet to record the visited result partition groups. Vertices connected with all-to-all edges then only need to traverse each group once. This decreases the time complexity from O(N^2) to O(N).

2. Calculating the downstream regions that should restart

The current implementation is:
{code:java}
for each SchedulingExecutionVertex in the currently visited SchedulingPipelinedRegion:
  for each produced SchedulingResultPartition of the SchedulingExecutionVertex:
    for each consumer SchedulingExecutionVertex of the produced SchedulingResultPartition:
      if the region containing the consumer SchedulingExecutionVertex is not visited:
        add the region to the restart queue
{code}
Since the count of the produced result partitions of a vertex equals the count of its output JobEdges, the time complexity of this procedure is actually O(N^2). As the consumer vertices of a result partition are already grouped, we can use a HashSet to record the visited ConsumerVertexGroups. The time complexity decreases from O(N^2) to O(N). A sketch of this dedup pattern follows below. -- This message was sent by Atlassian Jira (v8.3.4#803005)
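A hedged sketch of the visited-group dedup pattern, with generic placeholder types: a HashSet guarantees each group is processed only once, even if thousands of vertices reference the same group.

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

class VisitOncePerGroupSketch {
    // groupsPerVertex: for each vertex, the (shared) groups it references
    static <G> void forEachGroupOnce(List<List<G>> groupsPerVertex, Consumer<G> action) {
        Set<G> visited = new HashSet<>();
        for (List<G> groups : groupsPerVertex) {   // N vertices
            for (G group : groups) {               // a handful of groups per vertex
                if (visited.add(group)) {          // true only on first encounter
                    action.accept(group);          // total work: O(N) instead of O(N^2)
                }
            }
        }
    }
}
{code}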
[jira] [Created] (FLINK-21332) Optimize releasing result partitions in RegionPartitionReleaseStrategy
Zhilong Hong created FLINK-21332: Summary: Optimize releasing result partitions in RegionPartitionReleaseStrategy Key: FLINK-21332 URL: https://issues.apache.org/jira/browse/FLINK-21332 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhilong Hong Fix For: 1.13.0

RegionPartitionReleaseStrategy is responsible for releasing result partitions when all the downstream tasks are finished. The current implementation is:
{code:java}
for each consumed SchedulingResultPartition of the currently finished SchedulingPipelinedRegion:
  for each consumer SchedulingPipelinedRegion of the SchedulingResultPartition:
    if all the regions are finished:
      release the partitions
{code}
The time complexity of releasing one result partition is O(N^2). However, considering that over the entire stage all the result partitions need to be released, the overall time complexity is actually O(N^3).

After the optimization of DefaultSchedulingTopology, the consumed result partitions are grouped. Since the result partitions in one group are isomorphic, we can cache the finished status of the result partition groups and the corresponding pipelined regions. The optimized implementation is:
{code:java}
for each ConsumedPartitionGroup of the currently finished SchedulingPipelinedRegion:
  if all consumer SchedulingPipelinedRegions of the ConsumedPartitionGroup are finished:
    mark the ConsumedPartitionGroup as fully consumed
    for each result partition in the ConsumedPartitionGroup:
      if all the ConsumedPartitionGroups it belongs to are fully consumed:
        release the result partition
{code}
A Java sketch of this caching idea follows below. -- This message was sent by Atlassian Jira (v8.3.4#803005)
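A minimal sketch of the caching idea in Java, with hypothetical generic types standing in for Flink's scheduling classes:

{code:java}
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

class PartitionReleaseSketch<G, P> {
    private final Set<G> fullyConsumedGroups = new HashSet<>();

    void onRegionFinished(List<G> consumedGroups,
                          Predicate<G> allConsumerRegionsFinished,
                          Function<G, List<P>> partitionsOf,
                          Function<P, List<G>> groupsOf,
                          Consumer<P> release) {
        for (G group : consumedGroups) {
            // the finished check runs once per group, not once per partition-consumer pair
            if (allConsumerRegionsFinished.test(group)) {
                fullyConsumedGroups.add(group);
                for (P partition : partitionsOf.apply(group)) {
                    if (fullyConsumedGroups.containsAll(groupsOf.apply(partition))) {
                        release.accept(partition);
                    }
                }
            }
        }
    }
}
{code}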
[jira] [Created] (FLINK-21524) Replace scheduler benchmarks with wrapper classes
Zhilong Hong created FLINK-21524: Summary: Replace scheduler benchmarks with wrapper classes Key: FLINK-21524 URL: https://issues.apache.org/jira/browse/FLINK-21524 Project: Flink Issue Type: Improvement Components: Benchmarks, Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0

In FLINK-21514 we found that when someone modifies interfaces or methods used by the scheduler benchmarks, the compilation and execution of flink-benchmark may break. To improve the stability of the scheduler benchmarks, we decide to replace them with wrapper/executor classes and move the current implementations to the Flink repository. We'll also implement several unit tests based on those implementations. In this way, when someone modifies interfaces or methods used by the scheduler benchmarks, the unit tests fail, and then they must fix the broken benchmarks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21525) Move scheduler benchmarks to Flink and add unit tests
Zhilong Hong created FLINK-21525: Summary: Move scheduler benchmarks to Flink and add unit tests Key: FLINK-21525 URL: https://issues.apache.org/jira/browse/FLINK-21525 Project: Flink Issue Type: Sub-task Components: Benchmarks, Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0 To improve the stability of scheduler benchmarks, we'll move the implementation of scheduler benchmarks to the Flink repository and add several unit tests. When someone modifies the interfaces/methods used by scheduler benchmarks, the unit tests will fail. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21526) Replace scheduler benchmarks with wrapper classes
Zhilong Hong created FLINK-21526: Summary: Replace scheduler benchmarks with wrapper classes Key: FLINK-21526 URL: https://issues.apache.org/jira/browse/FLINK-21526 Project: Flink Issue Type: Sub-task Components: Benchmarks, Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0 After moving the implementations of scheduler benchmarks to the Flink repository, we can replace scheduler benchmarks in flink-benchmark with wrapper classes. This will make sure the compilation and execution of flink-benchmark will not fail because of modifications in interfaces/methods used by scheduler benchmarks. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21731) Add a benchmark for DefaultScheduler#startScheduling
Zhilong Hong created FLINK-21731: Summary: Add a benchmark for DefaultScheduler#startScheduling Key: FLINK-21731 URL: https://issues.apache.org/jira/browse/FLINK-21731 Project: Flink Issue Type: Improvement Components: Benchmarks, Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0

We noticed that {{DefaultScheduler#allocateSlotsAndDeploy}} is not covered by the current scheduler benchmarks. While trying to implement a benchmark for this procedure, we concluded that it's better to implement a benchmark that covers the entire {{DefaultScheduler#startScheduling}} procedure instead. This avoids missing any part that may affect the performance of scheduling. It also means we don't need to add benchmarks for every sub-procedure related to scheduling, which would make the benchmarks heavy and hard to maintain. We can then just focus on the performance-sensitive procedures, as the existing benchmarks do. The new benchmark is implemented based on {{DefaultSchedulerBatchSchedulingTest}}; a JMH-style skeleton follows below. -- This message was sent by Atlassian Jira (v8.3.4#803005)
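A hedged JMH skeleton for such a single-shot benchmark; class and field names here are illustrative placeholders, not the actual flink-benchmarks code.

{code:java}
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.SingleShotTime)
public class StartSchedulingBenchmarkSketch {

    private Runnable schedulingAction; // stands in for scheduler::startScheduling

    @Setup(Level.Invocation)
    public void setup() {
        // build the job graph and the scheduler here, outside the measured method
        schedulingAction = () -> { /* scheduler.startScheduling() */ };
    }

    @Benchmark
    public void startScheduling() {
        schedulingAction.run(); // only this call is measured
    }
}
{code}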
[jira] [Created] (FLINK-21915) Optimize Execution#finishPartitionsAndUpdateConsumers
Zhilong Hong created FLINK-21915: Summary: Optimize Execution#finishPartitionsAndUpdateConsumers Key: FLINK-21915 URL: https://issues.apache.org/jira/browse/FLINK-21915 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0

Based on the scheduler benchmark {{PartitionReleaseInBatchJobBenchmark}} introduced in FLINK-20612, we find another procedure with O(N^2) computation complexity: {{Execution#finishPartitionsAndUpdateConsumers}}. Once an execution finishes, it will finish all its BLOCKING partitions and send the updated partition info to all consumer vertices. The procedure can be illustrated with the following pseudo code:
{code:java}
for all Executions in the ExecutionGraph:
  for all produced IntermediateResultPartitions of the Execution:
    for all consumer ExecutionVertices of the IntermediateResultPartition:
      update or cache partition info
{code}
This procedure has O(N^2) complexity in total. Based on FLINK-21326, consumed partitions are grouped if they are connected to the same consumer vertices. Therefore, we can update the partition info of an entire ConsumedPartitionGroup in one batch, rather than one by one. This decreases the complexity from O(N^2) to O(N). -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-21920) Optimize DefaultScheduler#allocateSlots
Zhilong Hong created FLINK-21920: Summary: Optimize DefaultScheduler#allocateSlots Key: FLINK-21920 URL: https://issues.apache.org/jira/browse/FLINK-21920 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0

Based on the scheduler benchmark introduced in FLINK-21731, we find that several procedures related to {{DefaultScheduler#allocateSlots}} have O(N^2) complexity.

The first one is {{ExecutionSlotSharingGroupBuilder#tryFindAvailableProducerExecutionSlotSharingGroupFor}}. The original implementation is:
{code:java}
for all SchedulingExecutionVertices in the DefaultScheduler:
  for all consumed SchedulingResultPartitions of the SchedulingExecutionVertex:
    get the result partition's producer vertex and determine whether the
    ExecutionSlotSharingGroup where the producer vertex is located is
    available for the current vertex
{code}
This procedure has O(N^2) complexity. It's obvious that the result partitions in the same ConsumedPartitionGroup have the same producer vertex, so we can just iterate over the ConsumedPartitionGroups instead of all the consumed partitions. This decreases the complexity from O(N^2) to O(N).

The second one is {{ExecutionGraphToInputsLocationsRetrieverAdapter#getConsumedResultPartitionsProducers}}. The original implementation is:
{code:java}
for all SchedulingExecutionVertices in the DefaultScheduler:
  for all ConsumedPartitionGroups of the SchedulingExecutionVertex:
    for all IntermediateResultPartitions in the ConsumedPartitionGroup:
      get the producer of the IntermediateResultPartition
{code}
This procedure has O(N^2) complexity. We can see that for each SchedulingExecutionVertex, the producers of its ConsumedPartitionGroups are calculated separately. SchedulingExecutionVertices in the same ConsumerVertexGroup share the same ConsumedPartitionGroup, so we don't need to calculate the producers over and over again. We can use a local cache for the producers, as sketched below. This decreases the complexity from O(N^2) to O(N). -- This message was sent by Atlassian Jira (v8.3.4#803005)
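A minimal sketch of the local-cache idea, with generic placeholder types: producers are computed once per ConsumedPartitionGroup and reused by every vertex sharing that group.

{code:java}
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class ProducerCacheSketch<G, P, V> {
    private final Map<G, List<V>> producersByGroup = new HashMap<>();

    List<V> producersOf(G group,
                        Function<G, List<P>> partitionsOf,
                        Function<P, V> producerOf) {
        // computed at most once per group, no matter how many vertices ask
        return producersByGroup.computeIfAbsent(group, g ->
                partitionsOf.apply(g).stream().map(producerOf).collect(Collectors.toList()));
    }
}
{code}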
[jira] [Created] (FLINK-21975) Remove hamcrest dependency from SchedulerBenchmarkBase
Zhilong Hong created FLINK-21975: Summary: Remove hamcrest dependency from SchedulerBenchmarkBase Key: FLINK-21975 URL: https://issues.apache.org/jira/browse/FLINK-21975 Project: Flink Issue Type: Bug Components: Benchmarks, Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0

While adding a BenchmarkExecutor for the benchmarks introduced in FLINK-21731, we found that a dependency on hamcrest was introduced by mistake. Since there's no hamcrest dependency in flink-benchmark, this will break the benchmarks. Thus we need to remove this dependency. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22017) Regions may never be scheduled when there are cross-region blocking edges
Zhilong Hong created FLINK-22017: Summary: Regions may never be scheduled when there are cross-region blocking edges Key: FLINK-22017 URL: https://issues.apache.org/jira/browse/FLINK-22017 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Attachments: Illustration.jpg

For a topology with cross-region blocking edges, there are regions that may never be scheduled. The case is illustrated in the figure below.

!Illustration.jpg!

Let's denote the vertices as v<layer>_<number>. It's clear that the edge connecting v2_2 and v3_2 crosses region 1 and region 2. Since region 1 has no blocking edges connected to other regions, it will be scheduled first. When v2_2 finishes, PipelinedRegionSchedulingStrategy will trigger {{onExecutionStateChange}} for it. As expected, region 2 should then be scheduled, since all its consumed partitions are consumable. But in fact region 2 won't be scheduled, because the result partition of v2_2 is not tagged as consumable. Whether it is consumable or not is determined by its IntermediateDataSet, and an IntermediateDataSet is consumable if and only if all the producers of its IntermediateResultPartitions are finished. This IntermediateDataSet will never become consumable, since v2_3 is not scheduled. All in all, this forms a deadlock: a region will never be scheduled because it has not been scheduled. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22037) Remove the redundant blocking queue from DeployingTasksBenchmarkBase
Zhilong Hong created FLINK-22037: Summary: Remove the redundant blocking queue from DeployingTasksBenchmarkBase Key: FLINK-22037 URL: https://issues.apache.org/jira/browse/FLINK-22037 Project: Flink Issue Type: Bug Components: Benchmarks, Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0

We find that the {{BlockingQueue}} used to preserve {{TaskDeploymentDescriptors}} is never used later. Since {{TaskDeploymentDescriptors}} cost massive heap memory, the queue may introduce unnecessary garbage collection and make the results of the deployment-related benchmarks unstable. So we think it's better to remove the redundant blocking queue from {{DeployingTasksBenchmarkBase}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22077) Wrong way to calculate cross-region ConsumedPartitionGroups in PipelinedRegionSchedulingStrategy
Zhilong Hong created FLINK-22077: Summary: Wrong way to calculate cross-region ConsumedPartitionGroups in PipelinedRegionSchedulingStrategy Key: FLINK-22077 URL: https://issues.apache.org/jira/browse/FLINK-22077 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0

h3. Introduction

We calculate cross-region ConsumedPartitionGroups in {{PipelinedRegionSchedulingStrategy}} in a wrong way. It slows down the procedure of {{onExecutionStateChange}} from O(N) to O(N^2), and the semantics of "cross-region" are wrong.

h3. Details

In {{PipelinedRegionSchedulingStrategy#startScheduling}}, as expected, we need to schedule all regions with no external blocking edges, i.e., the source regions. To decrease the complexity, we chose to schedule all the regions that have no external BLOCKING ConsumedPartitionGroups. However, in the case illustrated in FLINK-22017, region 2 has a ConsumedPartitionGroup with both internal and external blocking IntermediateResultPartitions. If we pick one partition to represent the entire ConsumedPartitionGroup, we may pick the internal one and treat the entire group as internal. Region 2 would then be scheduled; but as region 1 is not finished, region 2 cannot transition to running. A deadlock may happen if the resources are not enough for both regions.

To make this right, we introduced cross-region ConsumedPartitionGroups in FLINK-21330. The regions that have ConsumedPartitionGroups with both internal and external blocking IntermediateResultPartitions are recorded. When we call {{startScheduling}}, these ConsumedPartitionGroups are treated as external, and region 2 is not scheduled.

But we have to admit that the implementation of the cross-region calculation is wrong: every ConsumedPartitionGroup that has multiple producer regions is treated as a cross-region group. This is not the logic we described above; the semantics are wrong. Moreover, all ALL-TO-ALL BLOCKING ConsumedPartitionGroups are treated as cross-region groups, since their producers are in different regions (each producer has its own region). This degrades the complexity from O(N) to O(N^2) for ALL-TO-ALL BLOCKING edges.

h3. Solution

To correctly calculate the cross-region ConsumedPartitionGroups, we can first calculate the producer regions of all ConsumedPartitionGroups, and then iterate over all the regions and their ConsumedPartitionGroups. If a ConsumedPartitionGroup has two or more producer regions, and its producer regions contain the current region, it is a cross-region ConsumedPartitionGroup for that region. This matches the correct semantics and makes sure ALL-TO-ALL BLOCKING ConsumedPartitionGroups are not treated as cross-region groups; a sketch of the check follows below. This fix also decreases the complexity from O(N^2) back to O(N). I think it's necessary to include this bug fix in release 1.13. -- This message was sent by Atlassian Jira (v8.3.4#803005)
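A hedged sketch of the corrected check, with placeholder types: a group is cross-region for a given region only if it has at least two producer regions and one of them is the region itself.

{code:java}
import java.util.Map;
import java.util.Set;

class CrossRegionGroupSketch {
    static <R, G> boolean isCrossRegion(R region, G group, Map<G, Set<R>> producerRegionsOf) {
        Set<R> producers = producerRegionsOf.get(group);
        // an ALL-TO-ALL BLOCKING group consumed by a region that produces
        // none of its partitions is NOT cross-region for that region
        return producers != null && producers.size() > 1 && producers.contains(region);
    }
}
{code}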
[jira] [Created] (FLINK-22241) Record AllocationID instead of JobID when marking slot active failed
Zhilong Hong created FLINK-22241: Summary: Record AllocationID instead of JobID when marking slot active failed Key: FLINK-22241 URL: https://issues.apache.org/jira/browse/FLINK-22241 Project: Flink Issue Type: Improvement Components: Runtime / Task Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0

In {{TaskExecutor#handleAcceptedSlotOffers}}, when a slot fails to be marked as active, the log currently prints the JobID instead of the AllocationID:
{code:java}
final String message = "Could not mark slot " + jobId + " active.";
{code}
I think it's better to print the AllocationID here, like this:
{code:java}
final String message = "Could not mark slot " + acceptedSlot.getAllocationId() + " active.";
{code}
or
{code:java}
final String message = String.format("Could not mark slot %s active for job %s.", acceptedSlot.getAllocationId(), jobId);
{code}
-- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22284) Null address will be logged when channel is closed in NettyPartitionRequestClient
Zhilong Hong created FLINK-22284: Summary: Null address will be logged when channel is closed in NettyPartitionRequestClient Key: FLINK-22284 URL: https://issues.apache.org/jira/browse/FLINK-22284 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.13.0

In NettyPartitionRequestClient#requestSubpartition, when the channel is closed, the channel throws a LocalTransportException with the error message "Sending the partition request to 'null' failed.". The message is confusing: we cannot tell where the remote end of this channel is located, so we cannot track down that TaskExecutor and find out what happened. Also, I'm wondering whether we should use TransportException instead of LocalTransportException here, because it's a little confusing to see a LocalTransportException thrown when a remote channel is closed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22643) Too many TCP connections among TaskManagers for large scale jobs
Zhilong Hong created FLINK-22643: Summary: Too many TCP connections among TaskManagers for large scale jobs Key: FLINK-22643 URL: https://issues.apache.org/jira/browse/FLINK-22643 Project: Flink Issue Type: Improvement Components: Runtime / Network Affects Versions: 1.13.0 Reporter: Zhilong Hong Fix For: 1.14.0

For large-scale jobs, there will be too many TCP connections among the TaskManagers. Let's take an example: a streaming job with 20 JobVertices, each with a parallelism of 500. We divide the vertices into 5 slot sharing groups, and each TaskManager has 5 slots; thus there will be 400 TaskManagers in this job. Let's assume that the job runs on a cluster with 20 machines. If all the job edges are all-to-all edges, there will be 19 * 20 * 399 * 2 = 303,240 TCP connections per machine. If we run several such jobs on this cluster, the TCP connections may exceed the maximum limit of Linux, which is 1,048,576. This stops the TaskManagers from creating new TCP connections and causes task failovers.

As we run our production jobs on a K8S cluster, the jobs keep failing over due to network-related exceptions, such as {{Sending the partition request to 'null' failed}}, etc. We think we can decrease the number of connections by letting tasks reuse the same connection, as sketched below. We implemented a POC that makes all tasks on the same TaskManager reuse one TCP connection. For the example job mentioned above, the number of connections decreases from 303,240 to 15,960. With the POC, the frequency of network-related exceptions in our production jobs drops significantly. The POC is illustrated in: https://github.com/wsry/flink/commit/bf1c09e80450f40d018a1d1d4fe3dfd2de777fdc -- This message was sent by Atlassian Jira (v8.3.4#803005)
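A minimal sketch of per-TaskManager connection reuse: one channel per remote address, shared by all local tasks, instead of one per task pair. The Channel type and the connect logic are placeholders, not Flink's Netty stack.

{code:java}
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

class ConnectionPoolSketch<Channel> {
    private final ConcurrentMap<InetSocketAddress, Channel> channels = new ConcurrentHashMap<>();

    // computeIfAbsent guarantees at most one connection per remote TaskManager
    Channel getOrConnect(InetSocketAddress remote, Function<InetSocketAddress, Channel> connector) {
        return channels.computeIfAbsent(remote, connector);
    }
}
{code}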
[jira] [Created] (FLINK-22767) Optimize the initialization of ExecutionSlotSharingGroupBuilder
Zhilong Hong created FLINK-22767: Summary: Optimize the initialization of ExecutionSlotSharingGroupBuilder Key: FLINK-22767 URL: https://issues.apache.org/jira/browse/FLINK-22767 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong

Based on the scheduler benchmark introduced in FLINK-21731, we find that during the initialization of ExecutionSlotSharingGroupBuilder there's a procedure with O(N^2) complexity: {{ExecutionSlotSharingGroupBuilder#tryFindAvailableProducerExecutionSlotSharingGroupFor}}. This initialization happens during the initialization of LocalInputPreferredSlotSharingStrategy. The original implementation is:
{code:java}
for all SchedulingExecutionVertices in the DefaultScheduler:
  for all consumed SchedulingResultPartitions of the SchedulingExecutionVertex:
    get the result partition's producer vertex and determine whether the
    ExecutionSlotSharingGroup where the producer vertex is located is
    available for the current vertex
{code}
This procedure has O(N^2) complexity. It's obvious that the result partitions in the same ConsumedPartitionGroup have the same producer vertex, so we can just iterate over the ConsumedPartitionGroups instead of all the consumed partitions. This decreases the complexity from O(N^2) to O(N). The optimization of this procedure will speed up the initialization of the DefaultScheduler and accelerate the submission of new jobs, especially OLAP jobs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-22773) Optimize the construction of pipelined regions
Zhilong Hong created FLINK-22773: Summary: Optimize the construction of pipelined regions Key: FLINK-22773 URL: https://issues.apache.org/jira/browse/FLINK-22773 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Affects Versions: 1.13.0 Reporter: Zhilong Hong

During the initialization of DefaultExecutionTopology, pipelined regions will be computed for scheduling. Currently the complexity of this procedure is O(N^2):
{code:java}
for all vertices in the topology:
  for all consumed results of the vertex:
    if the consumed result is reconnectable:
      merge the current region with its producer region
{code}
One possible solution is mentioned in FLINK-17330; a generic region-merging sketch follows below. If we can optimize this procedure from O(N^2) to O(N), it will speed up the initialization of SchedulerNG and accelerate the submission of new jobs, especially OLAP jobs. -- This message was sent by Atlassian Jira (v8.3.4#803005)
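One classic way to merge vertices into regions in near-linear time is a union-find (disjoint-set) structure; a minimal sketch, not necessarily the exact approach proposed in FLINK-17330:

{code:java}
class RegionUnionFindSketch {
    private final int[] parent;

    RegionUnionFindSketch(int numVertices) {
        parent = new int[numVertices];
        for (int i = 0; i < numVertices; i++) {
            parent[i] = i; // every vertex starts in its own region
        }
    }

    int find(int v) {
        while (parent[v] != v) {
            parent[v] = parent[parent[v]]; // path halving keeps trees flat
            v = parent[v];
        }
        return v;
    }

    // call once per reconnectable producer -> consumer connection
    void merge(int producer, int consumer) {
        parent[find(producer)] = find(consumer);
    }
}
{code}

After all merges, {{find(v)}} returns a stable region representative for every vertex.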
[jira] [Created] (FLINK-22863) ArrayIndexOutOfBoundsException may happen when building rescale edges
Zhilong Hong created FLINK-22863: Summary: ArrayIndexOutOfBoundsException may happen when building rescale edges Key: FLINK-22863 URL: https://issues.apache.org/jira/browse/FLINK-22863 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.13.1, 1.13.0 Reporter: Zhilong Hong Fix For: 1.14.0, 1.13.2 Attachments: image-2021-06-03-15-06-09-301.png

For the EdgeManagerBuildUtil introduced in FLINK-21326, we find that the construction of rescale edges may throw an ArrayIndexOutOfBoundsException like this:

!image-2021-06-03-15-06-09-301.png|width=938,height=200!

It is mainly caused by the precision of {{double}} in Java. In EdgeManagerBuildUtil#connectPointwise, when the upstream parallelism is less than the downstream parallelism, we calculate the indices of the downstream vertices that connect to each upstream partition like this:
{code:java}
int start = (int) (Math.ceil(partitionNum * factor));
int end = (int) (Math.ceil((partitionNum + 1) * factor));
{code}
The index range is [{{start}}, {{end}}). In some cases the value of {{end}} may exceed the downstream parallelism and cause the ArrayIndexOutOfBoundsException.

Let's take an example. The upstream parallelism is 7 and the downstream parallelism is 29. For the last upstream partition (whose {{partitionNum}} is 6), {{(partitionNum + 1) * factor}} evaluates to slightly more than 29 because of the rounding of {{double}} arithmetic, even though it is exactly 29 in exact arithmetic. Then {{end}} = {{Math.ceil(...)}} is 30, and the ArrayIndexOutOfBoundsException is thrown. A small reproduction follows below.

To solve this issue, we need to add an extra check for the boundary condition like this:
{code:java}
int end = Math.min(targetCount, (int) (Math.ceil((partitionNum + 1) * factor)));
{code}
This affects release-1.13.0 and release-1.13.1. -- This message was sent by Atlassian Jira (v8.3.4#803005)
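A small self-contained reproduction of the boundary case described above (7 upstream partitions, 29 downstream vertices):

{code:java}
public class RescaleIndexDemo {
    public static void main(String[] args) {
        int sourceCount = 7;
        int targetCount = 29;
        double factor = ((double) targetCount) / sourceCount;

        int partitionNum = 6; // the last upstream partition
        int end = (int) Math.ceil((partitionNum + 1) * factor);
        // (6 + 1) * factor is 29 in exact arithmetic, but the double result
        // has been observed to land slightly above 29.0, so ceil yields 30
        System.out.println("end = " + end);

        int safeEnd = Math.min(targetCount, end); // the proposed fix
        System.out.println("safeEnd = " + safeEnd); // 29
    }
}
{code}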
[jira] [Created] (FLINK-23005) Optimize the deployment of tasks
Zhilong Hong created FLINK-23005: Summary: Optimize the deployment of tasks Key: FLINK-23005 URL: https://issues.apache.org/jira/browse/FLINK-23005 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhilong Hong Fix For: 1.14.0

h3. Introduction

The optimizations introduced in FLINK-21110 so far have improved the performance of job initialization, failover and partition releasing. However, task deployment is still slow. For a job with two vertices of 8k parallelism each, connected with an all-to-all edge, it takes 32.611s to deploy all the tasks and make them transition to running. If the parallelisms are 16k, it may take more than 2 minutes. As the creation of TaskDeploymentDescriptors runs in the main thread of the JobManager, this means the JobManager cannot deal with other akka messages, like heartbeats and task status updates, for more than two minutes.

All in all, there are currently two issues in the deployment of tasks for large-scale jobs:
# It takes a long time to deploy tasks, especially for all-to-all edges.
# Heartbeat timeouts may happen during or after task deployment. For a streaming job, this causes the failover of the entire region. The job may never transition to running, since another heartbeat timeout can occur while the new tasks are being deployed.

h3. Proposal

Task deployment involves the following procedures:
# The JobManager creates a TaskDeploymentDescriptor for each task in the main thread
# The TaskDeploymentDescriptor is serialized in the future executor
# Akka transports the TaskDeploymentDescriptors to the TaskExecutors via RPC calls
# The TaskExecutors create a new task thread and execute it

The optimization contains two parts:

*1. Cache the compressed serialized value of ShuffleDescriptors*

ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the IntermediateResultPartitions that a task consumes. For downstream vertices connected with an all-to-all edge of parallelism _N_, we currently calculate _N_ ShuffleDescriptors _N_ times. However, these vertices share the same ShuffleDescriptors, since they all consume the same IntermediateResultPartitions. We don't need to calculate the ShuffleDescriptors for each downstream vertex individually; we can just cache them. This decreases the overall complexity of calculating TaskDeploymentDescriptors from O(N^2) to O(N). Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ times, so we can cache the serialized value of the ShuffleDescriptors instead of the original objects. To decrease the size of akka messages and reduce replicated data over the network, the serialized values can be compressed.

*2. Distribute the ShuffleDescriptors via the blob server*

For ShuffleDescriptors of vertices with 8k parallelism, the size of their serialized value is more than 700 Kilobytes. After the compression, it would be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors become a heavy burden for the garbage collector.

In the TaskDeploymentDescriptor, JobInformation and TaskInformation are distributed via the blob server if their sizes exceed a certain threshold (defined by {{blob.offload.minsize}}). TaskExecutors request the information from the blob server once they begin to process the TaskDeploymentDescriptor.
This makes sure that the JobManager doesn't need to keep all the copies in heap memory until the TaskDeploymentDescriptors are all sent; there will be only one copy, on the blob server. Like the JobInformation, we can distribute the cached ShuffleDescriptors via the blob server if their overall size exceeds the threshold.

h3. Summary

In summary, the optimization of task deployment is to introduce a cache for the TaskDeploymentDescriptor. We cache the compressed serialized value of the ShuffleDescriptors; a sketch of the caching part follows below. If the size of the value exceeds a certain threshold, the value is distributed via the blob server.

h3. Comparison

We implemented a POC and conducted an experiment to compare the performance before and after the optimization. We chose a streaming job for the experiment because no task will be running until all tasks are deployed, which avoids other disturbing factors. The job contains two vertices, a source and a sink, connected with an all-to-all edge. The results illustrated below are the time interval between the timestamp of the first task transitioning to _deploying_ and the timestamp of the last task transitioning to _running_:

||Parallelism||Before||After||
|8000*8000|32.611s|6.480
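A hedged sketch of the caching part in Java: the shared ShuffleDescriptors are serialized and compressed once, and the bytes are reused for every consumer's TaskDeploymentDescriptor. Names here are placeholders, not Flink's actual interfaces.

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.DeflaterOutputStream;

class ShuffleDescriptorCacheSketch {
    private byte[] compressedBytes; // computed once, shared by all N consumers

    byte[] getOrSerialize(Serializable shuffleDescriptors) throws IOException {
        if (compressedBytes == null) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(new DeflaterOutputStream(bytes))) {
                out.writeObject(shuffleDescriptors); // serialize and compress in one pass
            }
            compressedBytes = bytes.toByteArray();
        }
        return compressedBytes; // above a size threshold, upload once to the blob server instead
    }
}
{code}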
[jira] [Created] (FLINK-23153) Benchmark not compiling
Zhilong Hong created FLINK-23153: Summary: Benchmark not compiling Key: FLINK-23153 URL: https://issues.apache.org/jira/browse/FLINK-23153 Project: Flink Issue Type: Bug Components: Benchmarks Affects Versions: 1.14.0 Reporter: Zhilong Hong Fix For: 1.14.0

In FLINK-23085, FutureUtils was moved from flink-runtime to flink-core. The references in flink-benchmark should be updated accordingly. One known reference is located at org/apache/flink/benchmark/operators/RecordSource.java. The Travis CI build is broken at the moment: https://travis-ci.com/github/apache/flink-benchmarks/builds/230813827#L2026 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23172) Links of restart strategy in configuration page are broken
Zhilong Hong created FLINK-23172: Summary: Links of restart strategy in configuration page are broken Key: FLINK-23172 URL: https://issues.apache.org/jira/browse/FLINK-23172 Project: Flink Issue Type: Technical Debt Components: Documentation Affects Versions: 1.14.0 Reporter: Zhilong Hong Fix For: 1.14.0

The links in the Fault Tolerance section of [the configuration page|https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/config/#fault-tolerance/] are broken. Currently the link refers to https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/dev/task_failure_recovery.html#fixed-delay-restart-strategy, which doesn't exist and leads to a 404 error. The correct link is https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/state/task_failure_recovery/#fixed-delay-restart-strategy. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23218) Distribute the ShuffleDescriptors via blob server
Zhilong Hong created FLINK-23218: Summary: Distribute the ShuffleDescriptors via blob server Key: FLINK-23218 URL: https://issues.apache.org/jira/browse/FLINK-23218 Project: Flink Issue Type: Sub-task Components: Runtime / Coordination Reporter: Zhilong Hong Fix For: 1.14.0

h3. Introduction

The optimizations introduced in FLINK-21110 so far have improved the performance of job initialization, failover and partition releasing. However, task deployment is still slow. For a job with two vertices of 8k parallelism each, connected with an all-to-all edge, it takes 32.611s to deploy all the tasks and make them transition to running. If the parallelisms are 16k, it may take more than 2 minutes. As the creation of TaskDeploymentDescriptors runs in the main thread of the JobManager, this means the JobManager cannot deal with other akka messages, like heartbeats and task status updates, for more than two minutes.

All in all, there are currently two issues in the deployment of tasks for large-scale jobs:
# It takes a long time to deploy tasks, especially for all-to-all edges.
# Heartbeat timeouts may happen during or after task deployment. For a streaming job, this causes the failover of the entire region. The job may never transition to running, since another heartbeat timeout can occur while the new tasks are being deployed.

h3. Proposal

Task deployment involves the following procedures:
# The JobManager creates a TaskDeploymentDescriptor for each task in the main thread
# The TaskDeploymentDescriptor is serialized in the future executor
# Akka transports the TaskDeploymentDescriptors to the TaskExecutors via RPC calls
# The TaskExecutors create a new task thread and execute it

The optimization contains two parts:

*1. Cache the compressed serialized value of ShuffleDescriptors*

ShuffleDescriptors in the TaskDeploymentDescriptor are used to describe the IntermediateResultPartitions that a task consumes. For downstream vertices connected with an all-to-all edge of parallelism _N_, we currently calculate _N_ ShuffleDescriptors _N_ times. However, these vertices share the same ShuffleDescriptors, since they all consume the same IntermediateResultPartitions. We don't need to calculate the ShuffleDescriptors for each downstream vertex individually; we can just cache them. This decreases the overall complexity of calculating TaskDeploymentDescriptors from O(N^2) to O(N). Furthermore, we don't need to serialize the same ShuffleDescriptors _N_ times, so we can cache the serialized value of the ShuffleDescriptors instead of the original objects. To decrease the size of akka messages and reduce the transmission of replicated data over the network, the serialized values can be compressed.

*2. Distribute the ShuffleDescriptors via the blob server*

For ShuffleDescriptors of vertices with 8k parallelism, the size of their serialized value is more than 700 Kilobytes. After the compression, it would be 200 Kilobytes or so. The overall size of 8k TaskDeploymentDescriptors is more than 1.6 Gigabytes. Since Akka cannot send the messages as fast as the TaskDeploymentDescriptors are created, these TaskDeploymentDescriptors become a heavy burden for the garbage collector.

In the TaskDeploymentDescriptor, JobInformation and TaskInformation are distributed via the blob server if their sizes exceed a certain threshold (defined by {{blob.offload.minsize}}).
TaskExecutors request the information from the blob server once they begin to process the TaskDeploymentDescriptor. This makes sure that the JobManager doesn't need to keep all the copies in heap memory until the TaskDeploymentDescriptors are all sent; there will be only one copy, on the blob server. Like the JobInformation, we can distribute the cached ShuffleDescriptors via the blob server if their overall size exceeds the threshold.

h3. Summary

In summary, the optimization of task deployment is to introduce a cache for the TaskDeploymentDescriptor. We cache the compressed serialized value of the ShuffleDescriptors. If the size of the value exceeds a certain threshold, the value is distributed via the blob server.

h3. Comparison

We implemented a POC and conducted an experiment to compare the performance before and after the optimization. We chose a streaming job for the experiment because no task will be running until all tasks are deployed, which avoids other disturbing factors. The job contains two vertices, a source and a sink, connected with an all-to-all edge. The results illustrated below are the time interval between the timestamp of the first task transitioning to _deploying_ and the timestamp of the last task transitioning to _running_:

||Parallelism
[jira] [Created] (FLINK-23354) Limit the size of blob cache on TaskExecutor
Zhilong Hong created FLINK-23354: Summary: Limit the size of blob cache on TaskExecutor Key: FLINK-23354 URL: https://issues.apache.org/jira/browse/FLINK-23354 Project: Flink Issue Type: Improvement Components: Runtime / Coordination Reporter: Zhilong Hong Fix For: 1.14.0

Currently a TaskExecutor uses a BlobCache to cache the blobs transported from the JobManager. The caches are local files stored on the TaskExecutor, and they are not cleaned up until one hour after the related job finishes. At present, JobInformation and TaskInformation are transported via blobs. If a lot of jobs are submitted, the blob cache occupies a large amount of disk space. In FLINK-23218, we are going to distribute the cached ShuffleDescriptors via the blob server as well. When a large number of failovers happen, a lot of cache files will be stored on the local disk. In extreme cases, the blobs could exhaust the disk space.

So we need to add a size limit for the blob cache on the TaskExecutor, as described in the comments of FLINK-23218. The main idea is to add a size limit and delete blobs in LRU order once the limit is exceeded: before a blob item is cached, the TaskExecutor first checks the overall size of the cache; if it exceeds the limit, blobs are deleted in LRU order until the overall size is below the limit again. If a deleted blob is needed afterwards, it will be downloaded from the blob server again. A sketch of such a cache follows below. The default size limit of the blob cache on the TaskExecutor will be 10GiB. -- This message was sent by Atlassian Jira (v8.3.4#803005)
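A minimal sketch of a size-limited LRU cache using LinkedHashMap's access order; the real TaskExecutor cache would track files on disk rather than in-memory byte arrays.

{code:java}
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class BlobLruCacheSketch extends LinkedHashMap<String, byte[]> {
    private final long sizeLimit;
    private long currentSize;

    BlobLruCacheSketch(long sizeLimit) {
        super(16, 0.75f, true); // accessOrder = true gives LRU iteration order
        this.sizeLimit = sizeLimit;
    }

    @Override
    public byte[] put(String key, byte[] blob) {
        byte[] old = super.put(key, blob);
        currentSize += blob.length - (old == null ? 0 : old.length);
        // evict least recently used blobs until we are under the limit again
        Iterator<Map.Entry<String, byte[]>> it = entrySet().iterator();
        while (currentSize > sizeLimit && it.hasNext()) {
            currentSize -= it.next().getValue().length;
            it.remove();
        }
        return old;
    }
}
{code}

Eviction is safe here because, as described above, an evicted blob is simply re-downloaded from the blob server on its next access.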
[jira] [Created] (FLINK-23491) Scheduler benchmark not running
Zhilong Hong created FLINK-23491: Summary: Scheduler benchmark not running Key: FLINK-23491 URL: https://issues.apache.org/jira/browse/FLINK-23491 Project: Flink Issue Type: Bug Components: Benchmarks, Runtime / Coordination Affects Versions: 1.14.0 Reporter: Zhilong Hong Fix For: 1.14.0

As shown in [codespeed|http://codespeed.dak8s.net:8000/timeline/#/?exe=5&env=2&revs=1000&equid=off&quarts=on&extr=on&ben=grid], the scheduler benchmarks were not running from July 22nd to July 25th. The last valid commit revision is fce9c1d8. I tried to run the scheduler benchmarks locally and found that they work well. I've checked the log and found that the scheduling benchmark is not running because:
{code:bash}
java.lang.NoClassDefFoundError: org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl
    at org.apache.flink.scheduler.benchmark.e2e.CreateSchedulerBenchmarkExecutor.setup(CreateSchedulerBenchmarkExecutor.java:51)
    at org.apache.flink.scheduler.benchmark.e2e.generated.CreateSchedulerBenchmarkExecutor_createScheduler_jmhTest._jmh_tryInit_f_createschedulerbenchmarkexecutor0_0(CreateSchedulerBenchmarkExecutor_createScheduler_jmhTest.java:342)
    at org.apache.flink.scheduler.benchmark.e2e.generated.CreateSchedulerBenchmarkExecutor_createScheduler_jmhTest.createScheduler_SingleShotTime(CreateSchedulerBenchmarkExecutor_createScheduler_jmhTest.java:294)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:453)
    at org.openjdk.jmh.runner.BenchmarkHandler$BenchmarkTask.call(BenchmarkHandler.java:437)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 15 more
{code}
It seems that the benchmark is still using {{SlotPoolImpl}}, which was removed in FLINK-22477. But it's weird that {{SlotPoolImpl}} was removed on July 13th, while the benchmark only stopped running on July 22nd. Maybe it's related to JMH? -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23599) Remove JobVertex#connectIdInput
Zhilong Hong created FLINK-23599: Summary: Remove JobVertex#connectIdInput Key: FLINK-23599 URL: https://issues.apache.org/jira/browse/FLINK-23599 Project: Flink Issue Type: Technical Debt Components: Runtime / Coordination Affects Versions: 1.14.0 Reporter: Zhilong Hong Fix For: 1.14.0 {{JobVertex#connectIdInput}} is not used in production anymore. It's only used in the unit tests {{testAttachViaIds}} and {{testCannotConnectMissingId}} located in {{DefaultExecutionGraphConstructionTest}}. However, these two test cases are designed to test this method. Therefore, this method and its test cases can be removed. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23833) Cache of ShuffleDescriptors should be individually cleaned up
Zhilong Hong created FLINK-23833: Summary: Cache of ShuffleDescriptors should be individually cleaned up Key: FLINK-23833 URL: https://issues.apache.org/jira/browse/FLINK-23833 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.14.0 Reporter: Zhilong Hong Fix For: 1.14.0

In FLINK-23005, we introduced the cache of the compressed serialized ShuffleDescriptors to improve the performance of deployment. To make sure the cache doesn't stay around too long and become a burden for the GC, it is cleaned up when the partition is released or reset for a new execution. In the implementation, the cache of the entire IntermediateResult is cleaned up, because a partition used to be released only when the entire IntermediateResult was released.

However, since FLINK-22017, a BLOCKING result partition is allowed to be consumed individually, which also means a result partition doesn't need to wait for the other result partitions and can be released individually. After this change, the following scenario is possible: when one result partition is finished, the cache of the IntermediateResult on the blob server is deleted, while other result partitions of this IntermediateResult are just being deployed to the TaskExecutors. When those TaskExecutors then try to download the TDD from the blob server, they find the blob deleted and get stuck.

This bug only happens for jobs with POINTWISE BLOCKING edges, and only when {{blob.offload.minsize}} is set to an extremely small value, since the size of the ShuffleDescriptors of POINTWISE BLOCKING edges is usually small. To solve this issue, we just need to clean up the cache of the ShuffleDescriptors individually. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-23879) Benchmark not compiling
Zhilong Hong created FLINK-23879: Summary: Benchmark not compiling Key: FLINK-23879 URL: https://issues.apache.org/jira/browse/FLINK-23879 Project: Flink Issue Type: Bug Components: Benchmarks Reporter: Zhilong Hong Fix For: 1.14.0

The benchmarks have not been compiling since Aug. 16th, 2021. The error is:
{noformat}
[2021-08-19T23:18:36.242Z] [ERROR] /home/jenkins/workspace/flink-scheduler-benchmarks/flink-benchmarks/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java:47:54: error: package org.apache.flink.streaming.runtime.streamstatus does not exist
[2021-08-19T23:18:36.242Z] [ERROR] /home/jenkins/workspace/flink-scheduler-benchmarks/flink-benchmarks/src/main/java/org/apache/flink/benchmark/SortingBoundedInputBenchmarks.java:350:40: error: cannot find symbol
{noformat}
It seems to be caused by FLINK-23767, in which {{StreamStatus}} was replaced with {{WatermarkStatus}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24044) Errors are output when compiling flink-runtime-web
Zhilong Hong created FLINK-24044: Summary: Errors are output when compiling flink-runtime-web Key: FLINK-24044 URL: https://issues.apache.org/jira/browse/FLINK-24044 Project: Flink Issue Type: Bug Components: Runtime / Web Frontend Affects Versions: 1.14.0 Reporter: Zhilong Hong When compiling the module {{flink-runtime-web}}, the terminal would be filled with errors as below: {code:bash} [ERROR] - Generating browser application bundles (phase: setup)... [ERROR] [ERROR] Compiling @angular/core : es2015 as esm2015 [ERROR] [ERROR] Compiling @angular/common : es2015 as esm2015 [ERROR] [ERROR] Compiling @angular/platform-browser : es2015 as esm2015 [ERROR] [ERROR] Compiling @angular/platform-browser-dynamic : es2015 as esm2015 [ERROR] [ERROR] Compiling @angular/router : es2015 as esm2015 ... {code} Although it doesn't break the compilation, maybe we should fix this. I'm not familiar with the module flink-runtime-web or Angular. I found something that may be useful: [https://github.com/angular/angular/issues/36513] [https://github.com/angular/angular/issues/31853] [https://stackoverflow.com/questions/57220392/even-though-i-am-using-target-esm2015-ivy-is-compiles-as-a-esm5-module/57220445#57220445] [https://stackoverflow.com/questions/61304548/simple-angular-9-project-compiles-whole-angular-material-es2015-as-esm2015] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24295) Too many requestPartitionState may jam the JobManager during task deployment
Zhilong Hong created FLINK-24295: Summary: Too many requestPartitionState may jam the JobManager during task deployment Key: FLINK-24295 URL: https://issues.apache.org/jira/browse/FLINK-24295 Project: Flink Issue Type: Bug Components: Runtime / Coordination Affects Versions: 1.14.0 Reporter: Zhilong Hong

After the phase-2 optimization we've done in FLINK-21110, the speed of task deployment has increased. However, we find that during task deployment there may be so many {{requestPartitionState}} RPC calls from the TaskManagers that they jam the JobManager.

Why would there be so many {{requestPartitionState}} RPC calls? After the optimization, the JobManager can submit tasks to the TaskManagers quickly. If the JobManager calls {{submitTask}} faster than the TaskManagers can process the submissions, some TaskManagers may deploy tasks faster than others. When a downstream task is deployed, it tries to request partitions from the upstream tasks, which may be located on a remote TaskManager. If an upstream task is not deployed yet, the downstream task requests the partition state from the JobManager.

In the worst case, the computation and memory complexity is O(N^2). In our test with a streaming job that has two vertices with a parallelism of 8,000, connected with all-to-all edges, there will be 32,000,000 {{requestPartitionState}} RPC calls to the JobManager in the worst case. Each RPC call requires 1 KiB of space in the heap memory of the JobManager, so the overall space cost of {{requestPartitionState}} is 32 GiB, which is a heavy burden for the GC. In our test, the heap memory of the JobManager is 8 GiB. During task deployment, the JobManager experiences frequent full GCs and gets stuck, since it is busy with full GCs and has no time to deal with the incoming RPC calls. The log is attached below. The worst thing is that no log is output for this RPC call: when users find the JobManager getting slower or stuck, they won't be able to find out why.

Why did this case rarely happen before? Before the optimization, it took a long time to calculate the TaskDeploymentDescriptors and send them to the TaskManagers, so in most cases the JobManager called {{submitTask}} more slowly than the TaskManagers could process it. Since the deployment of tasks is topologically sorted, the upstream tasks were deployed before the downstream tasks, and this case rarely happened.

In my opinion, the solution to this issue needs more discussion. According to the discussion in the pull request ([https://github.com/apache/flink/pull/6680]), it's not safe to simply remove this RPC call, because we cannot guarantee that the assumption that an upstream task failure always fails the downstream consumers is always right. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24300) MultipleInputOperator is running much more slowly in TPCDS
Zhilong Hong created FLINK-24300: Summary: MultipleInputOperator is running much more slowly in TPCDS Key: FLINK-24300 URL: https://issues.apache.org/jira/browse/FLINK-24300 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.14.0, 1.15.0 Reporter: Zhilong Hong Attachments: 64570e4c56955713ca599fd1d7ae7be891a314c6.png, detail-of-the-job.png, e3010c16947ed8da2ecb7d89a3aa08dacecc524a.png, jstack.txt

When running TPCDS with release 1.14, we find that the job with the MultipleInputOperator runs much more slowly than before. With a binary search among the commits, we find that the issue may be introduced by FLINK-23408.

At commit 64570e4c56955713ca599fd1d7ae7be891a314c6, the job runs normally in TPCDS, as the image below illustrates:

!64570e4c56955713ca599fd1d7ae7be891a314c6.png|width=600!

At commit e3010c16947ed8da2ecb7d89a3aa08dacecc524a, the job q2.sql gets stuck for a pretty long time (longer than half an hour), as the image below illustrates:

!e3010c16947ed8da2ecb7d89a3aa08dacecc524a.png|width=600!

The detail of the job is illustrated below:

!detail-of-the-job.png|width=600!

The job uses a {{MultipleInputOperator}} with one normal input and two chained FileSources. It has finished reading the normal input and starts to read the chained sources. Each chained source has one source data fetcher. We captured the jstack of the stuck tasks and attach the file below. From [^jstack.txt] we can see the main thread is blocked waiting for the lock, and the lock is held by a source data fetcher. The source data fetcher is still running, but its stack stays in {{CompletableFuture.cleanStack}}.

This issue happens in a batch job. However, judging from where it gets blocked, it seems to also affect streaming jobs. For reference, the code of the TPCDS benchmark we are running is located at [https://github.com/ververica/flink-sql-benchmark/tree/dev]. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24316) Refactor IntermediateDataSet to have only one consumer
Zhilong Hong created FLINK-24316: Summary: Refactor IntermediateDataSet to have only one consumer Key: FLINK-24316 URL: https://issues.apache.org/jira/browse/FLINK-24316 Project: Flink Issue Type: Technical Debt Components: Runtime / Coordination Reporter: Zhilong Hong Fix For: 1.15.0

Currently, IntermediateDataSet carries the assumption that it can be consumed by multiple consumers. However, this assumption has never come true: an upstream vertex connected to multiple downstream vertices generates multiple IntermediateDataSets, one for each consumer. Furthermore, there are several checks in the code to make sure that an IntermediateDataSet has only one consumer, like {{Execution#getPartitionMaxParallelism}}, {{SsgNetworkMemoryCalculationUtils#getMaxSubpartitionNums}}, and so on. These checks make the logic complicated, and it's hard to guarantee consistency, because we can't make sure that all future calls to {{getConsumers}} perform this check.

Since multiple consumers for IntermediateDataSet may not become reality for a long time, we think it's better to refactor IntermediateDataSet to have only one consumer, as discussed in [https://github.com/apache/flink/pull/16856]. If we are going to support multiple consumers for IntermediateDataSet in the future, we can just bring it back and refactor all the usages. As IntermediateDataSet changes, all related classes should change accordingly, including IntermediateResult, IntermediateResultPartition, DefaultResultPartition, and so on. All the related sanity checks need to be removed, too. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (FLINK-24491) ExecutionGraphInfo may not be archived when the dispatcher terminates
Zhilong Hong created FLINK-24491: Summary: ExecutionGraphInfo may not be archived when the dispatcher terminates Key: FLINK-24491 URL: https://issues.apache.org/jira/browse/FLINK-24491 Project: Flink Issue Type: Bug Components: Runtime / Configuration Affects Versions: 1.13.2, 1.14.0, 1.15.0 Reporter: Zhilong Hong Fix For: 1.13.3, 1.15.0, 1.14.1

When a job finishes, its JobManagerRunnerResult is processed in the callback of {{Dispatcher#runJob}}. In the callback, the ExecutionGraphInfo is archived asynchronously by the HistoryServerArchivist. However, the CompletableFuture of the archiving is ignored, so the job may be removed before the archiving is finished.

For a batch job running in per-job/application mode, the dispatcher terminates itself once the job is finished. In this case, the ExecutionGraphInfo may not be archived when the dispatcher terminates. If the ExecutionGraphInfo is lost, users are not able to know whether the batch job finished normally or not; they have to refer to the logs for the result. A sketch of one fix direction follows below.

The session mode is not affected, since the dispatcher won't terminate itself when a job finishes, so the HistoryServerArchivist gets enough time to archive the ExecutionGraphInfo. -- This message was sent by Atlassian Jira (v8.3.4#803005)
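A hedged sketch of one fix direction, with illustrative method names: keep the archiving future and chain the dispatcher's shutdown after it instead of dropping it.

{code:java}
import java.util.concurrent.CompletableFuture;

class ArchiveBeforeShutdownSketch {

    CompletableFuture<Void> archive(Object executionGraphInfo) {
        // stands in for the HistoryServerArchivist call
        return CompletableFuture.runAsync(() -> { /* write the archive */ });
    }

    CompletableFuture<Void> onJobTerminated(Object executionGraphInfo) {
        return archive(executionGraphInfo)
                // an archiving failure should not block dispatcher shutdown
                .exceptionally(throwable -> null)
                // terminate only after the archive future has completed
                .thenRun(() -> { /* proceed with dispatcher termination */ });
    }
}
{code}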