Hi Jason, I shall take a look at these 3 PRs and see if we can try these out in our test environment.
Also, we use AWS RDS as the metadata engine. Harini Software Engineer, Observability +1 412 708 3872 On Fri, Dec 31, 2021 at 3:22 PM Jason Koch <jk...@netflix.com> wrote: > Hi Harini, > > I had a chance to look at the checkpoint behaviour you mentioned in more > detail, and found two codepaths where the RunNotice code ends up in the > TaskQueue, and hits the same locks. I'd be interested if you want to try > the three related PRs I have submitted. (I added more detail to the issue > https://github.com/apache/druid/issues/11414). > > Failing that, I think the best next step would be some stack traces / > and/or profiler output from the supervisor at the time of rollover. It > would also be useful to know which metadata storage solution you are using > as the RunNotice interacts with metadata engine too. > > Thanks > Jason > > On Fri, Dec 3, 2021 at 2:43 PM Jason Koch <jk...@netflix.com> wrote: > >> Gian, >> >> I've submitted a PR to gianm/tq-scale-test that provides a concurrent >> test, (and fixes a concurrency bug I found along the way). The change uses >> an 8millis response time for shutdown acknowledgment, and a 2 second time >> for shutdown completion/notification. >> >> Based on this test, >> - serial TaskQueue timeout occurs after 60sec for each test, and, >> - concurrent TaskQueue passes in ~10sec per test, >> >> https://github.com/gianm/druid/pull/3/files >> >> Let me know your thoughts. >> >> Thanks >> Jason >> >> On Fri, Dec 3, 2021 at 11:41 AM Jason Koch <jk...@netflix.com> wrote: >> >>> Hi Gian >>> >>> > Jason, also interesting findings! I took a crack at rebasing your >>> patch on >>> > master and adding a scale test for the TaskQueue with 1000 simulated >>> tasks: >>> > https://github.com/apache/druid/compare/master...gianm:tq-scale-test >>> <https://github.com/apache/druid/compare/master...gianm:tq-scale-test>. >>> When >>> > I run the scale test, "doMassLaunchAndExit" passes quickly but >>> > "doMassLaunchAndShutdown" times out. I suppose shutting down lots of >>> tasks >>> > is still a bottleneck. >>> >>> Looks good, I'll come back to the test below. >>> >>> > Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be >>> pretty >>> > straightforward to make the shutdown API asynchronous, which would help >>> > speed up anything that is shutting down lots of tasks all at once. >>> Would >>> > that be helpful in your environments? Or are the changes to move >>> shutdown >>> > out of critical sections going to be enough? >>> >>> This would be the logical next step for some environments, but I didn't >>> need to go >>> that far. For this particular cluster we are reading from Kafka, which >>> is a >>> SeekableStreamIndexRunner, the /stop call does not stop directly, it >>> only sets a >>> flag, so, the response back to overlord comes back in >>> single-digit-milliseconds. >>> >>> Some extra detail on the specific interaction as related to Kafka might >>> make >>> the problem/fix more clear ... >>> >>> == >>> >>> Suppose for simplicity we have 500 tasks to roll, and each takes 2ms to >>> acknowledge a stop request. TaskQueue#L322-L332 is going to issue 500x2ms >>> requests to stop all of them, which will take approx 1 second to >>> complete. >>> >>> Notice it is doing this whilst holding the lock. >>> >>> After cleanup those tasks will issue a status update via ZK -> >>> RemoteTaskRunner >>> ::taskComplete that they have completed. 
That taskComplete fires the >>> future >>> completion which lands back in TaskQueue::notifyStatus where the >>> TaskQueue >>> can now update state that the task has finished. >>> >>> But - notifyStatus can *only* proceed once the lock has been released, >>> and then >>> it claims the lock, and calls removeTaskInternal. At this point the lock >>> is released, >>> and, maybe a few more concurrent ZK->notifyStatus() calls proceed. Let's >>> suppose we got lucky, and we processed 10 requests, which have now been >>> removed from the TaskQueue. >>> >>> At some point though, TaskQueue manage loop is going to get that lock, >>> and we >>> are now at 490 tasks that the queue believes are running, which we >>> expected >>> to be stopped, so we issue another 490*2ms=980ms of HTTP stop requests. >>> And then, maybe, we get another 10 notifyStatus complete .... and we >>> issue 480, >>> and so on. >>> >>> Evidently, this is going to take a long time for the TaskQueue to >>> quiesce to the >>> correct state, and things are a little confused across the cluster until >>> that >>> happens. And, to top it off, tasks and supervisor get confused as to >>> progress, >>> so the task is marked failed, and put back in queue to restart, which >>> means >>> it takes longer. >>> >>> The fix is basically to make sure that the TaskQueue::notifyStatus can >>> proceed >>> to update the task state without blocking. Following that we get a flood >>> of ZK >>> updates in short order, so making the logging & ZK processing more >>> efficient >>> significantly reduces the time for quiesce to complete. >>> >>> == >>> >>> So back to the test, it looks good, and I think some tweaks need to >>> happen to >>> replicate the above: >>> (1) the taskQueue.add() and shutdown() calls should be run concurrently >>> from lots >>> of threads >>> (2) the TestTaskRunner::shutdown(taskId) call should respond in ~5ms, and >>> concurrently sleep some time, followed by a >>> knownTasks.get(taskId).setResult(...), which I think will trigger the >>> notification loop. >>> >>> I'll take a shot at this after lunch today. >>> >>> Sample logfile of a quick clean shutdown of a kafka task, from overlord >>> view: >>> >>> https://gist.githubusercontent.com/jasonk000/40c11dce3faed44d3a89c27e0227b982/raw/a456a4cddd31f508e618321e03ef8b5241931904/druid.log >>> >>> Thanks >>> Jason >>> >>> >>> On Thu, Dec 2, 2021 at 4:58 AM Gian Merlino <g...@apache.org> wrote: >>> >>>> Harini, those are interesting findings. I'm not sure if the two pauses >>>> are >>>> necessary, but my thought is that it ideally shouldn't matter because >>>> the >>>> supervisor shouldn't be taking that long to handle its notices. A couple >>>> things come to mind about that: >>>> >>>> 1) Did you see what specifically the supervisor is doing when it's >>>> handling >>>> the notices? Maybe from a stack trace? We should look into optimizing >>>> it, >>>> or making it asynchronous or something, depending on what it is. >>>> 2) Although, there isn't really a need to trigger a run for every single >>>> task status change anyway; I think it's ok to coalesce them. This patch >>>> would do it: https://github.com/apache/druid/pull/12018 >>>> >>>> Jason, also interesting findings! I took a crack at rebasing your patch >>>> on >>>> master and adding a scale test for the TaskQueue with 1000 simulated >>>> tasks: >>>> https://github.com/apache/druid/compare/master...gianm:tq-scale-test. 
>>>> When I run the scale test, "doMassLaunchAndExit" passes quickly but "doMassLaunchAndShutdown" times out. I suppose shutting down lots of tasks is still a bottleneck.
>>>>
>>>> Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be pretty straightforward to make the shutdown API asynchronous, which would help speed up anything that is shutting down lots of tasks all at once. Would that be helpful in your environments? Or are the changes to move shutdown out of critical sections going to be enough?
>>>>
>>>> On Wed, Dec 1, 2021 at 1:27 PM Jason Koch <jk...@netflix.com.invalid> wrote:
>>>>
>>>> > Hi Harini,
>>>> >
>>>> > We have seen issues like this related to task roll time, related to task queue notifications on overlord instances; I have a patch running internally that resolves this.
>>>> >
>>>> > These are my internal triage notes:
>>>> > ======
>>>> > - Whenever task scheduling is happening (startup, ingest segment task rollover, redeployment of a datasource), the Overlord takes a long time to assign workers. This compounds because tasks sit so long before deployment that the Overlord starts failing tasks and having to relaunch them.
>>>> > - TaskQueue: notifyStatus(), which receives updates from the middle managers, and the manage() loop, which controls services, run through a single lock. For example, the shutdown request involves submitting downstream HTTP requests synchronously (while holding the lock).
>>>> >    - This means for a cluster with ~700 tasks that tasks are held for nearly 1 second, and only after each 1-second pass around the manage loop can 1-2 notifications be processed. For a new startup, with 700 tasks and a 1-second delay, that is 300-600 or more seconds for the overlord to realise all the tasks are started by the middle manager.
>>>> >    - Similar delays happen for any other operations.
>>>> > - Sub-optimal logging code path (double-concatenating very long log entries).
>>>> > - ZkWorker: the worker fully deserializes all ZK payload data every time it looks up task IDs, rather than only looking at the ID fields. Similarly, it repeatedly fetches data on task assignment.
>>>> >
>>>> > =====
>>>> >
>>>> > The patch I have is here: https://github.com/jasonk000/druid/pull/7/files
>>>> >
>>>> > It fixes a couple of things, most importantly the task queue notification system. The system is much more stable with high task counts and will easily restart many tasks concurrently.
>>>> >
>>>> > I have other perf issues I want to look at first before I can document it fully, build a test case, rebase it on apache/master, etc. If you test it out, and it works, we could submit a PR that would resolve it.
>>>> >
>>>> > PS - I have a queue of similar fixes I'd like to submit, but need some time to do the documentation, build test cases, upstreaming, etc. If anyone wants to collaborate, I could open some Issues and share my partial notes.
>>>> >
>>>> > Thanks
>>>> > Jason
>>>> >
>>>> > On Wed, Dec 1, 2021 at 12:59 PM Harini Rajendran <hrajend...@confluent.io.invalid> wrote:
>>>> >
>>>> > > Hi all,
>>>> > >
>>>> > > I have been investigating this in the background for a few days now and need some help from the community.
>>>> > >
>>>> > > We noticed that every hour, when the tasks roll, we see a spike in the ingestion lag for about 2-4 minutes. We have 180 tasks running on this datasource.
>>>> > > [image: Screen Shot 2021-12-01 at 9.14.23 AM.png]
>>>> > >
>>>> > > On further debugging of task logs, we found out that around the duration when the ingestion lag spikes up, *the gap between pause and resume commands in the task logs during checkpointing ranges from a few seconds to a couple of minutes*. For example, in the following task logs you can see that it was about 1.5 minutes.
>>>> > > {"@timestamp":"2021-11-18T*20:06:58.513Z*", "log.level":"DEBUG", "message":"Received pause command, *pausing* ingestion until resumed.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>> > > {"@timestamp":"2021-11-18T*20:08:26.326Z*", "log.level":"DEBUG", "message":"Received pause command, *pausing* ingestion until resumed.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>> > > {"@timestamp":"2021-11-18T*20:08:26.329Z*", "log.level":"DEBUG", "message":"Received resume command, *resuming* ingestion.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>> > > So this explains why ingestion is lagging: the *tasks are paused for a long time*.
>>>> > >
>>>> > > *Why are there 2 pauses during checkpointing and why such a huge gap?*
>>>> > > As a next step, I wanted to see why there is such a wide gap. Then we realized that the first pause is when the task pauses itself here <https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java#L728> while requesting the supervisor for a checkpoint. And the second pause is when the supervisor actually handles the checkpoint notice here <https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L2548>. And since the supervisor thread for this data source takes such a long time to process all the notices in the queue before this checkpoint notice, the ingestion task ends up being in the paused state for a long time.
>>>> > >
>>>> > > *Why does the supervisor thread take such a long time to get to this checkpoint notice?*
>>>> > > That was my next step in debugging. Proving our theory, we noticed that the *noticesQueue in the supervisor does get backed up with 100s of notices every hour when tasks roll*.
>>>> > > [image: Screen Shot 2021-12-01 at 9.32.59 AM.png]
>>>> > > And we saw that *run_notice takes between 5s and 7s during task rolls*. This backs up the noticesQueue, so the checkpoint notice sits in the queue for a long time, leading to the ingestion lag spike whenever tasks roll.
>>>> > > [image: Screen Shot 2021-12-01 at 9.34.29 AM.png]
>>>> > >
>>>> > > *Why does run_notice take 5-7s to finish?*
>>>> > > When the task starts, it takes about 5s for the HTTP server to come up. So, until then, the supervisor thread is in a loop trying to get the task status, and this causes run_notice to take about 5-7s to finish.
>>>> > >
>>>> > > *Questions to the community*
>>>> > > Do we need 2 pauses during checkpointing? Should the task pause itself before requesting a checkpoint notice, given that the supervisor is going to pause the task anyway while handling the notice? Or is it okay to remove the pause in the TaskRunner before it sends a checkpoint notice request to the supervisor? This would immediately solve the ingestion lag issue completely, as there won't be two pauses with such a huge gap in between.
>>>> > >
>>>> > > Harini
>>>> > > Software Engineer, Confluent
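
To make the TaskQueue contention Jason describes above concrete (a manage() pass issuing hundreds of synchronous ~2 ms shutdown calls while holding the same lock that notifyStatus() needs), here is a minimal, self-contained Java sketch. Every name in it (ToyTaskQueue, manageHoldingLock, manageOutsideLock, issueShutdownHttpCall) is invented for illustration; this is not Druid's TaskQueue and not the code in jasonk000/druid#7, only the shape of the problem and of moving the slow I/O out of the critical section.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Toy illustration (not Druid code) of the contention described in the thread:
 * a manage() loop that issues slow, synchronous shutdown calls while holding
 * the same lock that status notifications need.
 */
public class ToyTaskQueue
{
  private final ReentrantLock giant = new ReentrantLock();
  private final Map<String, String> runningTasks = new ConcurrentHashMap<>(); // taskId -> state

  // Problematic shape: ~2 ms per stop call, x500 tasks, all under the lock,
  // so notifyStatus() below cannot run for roughly a second per manage() pass.
  public void manageHoldingLock()
  {
    giant.lock();
    try {
      for (String taskId : runningTasks.keySet()) {
        issueShutdownHttpCall(taskId); // blocking I/O inside the critical section
      }
    } finally {
      giant.unlock();
    }
  }

  // Shape of the fix: decide what to shut down under the lock, then do the
  // slow I/O after releasing it, so notifyStatus() is never starved.
  public void manageOutsideLock()
  {
    List<String> toShutDown;
    giant.lock();
    try {
      toShutDown = new ArrayList<>(runningTasks.keySet());
    } finally {
      giant.unlock();
    }
    for (String taskId : toShutDown) {
      issueShutdownHttpCall(taskId); // no lock held while waiting on the task
    }
  }

  // Runs when ZK/HTTP reports task completion; only needs the lock briefly.
  public void notifyStatus(String taskId, String status)
  {
    giant.lock();
    try {
      runningTasks.remove(taskId);
    } finally {
      giant.unlock();
    }
  }

  private void issueShutdownHttpCall(String taskId)
  {
    try {
      Thread.sleep(2); // stand-in for the ~2 ms stop acknowledgement used in Jason's example
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

With the first shape, 500 tasks keep the lock busy for roughly a second per pass, which matches the slow quiesce described above; with the second, notifyStatus() only ever waits for a brief map copy.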
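
Gian's point about coalescing (there is no need to trigger a supervisor run for every single task status change) can also be illustrated with a generic sketch. This is not the code in https://github.com/apache/druid/pull/12018; it is just one common way to collapse a burst of callbacks into a single queued notice, with invented names (CoalescingNoticeQueue, requestRun, processNextNotice).

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Illustrative-only sketch of coalescing: many task-status callbacks enqueue
 * at most one pending "run" notice, instead of one notice per status change.
 */
public class CoalescingNoticeQueue
{
  private final BlockingQueue<Runnable> notices = new LinkedBlockingQueue<>();
  private final AtomicBoolean runRequested = new AtomicBoolean(false);

  /** Called once per task status change; only the first caller in a burst enqueues. */
  public void requestRun(Runnable runLogic)
  {
    if (runRequested.compareAndSet(false, true)) {
      notices.add(() -> {
        runRequested.set(false); // allow the next burst to enqueue a fresh run
        runLogic.run();
      });
    }
  }

  /** The supervisor thread drains notices one at a time. */
  public void processNextNotice() throws InterruptedException
  {
    notices.take().run();
  }

  public static void main(String[] args) throws InterruptedException
  {
    CoalescingNoticeQueue queue = new CoalescingNoticeQueue();
    // 180 tasks rolling at once produce 180 callbacks but only one queued run.
    for (int i = 0; i < 180; i++) {
      queue.requestRun(() -> System.out.println("supervisor run triggered"));
    }
    queue.processNextNotice(); // prints once
    System.out.println("notices left in queue: " + queue.notices.size()); // 0
  }
}

The flag is reset as the coalesced run starts, so a status change that arrives mid-run still schedules one more run rather than being lost.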
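
Finally, the scale-test shape Jason proposes (shutdown acknowledged in a few milliseconds, completion reported asynchronously about two seconds later, driven from many threads at once) can be sketched outside of Druid as follows. This standalone harness is only an assumption about the structure of such a test, not the contents of gianm:tq-scale-test or the PR against it; the 5 ms acknowledgement, 2 s completion delay, and 60 s bound simply mirror the numbers quoted in the thread.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Rough sketch of the test shape discussed in the thread: fast shutdown
 * acknowledgements, delayed asynchronous completion notifications, and a
 * bound on how long the whole shutdown wave may take.
 */
public class ShutdownScaleSketch
{
  public static void main(String[] args) throws Exception
  {
    int taskCount = 1000;
    ExecutorService callers = Executors.newFixedThreadPool(32);                   // concurrent shutdown() callers
    ScheduledExecutorService completions = Executors.newScheduledThreadPool(8);   // delayed completion notifications

    List<CompletableFuture<Void>> completed = new ArrayList<>();
    long start = System.nanoTime();

    for (int i = 0; i < taskCount; i++) {
      CompletableFuture<Void> done = new CompletableFuture<>();
      completed.add(done);
      callers.submit(() -> {
        sleepQuietly(5); // shutdown acknowledgement: ~5 ms, like the Kafka task's /stop flag
        completions.schedule(() -> { done.complete(null); }, 2, TimeUnit.SECONDS); // completion ~2 s later
      });
    }

    // The 60 s bound mirrors the timeout observed with the serial TaskQueue.
    CompletableFuture.allOf(completed.toArray(new CompletableFuture[0])).get(60, TimeUnit.SECONDS);
    System.out.printf("all %d tasks reported complete in %.1f s%n",
                      taskCount, (System.nanoTime() - start) / 1e9);

    callers.shutdown();
    completions.shutdown();
  }

  private static void sleepQuietly(long millis)
  {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

If the shutdown path under test serializes behind a single lock, this harness approaches the 60 s bound; if acknowledgements and completions are processed concurrently, it finishes in a few seconds, which is the behaviour the thread reports for the concurrent TaskQueue.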