Gian, I've submitted a PR to gianm/tq-scale-test that provides a concurrent test (and fixes a concurrency bug I found along the way). The change uses an 8 ms response time for shutdown acknowledgment, and a 2-second time for shutdown completion/notification.
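For orientation, the simulated runner in a test like this behaves roughly as in the sketch below (illustrative only; SimulatedTaskRunner and its members are invented names, not the classes in the PR). Shutdown is acknowledged quickly, and completion is reported asynchronously a couple of seconds later, the way a Kafka task's /stop call only sets a flag and returns.

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch of the timing being simulated (names are illustrative):
// shutdown acknowledgment in ~8 ms, completion/notification ~2 s later.
class SimulatedTaskRunner
{
  private final Map<String, CompletableFuture<String>> knownTasks = new ConcurrentHashMap<>();
  private final ScheduledExecutorService completionPool = Executors.newScheduledThreadPool(8);

  CompletableFuture<String> run(String taskId)
  {
    // One future per task; completing it stands in for the status notification.
    return knownTasks.computeIfAbsent(taskId, id -> new CompletableFuture<>());
  }

  void shutdown(String taskId) throws InterruptedException
  {
    Thread.sleep(8);  // fast acknowledgment back to the caller
    completionPool.schedule(
        () -> knownTasks.get(taskId).complete("SUCCESS"),  // later: fires the status future
        2,
        TimeUnit.SECONDS
    );
  }
}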
Based on this test:
- the serial TaskQueue times out after 60 seconds on each test, and
- the concurrent TaskQueue passes in ~10 seconds per test.

https://github.com/gianm/druid/pull/3/files

Let me know your thoughts.

Thanks
Jason

On Fri, Dec 3, 2021 at 11:41 AM Jason Koch <jk...@netflix.com> wrote:

> Hi Gian
>
> > Jason, also interesting findings! I took a crack at rebasing your patch on master and adding a scale test for the TaskQueue with 1000 simulated tasks: https://github.com/apache/druid/compare/master...gianm:tq-scale-test. When I run the scale test, "doMassLaunchAndExit" passes quickly but "doMassLaunchAndShutdown" times out. I suppose shutting down lots of tasks is still a bottleneck.
>
> Looks good, I'll come back to the test below.
>
> > Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be pretty straightforward to make the shutdown API asynchronous, which would help speed up anything that is shutting down lots of tasks all at once. Would that be helpful in your environments? Or are the changes to move shutdown out of critical sections going to be enough?
>
> This would be the logical next step for some environments, but I didn't need to go that far. For this particular cluster we are reading from Kafka, which uses the SeekableStreamIndexTaskRunner; the /stop call does not stop the task directly, it only sets a flag, so the response back to the overlord comes back in single-digit milliseconds.
>
> Some extra detail on the specific interaction as it relates to Kafka might make the problem/fix more clear...
>
> ==
>
> Suppose for simplicity we have 500 tasks to roll, and each takes 2 ms to acknowledge a stop request. TaskQueue#L322-L332 is going to issue 500 x 2 ms requests to stop all of them, which will take approximately 1 second to complete. Notice it is doing this whilst holding the lock.
>
> After cleanup, those tasks will issue a status update via ZK -> RemoteTaskRunner::taskComplete to say that they have completed. That taskComplete fires the future completion, which lands back in TaskQueue::notifyStatus, where the TaskQueue can now record that the task has finished.
>
> But - notifyStatus can *only* proceed once the lock has been released; then it claims the lock and calls removeTaskInternal. At this point the lock is released again, and maybe a few more concurrent ZK -> notifyStatus() calls proceed. Let's suppose we got lucky and processed 10 of these, which have now been removed from the TaskQueue.
>
> At some point, though, the TaskQueue manage loop is going to get that lock, and we are now at 490 tasks that the queue believes are running but that we expected to be stopped, so we issue another 490 x 2 ms = 980 ms of HTTP stop requests. And then, maybe, we get another 10 notifyStatus completions... and we issue 480, and so on.
>
> Evidently, this is going to take a long time for the TaskQueue to quiesce to the correct state, and things are a little confused across the cluster until that happens. And, to top it off, the tasks and the supervisor get confused about progress, so the task is marked failed and put back in the queue to restart, which makes it take even longer.
>
> The fix is basically to make sure that TaskQueue::notifyStatus can proceed to update the task state without blocking.
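(To make that concrete, the shape of the change is roughly the sketch below. The names are illustrative rather than the actual TaskQueue code; the point is simply that the stop requests are decided under the lock but issued after it is released, so notifyStatus() is not starved.)

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative sketch only: decide which tasks to stop while holding the
// giant lock, but issue the (multi-millisecond) shutdown calls after
// releasing it, so ZK -> notifyStatus() can take the lock and remove
// finished tasks while the stop requests are still in flight.
class TaskQueueSketch
{
  interface Runner
  {
    void shutdown(String taskId);  // stand-in for the runner's HTTP stop call
  }

  private final ReentrantLock giant = new ReentrantLock(true);
  private final Set<String> knownTaskIds = ConcurrentHashMap.newKeySet();
  private final Map<String, Future<?>> runnerTaskFutures = new ConcurrentHashMap<>();
  private final Runner taskRunner;

  TaskQueueSketch(Runner taskRunner)
  {
    this.taskRunner = taskRunner;
  }

  void manage()
  {
    final List<String> toShutDown = new ArrayList<>();
    giant.lock();
    try {
      for (String taskId : runnerTaskFutures.keySet()) {
        if (!knownTaskIds.contains(taskId)) {
          toShutDown.add(taskId);  // record it; do not call out to the runner under the lock
        }
      }
    } finally {
      giant.unlock();
    }
    // ~2 ms per stop request, but no longer serialized behind the lock,
    // so 500 of them do not block notifyStatus() for a full second.
    for (String taskId : toShutDown) {
      taskRunner.shutdown(taskId);
    }
  }
}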
> Following that, we get a flood of ZK updates in short order, so making the logging & ZK processing more efficient significantly reduces the time to quiesce.
>
> ==
>
> So, back to the test: it looks good, and I think some tweaks need to happen to replicate the above:
> (1) the taskQueue.add() and shutdown() calls should be run concurrently from lots of threads
> (2) the TestTaskRunner::shutdown(taskId) call should respond in ~5 ms and concurrently sleep for some time, followed by a knownTasks.get(taskId).setResult(...), which I think will trigger the notification loop.
>
> I'll take a shot at this after lunch today.
>
> Sample logfile of a quick, clean shutdown of a Kafka task, from the overlord's view:
> https://gist.githubusercontent.com/jasonk000/40c11dce3faed44d3a89c27e0227b982/raw/a456a4cddd31f508e618321e03ef8b5241931904/druid.log
>
> Thanks
> Jason
>
> On Thu, Dec 2, 2021 at 4:58 AM Gian Merlino <g...@apache.org> wrote:
>
>> Harini, those are interesting findings. I'm not sure if the two pauses are necessary, but my thought is that it ideally shouldn't matter, because the supervisor shouldn't be taking that long to handle its notices. A couple of things come to mind about that:
>>
>> 1) Did you see what, specifically, the supervisor is doing when it's handling the notices? Maybe from a stack trace? We should look into optimizing it, or making it asynchronous or something, depending on what it is.
>> 2) Although, there isn't really a need to trigger a run for every single task status change anyway; I think it's ok to coalesce them. This patch would do it: https://github.com/apache/druid/pull/12018
>>
>> Jason, also interesting findings! I took a crack at rebasing your patch on master and adding a scale test for the TaskQueue with 1000 simulated tasks: https://github.com/apache/druid/compare/master...gianm:tq-scale-test. When I run the scale test, "doMassLaunchAndExit" passes quickly but "doMassLaunchAndShutdown" times out. I suppose shutting down lots of tasks is still a bottleneck.
>>
>> Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be pretty straightforward to make the shutdown API asynchronous, which would help speed up anything that is shutting down lots of tasks all at once. Would that be helpful in your environments? Or are the changes to move shutdown out of critical sections going to be enough?
>>
>> On Wed, Dec 1, 2021 at 1:27 PM Jason Koch <jk...@netflix.com.invalid> wrote:
>>
>> > Hi Harini,
>> >
>> > We have seen issues like this related to task roll time and to task queue notifications on overlord instances; I have a patch running internally that resolves this.
>> >
>> > These are my internal triage notes:
>> > ======
>> > - Whenever task scheduling is happening (startup, ingest segment task rollover, redeployment of a datasource), the Overlord takes a long time to assign workers. This compounds because tasks sit so long before deployment that it starts failing tasks and having to relaunch them.
>> >
>> > - TaskQueue: notifyStatus(), which receives updates from the middle managers, and the manage() loop, which controls services, run through a single lock. For example, the shutdown request involves submitting downstream HTTP requests synchronously (while holding the lock).
>> > - This means that, for a cluster with ~700 tasks, tasks are held for nearly 1 second, and only after each 1-second trip around the manage loop can 1-2 notifications be processed. For a new startup with 700 tasks and a 1-second delay, that is 300-600 or more seconds for the overlord to realise that all the tasks have been started by the middle manager.
>> > - Similar delays happen for any other operations.
>> > - Sub-optimal logging code path (double-concatenating very long log entries).
>> > - ZkWorker: the worker fully deserializes all ZK payload data every time it looks up task IDs, rather than only looking at the ID fields. Similarly, it repeatedly fetches data on task assignment.
>> >
>> > =====
>> >
>> > The patch I have is here:
>> > https://github.com/jasonk000/druid/pull/7/files
>> >
>> > It fixes a couple of things, most importantly the task queue notification system. The system is much more stable with high task counts and will easily restart many tasks concurrently.
>> >
>> > I have other perf issues I want to look at first before I can document it fully, build a test case, rebase it on apache/master, etc. If you test it out, and it works, we could submit a PR that would resolve it.
>> >
>> > PS - I have a queue of similar fixes I'd like to submit, but I need some time to do the documentation, build test cases, upstreaming, etc. If anyone wants to collaborate, I could open some Issues and share my partial notes.
>> >
>> > Thanks
>> > Jason
>> >
>> > On Wed, Dec 1, 2021 at 12:59 PM Harini Rajendran <hrajend...@confluent.io.invalid> wrote:
>> >
>> > > Hi all,
>> > >
>> > > I have been investigating this in the background for a few days now and need some help from the community.
>> > >
>> > > We noticed that every hour, when the tasks roll, we see a spike in the ingestion lag for about 2-4 minutes. We have 180 tasks running on this datasource.
>> > > [image: Screen Shot 2021-12-01 at 9.14.23 AM.png]
>> > >
>> > > On further debugging of the task logs, we found that around the time the ingestion lag spikes up, *the gaps between pause and resume commands in the task logs during checkpointing are wide, ranging from a few seconds to a couple of minutes*. For example, in the following task logs you can see that it was about 1.5 minutes.
>> > > {"@timestamp":"2021-11-18T*20:06:58.513Z*", "log.level":"DEBUG", >> > > "message":"Received pause command, *pausing* ingestion until >> resumed.", " >> > > service.name >> > > ":"druid/middleManager","event.dataset":"druid/middleManager.log"," >> > > process.thread.name >> > > >> > >> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"} >> > > {"@timestamp":"2021-11-18T*20:08:26.326Z*", "log.level":"DEBUG", >> > > "message":"Received pause command, *pausing* ingestion until >> resumed.", " >> > > service.name >> > > ":"druid/middleManager","event.dataset":"druid/middleManager.log"," >> > > process.thread.name >> > > >> > >> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"} >> > > {"@timestamp":"2021-11-18T*20:08:26.329Z*", "log.level":"DEBUG", >> > > "message":"Received resume command, *resuming* ingestion.", " >> > service.name >> > > ":"druid/middleManager","event.dataset":"druid/middleManager.log"," >> > > process.thread.name >> > > >> > >> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"} >> > > So this explains why ingestion is lagging as the *tasks are paused >> for a >> > > long time*. >> > > >> > > *Why are there 2 pauses during checkpointing and why such a huge gap?* >> > > As a next step, I wanted to see why there is such a wide gap. Then we >> > > realized that the first pause is when the task pauses itself here >> > > < >> > >> https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java#L728 >> > >> > while >> > > requesting the supervisor for a checkpoint. And the second pause is >> when >> > > the supervisor actually handles the checkpoint notice here >> > > < >> > >> https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L2548 >> > > >> > > . >> > > And since the supervisor thread for this data source takes such a long >> > > time to process all the notices in the queue before this checkpoint >> > notice, >> > > the ingestion task ends up being in the paused state for a long time. >> > > >> > > *Why does the supervisor thread take such a long time to get to this >> > > checkpoint notice?* >> > > That was my next step in debugging. >> > > Proving our theory, we noticed that the *noticesQueue in the >> supervisor >> > > does get backed up with 100s of notices every hour when tasks roll*. >> > > [image: Screen Shot 2021-12-01 at 9.32.59 AM.png] >> > > And we saw that *run_notice takes between 5s and 7s during task >> rolls*. >> > > And this causes backing up of noticesQueue causing checkpoint notice >> to >> > be >> > > in the queue for long leading to ingestion lag spike whenever tasks >> roll. >> > > [image: Screen Shot 2021-12-01 at 9.34.29 AM.png] >> > > >> > > *Why does run_notice take 5-7s to finish?* >> > > When the task starts, it takes about 5s for the HTTP server to come >> up. >> > > So, till then the supervisor thread is in a loop trying to get the >> task >> > > status and this causes run_notice to take about 5-7s to finish. >> > > >> > > *Questions to the community* >> > > Do we need 2 pauses during checkpointing? 
>> > > Do we need 2 pauses during checkpointing? Should the task pause itself before requesting a checkpoint, given that the supervisor is going to pause the task anyway while handling the notice? Or is it okay to remove the pause in the TaskRunner before it sends the checkpoint request to the supervisor? This would immediately solve the ingestion lag issue completely, as there would no longer be two pauses with such a huge gap in between.
>> > >
>> > > Harini
>> > > Software Engineer, Confluent
>> > >
>> >
>>
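For reference, the run-notice coalescing Gian mentions above (https://github.com/apache/druid/pull/12018) amounts to something like the sketch below. The names are made up for illustration and this is not the code from that PR; the idea is just that a burst of task-status changes adds one pending run notice rather than hundreds, so a checkpoint notice queued behind them is handled much sooner.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of coalescing run notices in a supervisor-style
// notice queue: at most one run notice is pending at any time, while
// checkpoint notices are always enqueued.
class NoticeQueueSketch
{
  interface Notice
  {
    void handle();
  }

  private final BlockingQueue<Notice> notices = new LinkedBlockingQueue<>();
  private final AtomicBoolean runNoticePending = new AtomicBoolean(false);

  void onTaskStatusChange()
  {
    // Many status changes in a row collapse into a single queued run notice.
    if (runNoticePending.compareAndSet(false, true)) {
      notices.add(() -> {
        runNoticePending.set(false);  // later status changes may enqueue a new one
        // ... discover tasks, check their status, manage task groups, etc.
      });
    }
  }

  void onCheckpointRequest(Notice checkpointNotice)
  {
    notices.add(checkpointNotice);  // never coalesced or dropped
  }
}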