I think part of the problem is that we currently use the executor of the common RpcService to run the I/O operations, as Stephan suspected [1]. I will be fixing this for 1.8.0 and 1.7.3. This should resolve the issue, but supporting different means of cleanup might still be interesting to add.

[1] https://issues.apache.org/jira/browse/FLINK-11851

Cheers,
Till
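To illustrate the direction of such a fix (this is a minimal sketch, not the actual FLINK-11851 change, and the class and method names below are hypothetical), the idea is to hand the blocking S3/HDFS deletes to a dedicated I/O pool instead of running them on the RpcService's actor threads:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: dispose checkpoints on a dedicated I/O pool so the blocking S3/HDFS
// calls never run on the RPC/actor threads. All names here are hypothetical.
public class CheckpointDisposer {

    // Separate pool for blocking file-system I/O, sized independently of the RPC threads.
    private final ExecutorService ioExecutor = Executors.newFixedThreadPool(4);

    // Stand-in for a completed checkpoint; discard() deletes the state files, e.g. on S3.
    public interface DiscardableCheckpoint {
        void discard();
    }

    // Called when a checkpoint is subsumed or the job is cancelled; returns immediately.
    public CompletableFuture<Void> discardAsync(DiscardableCheckpoint checkpoint) {
        return CompletableFuture.runAsync(checkpoint::discard, ioExecutor);
    }

    public void shutdown() {
        ioExecutor.shutdown();
    }
}
```

With this pattern the main thread only schedules the discard and returns immediately, so slow deletes no longer delay heartbeats or RPC handling.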
On Thu, Mar 7, 2019 at 8:56 AM Yun Tang <myas...@live.com> wrote:

> Sharing the communication pressure of a single node across multiple task
> managers would be a good idea. From my point of view, letting task managers
> know that a specific checkpoint has already been aborted could benefit a
> lot of things:
>
>   * Let the task manager clean up the files, which is the topic of this
>     thread.
>   * Let `StreamTask` cancel an aborted running checkpoint on the task side,
>     as https://issues.apache.org/jira/browse/FLINK-8871 wants to achieve.
>   * Let the local state store prune local checkpoints as soon as possible
>     without waiting for the next `notifyCheckpointComplete` to arrive.
>   * Let the state backend on the task manager side do something on its
>     side, which would be really helpful for state backends that
>     disaggregate computation and storage.
>
> Best
> Yun Tang
> ________________________________
> From: Thomas Weise <t...@apache.org>
> Sent: Thursday, March 7, 2019 12:06
> To: dev@flink.apache.org
> Subject: Re: JobManager scale limitation - Slow S3 checkpoint deletes
>
> Nice!
>
> Perhaps for file systems without TTL/expiration support (AFAIK this
> includes HDFS), cleanup could be performed in the task managers?
>
>
> On Wed, Mar 6, 2019 at 6:01 PM Jamie Grier <jgr...@lyft.com.invalid> wrote:
>
> > Yup, it looks like the actor threads are spending all of their time
> > communicating with S3. I've attached a picture of a typical stack trace
> > for one of the actor threads [1]. At the end of that call stack you'll
> > see the thread blocking on synchronous communication with the S3
> > service. This is for one of the flink-akka.actor.default-dispatcher
> > threads.
> >
> > I've also attached a link to a YourKit snapshot if you'd like to explore
> > the profiling data in more detail [2].
> >
> > [1] https://drive.google.com/open?id=0BzMcf4IvnGWNNjEzMmRJbkFiWkZpZDFtQWo4LXByUXpPSFpR
> > [2] https://drive.google.com/open?id=1iHCKJT-PTQUcDzFuIiacJ1MgAkfxza3W
> >
> >
> > On Wed, Mar 6, 2019 at 7:41 AM Stephan Ewen <se...@apache.org> wrote:
> >
> > > I think having an option to not actively delete checkpoints (but
> > > rather have the TTL feature of the file system take care of it)
> > > sounds like a good idea.
> > >
> > > I am curious why you get heartbeat misses and akka timeouts during
> > > deletes. Are some parts of the deletes happening synchronously in the
> > > actor thread?
> > >
> > > On Wed, Mar 6, 2019 at 3:40 PM Jamie Grier <jgr...@lyft.com.invalid>
> > > wrote:
> > >
> > > > We've run into an issue that limits the max parallelism of jobs we
> > > > can run, and what it seems to boil down to is that the JobManager
> > > > becomes unresponsive while essentially spending all of its time
> > > > discarding checkpoints from S3. This results in a sluggish UI,
> > > > sporadic AkkaAskTimeouts, heartbeat misses, etc.
> > > >
> > > > Since S3 (and I assume HDFS) has policies that can be used to
> > > > discard old objects without Flink actively deleting them, I think it
> > > > would be a useful feature to add an option for Flink to never
> > > > discard checkpoints itself. I believe this would solve the problem.
> > > >
> > > > Any objections or other known solutions to this problem?
> > > >
> > > > -Jamie
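As a sketch of the TTL-based cleanup idea discussed above (assuming the AWS SDK for Java v1; the bucket name, prefix, and retention period are hypothetical), a lifecycle rule on the checkpoint bucket could expire old objects without Flink deleting them actively. The expiration window has to be long enough that retained checkpoints are never removed while still in use:

```java
import java.util.Arrays;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.BucketLifecycleConfiguration;
import com.amazonaws.services.s3.model.lifecycle.LifecycleFilter;
import com.amazonaws.services.s3.model.lifecycle.LifecyclePrefixPredicate;

public class CheckpointExpirationRule {

    public static void main(String[] args) {
        // Hypothetical values: point these at wherever the job writes its checkpoints.
        String bucket = "my-flink-checkpoints";
        String prefix = "checkpoints/";

        // Expire checkpoint objects after 7 days instead of having Flink delete them
        // actively. The window must comfortably exceed the lifetime of any retained
        // checkpoint that might still be needed for recovery.
        BucketLifecycleConfiguration.Rule rule = new BucketLifecycleConfiguration.Rule()
                .withId("expire-old-flink-checkpoints")
                .withFilter(new LifecycleFilter(new LifecyclePrefixPredicate(prefix)))
                .withExpirationInDays(7)
                .withStatus(BucketLifecycleConfiguration.ENABLED);

        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        s3.setBucketLifecycleConfiguration(
                bucket, new BucketLifecycleConfiguration().withRules(Arrays.asList(rule)));
    }
}
```

For file systems without such an expiration mechanism (e.g. HDFS, as noted above), active cleanup would still be needed, which is where the suggestion to perform it in the task managers comes in.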