That's great! Thanks. I'll keep an eye on those PRs.

Harini
Software Engineer, Observability
+1 412 708 3872
On Wed, Jan 5, 2022 at 11:00 AM Jason Koch <jk...@netflix.com> wrote: > Thanks for the prompt - yes I'll get these fixed. They are code coverage / > linter fixes, I had mistakenly assumed they were flake-y tests. I'll aim to > fix these today. > > On Wed, Jan 5, 2022 at 7:25 AM Harini Rajendran <hrajend...@confluent.io> > wrote: > >> Hi Jason, >> >> I was taking a look at your PRs and see that CI build is failing for 2 of >> them. Do you know why those are failing and are you planning to fix them? >> >> Harini >> Software Engineer, Observability >> +1 412 708 3872 >> >> >> >> On Mon, Jan 3, 2022 at 7:59 PM Harini Rajendran <hrajend...@confluent.io> >> wrote: >> >>> Hi Jason, >>> >>> I shall take a look at these 3 PRs and see if we can try these out in >>> our test environment. >>> >>> Also, we use AWS RDS as the metadata engine. >>> >>> Harini >>> Software Engineer, Observability >>> +1 412 708 3872 >>> >>> >>> >>> On Fri, Dec 31, 2021 at 3:22 PM Jason Koch <jk...@netflix.com> wrote: >>> >>>> Hi Harini, >>>> >>>> I had a chance to look at the checkpoint behaviour you mentioned in >>>> more detail, and found two codepaths where the RunNotice code ends up in >>>> the TaskQueue, and hits the same locks. I'd be interested if you want to >>>> try the three related PRs I have submitted. (I added more detail to the >>>> issue https://github.com/apache/druid/issues/11414). >>>> >>>> Failing that, I think the best next step would be some stack traces / >>>> and/or profiler output from the supervisor at the time of rollover. It >>>> would also be useful to know which metadata storage solution you are using >>>> as the RunNotice interacts with metadata engine too. >>>> >>>> Thanks >>>> Jason >>>> >>>> On Fri, Dec 3, 2021 at 2:43 PM Jason Koch <jk...@netflix.com> wrote: >>>> >>>>> Gian, >>>>> >>>>> I've submitted a PR to gianm/tq-scale-test that provides a concurrent >>>>> test, (and fixes a concurrency bug I found along the way). The change uses >>>>> an 8millis response time for shutdown acknowledgment, and a 2 second time >>>>> for shutdown completion/notification. >>>>> >>>>> Based on this test, >>>>> - serial TaskQueue timeout occurs after 60sec for each test, and, >>>>> - concurrent TaskQueue passes in ~10sec per test, >>>>> >>>>> https://github.com/gianm/druid/pull/3/files >>>>> >>>>> Let me know your thoughts. >>>>> >>>>> Thanks >>>>> Jason >>>>> >>>>> On Fri, Dec 3, 2021 at 11:41 AM Jason Koch <jk...@netflix.com> wrote: >>>>> >>>>>> Hi Gian >>>>>> >>>>>> > Jason, also interesting findings! I took a crack at rebasing your >>>>>> patch on >>>>>> > master and adding a scale test for the TaskQueue with 1000 >>>>>> simulated tasks: >>>>>> > https://github.com/apache/druid/compare/master...gianm:tq-scale-test >>>>>> <https://github.com/apache/druid/compare/master...gianm:tq-scale-test>. >>>>>> When >>>>>> > I run the scale test, "doMassLaunchAndExit" passes quickly but >>>>>> > "doMassLaunchAndShutdown" times out. I suppose shutting down lots >>>>>> of tasks >>>>>> > is still a bottleneck. >>>>>> >>>>>> Looks good, I'll come back to the test below. >>>>>> >>>>>> > Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be >>>>>> pretty >>>>>> > straightforward to make the shutdown API asynchronous, which would >>>>>> help >>>>>> > speed up anything that is shutting down lots of tasks all at once. >>>>>> Would >>>>>> > that be helpful in your environments? Or are the changes to move >>>>>> shutdown >>>>>> > out of critical sections going to be enough? 
>>>>>>
>>>>>> This would be the logical next step for some environments, but I
>>>>>> didn't need to go that far. For this particular cluster we are
>>>>>> reading from Kafka via the SeekableStreamIndexTaskRunner, and its
>>>>>> /stop call does not stop the task directly, it only sets a flag, so
>>>>>> the response back to the overlord comes back in single-digit
>>>>>> milliseconds.
>>>>>>
>>>>>> Some extra detail on the specific interaction as it relates to Kafka
>>>>>> might make the problem/fix more clear ...
>>>>>>
>>>>>> ==
>>>>>>
>>>>>> Suppose for simplicity we have 500 tasks to roll, and each takes 2ms
>>>>>> to acknowledge a stop request. TaskQueue#L322-L332 is going to issue
>>>>>> 500x2ms requests to stop all of them, which will take approx 1 second
>>>>>> to complete.
>>>>>>
>>>>>> Notice it is doing this whilst holding the lock.
>>>>>>
>>>>>> After cleanup, those tasks will issue a status update via ZK ->
>>>>>> RemoteTaskRunner::taskComplete that they have completed. That
>>>>>> taskComplete fires the future completion, which lands back in
>>>>>> TaskQueue::notifyStatus, where the TaskQueue can now record that the
>>>>>> task has finished.
>>>>>>
>>>>>> But - notifyStatus can *only* proceed once the lock has been
>>>>>> released; then it claims the lock and calls removeTaskInternal. At
>>>>>> this point the lock is released again, and maybe a few more
>>>>>> concurrent ZK -> notifyStatus() calls proceed. Let's suppose we got
>>>>>> lucky and processed 10 of them, which have now been removed from the
>>>>>> TaskQueue.
>>>>>>
>>>>>> At some point, though, the TaskQueue manage loop is going to get that
>>>>>> lock, and we are now at 490 tasks that the queue believes are running
>>>>>> but that we expect to be stopped, so we issue another 490*2ms=980ms
>>>>>> of HTTP stop requests. And then, maybe, we get another 10
>>>>>> notifyStatus completions ... and we issue 480, and so on.
>>>>>>
>>>>>> Evidently, this is going to take a long time for the TaskQueue to
>>>>>> quiesce to the correct state, and things are a little confused across
>>>>>> the cluster until that happens. And, to top it off, tasks and
>>>>>> supervisor get confused as to progress, so the task is marked failed
>>>>>> and put back in the queue to restart, which means it takes longer.
>>>>>>
>>>>>> The fix is basically to make sure that TaskQueue::notifyStatus can
>>>>>> proceed to update the task state without blocking (a rough sketch of
>>>>>> that shape is below). Following that we get a flood of ZK updates in
>>>>>> short order, so making the logging & ZK processing more efficient
>>>>>> significantly reduces the time for quiesce to complete.
>>>>>>
>>>>>> ==
>>>>>>
>>>>>> So back to the test: it looks good, and I think some tweaks need to
>>>>>> happen to replicate the above:
>>>>>> (1) the taskQueue.add() and shutdown() calls should be run
>>>>>> concurrently from lots of threads
>>>>>> (2) the TestTaskRunner::shutdown(taskId) call should respond in ~5ms,
>>>>>> and concurrently sleep some time, followed by a
>>>>>> knownTasks.get(taskId).setResult(...), which I think will trigger the
>>>>>> notification loop.
>>>>>>
>>>>>> I'll take a shot at this after lunch today.
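>>>>>>
>>>>>> To make the shape of the fix concrete, here is a rough Java sketch
>>>>>> (simplified, with hypothetical class/field names, not the actual
>>>>>> TaskQueue code): decide which tasks to stop while holding the lock,
>>>>>> release the lock, and only then issue the slow shutdown calls, so
>>>>>> that notifyStatus() can take the lock and remove completed tasks in
>>>>>> the meantime.
>>>>>>
>>>>>> import java.util.ArrayList;
>>>>>> import java.util.HashSet;
>>>>>> import java.util.List;
>>>>>> import java.util.Set;
>>>>>> import java.util.concurrent.locks.ReentrantLock;
>>>>>>
>>>>>> class TaskQueueSketch
>>>>>> {
>>>>>>   private final ReentrantLock giant = new ReentrantLock();
>>>>>>   private final Set<String> runningTaskIds = new HashSet<>();
>>>>>>   private final Set<String> desiredTaskIds = new HashSet<>();
>>>>>>
>>>>>>   // One pass of the manage loop.
>>>>>>   void manageOnce()
>>>>>>   {
>>>>>>     final List<String> toShutDown = new ArrayList<>();
>>>>>>
>>>>>>     giant.lock();
>>>>>>     try {
>>>>>>       // Only compute the delta while holding the lock.
>>>>>>       for (String taskId : runningTaskIds) {
>>>>>>         if (!desiredTaskIds.contains(taskId)) {
>>>>>>           toShutDown.add(taskId);
>>>>>>         }
>>>>>>       }
>>>>>>     }
>>>>>>     finally {
>>>>>>       giant.unlock();
>>>>>>     }
>>>>>>
>>>>>>     // Issue the (comparatively slow) shutdown requests outside the
>>>>>>     // critical section, so notifyStatus() is never stuck behind them.
>>>>>>     for (String taskId : toShutDown) {
>>>>>>       requestShutdown(taskId);
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   // Completion callback (ZK / HTTP): only needs the lock briefly.
>>>>>>   void notifyStatus(String taskId)
>>>>>>   {
>>>>>>     giant.lock();
>>>>>>     try {
>>>>>>       runningTaskIds.remove(taskId);
>>>>>>     }
>>>>>>     finally {
>>>>>>       giant.unlock();
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   private void requestShutdown(String taskId)
>>>>>>   {
>>>>>>     // placeholder for the HTTP stop call to the task runner
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> With 500 tasks to stop, the manage pass above holds the lock only for
>>>>>> the delta computation, so the 500 x 2ms of stop requests no longer
>>>>>> serialize against the completion callbacks.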
>>>>>> >>>>>> Sample logfile of a quick clean shutdown of a kafka task, from >>>>>> overlord view: >>>>>> >>>>>> https://gist.githubusercontent.com/jasonk000/40c11dce3faed44d3a89c27e0227b982/raw/a456a4cddd31f508e618321e03ef8b5241931904/druid.log >>>>>> >>>>>> Thanks >>>>>> Jason >>>>>> >>>>>> >>>>>> On Thu, Dec 2, 2021 at 4:58 AM Gian Merlino <g...@apache.org> wrote: >>>>>> >>>>>>> Harini, those are interesting findings. I'm not sure if the two >>>>>>> pauses are >>>>>>> necessary, but my thought is that it ideally shouldn't matter >>>>>>> because the >>>>>>> supervisor shouldn't be taking that long to handle its notices. A >>>>>>> couple >>>>>>> things come to mind about that: >>>>>>> >>>>>>> 1) Did you see what specifically the supervisor is doing when it's >>>>>>> handling >>>>>>> the notices? Maybe from a stack trace? We should look into >>>>>>> optimizing it, >>>>>>> or making it asynchronous or something, depending on what it is. >>>>>>> 2) Although, there isn't really a need to trigger a run for every >>>>>>> single >>>>>>> task status change anyway; I think it's ok to coalesce them. This >>>>>>> patch >>>>>>> would do it: https://github.com/apache/druid/pull/12018 >>>>>>> >>>>>>> Jason, also interesting findings! I took a crack at rebasing your >>>>>>> patch on >>>>>>> master and adding a scale test for the TaskQueue with 1000 simulated >>>>>>> tasks: >>>>>>> https://github.com/apache/druid/compare/master...gianm:tq-scale-test. >>>>>>> When >>>>>>> I run the scale test, "doMassLaunchAndExit" passes quickly but >>>>>>> "doMassLaunchAndShutdown" times out. I suppose shutting down lots of >>>>>>> tasks >>>>>>> is still a bottleneck. >>>>>>> >>>>>>> Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be >>>>>>> pretty >>>>>>> straightforward to make the shutdown API asynchronous, which would >>>>>>> help >>>>>>> speed up anything that is shutting down lots of tasks all at once. >>>>>>> Would >>>>>>> that be helpful in your environments? Or are the changes to move >>>>>>> shutdown >>>>>>> out of critical sections going to be enough? >>>>>>> >>>>>>> On Wed, Dec 1, 2021 at 1:27 PM Jason Koch <jk...@netflix.com.invalid> >>>>>>> wrote: >>>>>>> >>>>>>> > Hi Harini, >>>>>>> > >>>>>>> > We have seen issues like this related to task roll time, related >>>>>>> to task >>>>>>> > queue notifications on overlord instances; I have a patch running >>>>>>> > internally that resolves this. >>>>>>> > >>>>>>> > These are my internal triage notes: >>>>>>> > ====== >>>>>>> > - Whenever task scheduling is happening (startup, ingest segment >>>>>>> task >>>>>>> > rollover, redeployment of datasource) Overlord takes a long time >>>>>>> to assign >>>>>>> > workers. This compounds because tasks sit so long before >>>>>>> deployment that it >>>>>>> > starts failing tasks and having to relaunch them. >>>>>>> > >>>>>>> > - TaskQueue: notifyStatus() which receives updates from the >>>>>>> > middlemanagers, and the manage() loop which controls services, >>>>>>> runs >>>>>>> > through >>>>>>> > a single lock. For example, the shutdown request involves >>>>>>> submitting >>>>>>> > downstream HTTP requests synchronously (while holding the lock). >>>>>>> > - This means for a cluster with ~700 tasks that tasks are held >>>>>>> for >>>>>>> > nearly 1second, and only after each 1 second around the manage >>>>>>> loop can >>>>>>> > 1-2 >>>>>>> > notifications be processed. 
>>>>>>> > For a new startup, with 700 tasks and a 1sec delay, that is
>>>>>>> > 300-600 or more seconds for the overlord to realise all the tasks
>>>>>>> > are started by the middle manager.
>>>>>>> > - Similar delays happen for any other operations.
>>>>>>> > - Sub-optimal logging code path (double-concatenating very long
>>>>>>> > log entries).
>>>>>>> > - ZkWorker: the worker fully deserializes all ZK payload data
>>>>>>> > every time it looks up task IDs, rather than only looking at the
>>>>>>> > ID fields. Similarly, it re-fetches data on task assignment.
>>>>>>> >
>>>>>>> > =====
>>>>>>> >
>>>>>>> > The patch I have is here:
>>>>>>> > https://github.com/jasonk000/druid/pull/7/files
>>>>>>> >
>>>>>>> > It fixes a couple of things, most importantly the task queue
>>>>>>> > notification system. The system is much more stable with high task
>>>>>>> > counts and will easily restart many tasks concurrently.
>>>>>>> >
>>>>>>> > I have other perf issues I want to look at first before I can
>>>>>>> > document it fully, build a test case, rebase it on apache/master,
>>>>>>> > etc. If you test it out, and it works, we could submit a PR that
>>>>>>> > would resolve it.
>>>>>>> >
>>>>>>> > PS - I have a queue of similar fixes I'd like to submit, but I need
>>>>>>> > some time to do the documentation, build test cases, upstreaming,
>>>>>>> > etc. If anyone wants to collaborate, I could open some Issues and
>>>>>>> > share my partial notes.
>>>>>>> >
>>>>>>> > Thanks
>>>>>>> > Jason
>>>>>>> >
>>>>>>> > On Wed, Dec 1, 2021 at 12:59 PM Harini Rajendran
>>>>>>> > <hrajend...@confluent.io.invalid> wrote:
>>>>>>> >
>>>>>>> > > Hi all,
>>>>>>> > >
>>>>>>> > > I have been investigating this in the background for a few days
>>>>>>> > > now and need some help from the community.
>>>>>>> > >
>>>>>>> > > We noticed that every hour, when the tasks roll, we see a spike
>>>>>>> > > in the ingestion lag for about 2-4 minutes. We have 180 tasks
>>>>>>> > > running on this datasource.
>>>>>>> > > [image: Screen Shot 2021-12-01 at 9.14.23 AM.png]
>>>>>>> > >
>>>>>>> > > On further debugging of the task logs, we found out that around
>>>>>>> > > the time the ingestion lag spikes up, *the gap between the pause
>>>>>>> > > and resume commands in the task logs during checkpointing is
>>>>>>> > > wide, ranging from a few seconds to a couple of minutes*. For
>>>>>>> > > example, in the following task logs you can see that it was
>>>>>>> > > about 1.5 minutes.
>>>>>>> > > {"@timestamp":"2021-11-18T*20:06:58.513Z*", "log.level":"DEBUG", >>>>>>> > > "message":"Received pause command, *pausing* ingestion until >>>>>>> resumed.", " >>>>>>> > > service.name >>>>>>> > > >>>>>>> ":"druid/middleManager","event.dataset":"druid/middleManager.log"," >>>>>>> > > process.thread.name >>>>>>> > > >>>>>>> > >>>>>>> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"} >>>>>>> > > {"@timestamp":"2021-11-18T*20:08:26.326Z*", "log.level":"DEBUG", >>>>>>> > > "message":"Received pause command, *pausing* ingestion until >>>>>>> resumed.", " >>>>>>> > > service.name >>>>>>> > > >>>>>>> ":"druid/middleManager","event.dataset":"druid/middleManager.log"," >>>>>>> > > process.thread.name >>>>>>> > > >>>>>>> > >>>>>>> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"} >>>>>>> > > {"@timestamp":"2021-11-18T*20:08:26.329Z*", "log.level":"DEBUG", >>>>>>> > > "message":"Received resume command, *resuming* ingestion.", " >>>>>>> > service.name >>>>>>> > > >>>>>>> ":"druid/middleManager","event.dataset":"druid/middleManager.log"," >>>>>>> > > process.thread.name >>>>>>> > > >>>>>>> > >>>>>>> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"} >>>>>>> > > So this explains why ingestion is lagging as the *tasks are >>>>>>> paused for a >>>>>>> > > long time*. >>>>>>> > > >>>>>>> > > *Why are there 2 pauses during checkpointing and why such a huge >>>>>>> gap?* >>>>>>> > > As a next step, I wanted to see why there is such a wide gap. >>>>>>> Then we >>>>>>> > > realized that the first pause is when the task pauses itself here >>>>>>> > > < >>>>>>> > >>>>>>> https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java#L728 >>>>>>> > >>>>>>> > while >>>>>>> > > requesting the supervisor for a checkpoint. And the second pause >>>>>>> is when >>>>>>> > > the supervisor actually handles the checkpoint notice here >>>>>>> > > < >>>>>>> > >>>>>>> https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L2548 >>>>>>> > > >>>>>>> > > . >>>>>>> > > And since the supervisor thread for this data source takes such >>>>>>> a long >>>>>>> > > time to process all the notices in the queue before this >>>>>>> checkpoint >>>>>>> > notice, >>>>>>> > > the ingestion task ends up being in the paused state for a long >>>>>>> time. >>>>>>> > > >>>>>>> > > *Why does the supervisor thread take such a long time to get to >>>>>>> this >>>>>>> > > checkpoint notice?* >>>>>>> > > That was my next step in debugging. >>>>>>> > > Proving our theory, we noticed that the *noticesQueue in the >>>>>>> supervisor >>>>>>> > > does get backed up with 100s of notices every hour when tasks >>>>>>> roll*. >>>>>>> > > [image: Screen Shot 2021-12-01 at 9.32.59 AM.png] >>>>>>> > > And we saw that *run_notice takes between 5s and 7s during task >>>>>>> rolls*. >>>>>>> > > And this causes backing up of noticesQueue causing checkpoint >>>>>>> notice to >>>>>>> > be >>>>>>> > > in the queue for long leading to ingestion lag spike whenever >>>>>>> tasks roll. 
>>>>>>> > > [image: Screen Shot 2021-12-01 at 9.34.29 AM.png] >>>>>>> > > >>>>>>> > > *Why does run_notice take 5-7s to finish?* >>>>>>> > > When the task starts, it takes about 5s for the HTTP server to >>>>>>> come up. >>>>>>> > > So, till then the supervisor thread is in a loop trying to get >>>>>>> the task >>>>>>> > > status and this causes run_notice to take about 5-7s to finish. >>>>>>> > > >>>>>>> > > *Questions to the community* >>>>>>> > > Do we need 2 pauses during checkpointing? Should the task pause >>>>>>> itself >>>>>>> > > before requesting a checkpoint notice given that the supervisor >>>>>>> is >>>>>>> > anyways >>>>>>> > > going to pause the task while handling the notice? Or is it okay >>>>>>> to >>>>>>> > remove >>>>>>> > > the pause in the TaskRunner before it sends a checkpoint notice >>>>>>> request >>>>>>> > to >>>>>>> > > the supervisor? This would immediately solve the ingestion lag >>>>>>> issue >>>>>>> > > completely as there won't be two pauses with such a huge gap in >>>>>>> between. >>>>>>> > > >>>>>>> > > Harini >>>>>>> > > Software Engineer, Confluent >>>>>>> > > >>>>>>> > >>>>>>> >>>>>>