Thanks for the prompt - yes, I'll get these fixed. They are code coverage / linter issues; I had mistakenly assumed they were flaky tests. I'll aim to fix them today.
On Wed, Jan 5, 2022 at 7:25 AM Harini Rajendran <hrajend...@confluent.io> wrote:
> Hi Jason,
>
> I was taking a look at your PRs and see that the CI build is failing for 2 of them. Do you know why those are failing and are you planning to fix them?
>
> Harini
> Software Engineer, Observability
> +1 412 708 3872
>
> On Mon, Jan 3, 2022 at 7:59 PM Harini Rajendran <hrajend...@confluent.io> wrote:
>
>> Hi Jason,
>>
>> I shall take a look at these 3 PRs and see if we can try these out in our test environment.
>>
>> Also, we use AWS RDS as the metadata engine.
>>
>> Harini
>> Software Engineer, Observability
>> +1 412 708 3872
>>
>> On Fri, Dec 31, 2021 at 3:22 PM Jason Koch <jk...@netflix.com> wrote:
>>
>>> Hi Harini,
>>>
>>> I had a chance to look at the checkpoint behaviour you mentioned in more detail, and found two codepaths where the RunNotice code ends up in the TaskQueue and hits the same locks. I'd be interested if you want to try the three related PRs I have submitted. (I added more detail to the issue https://github.com/apache/druid/issues/11414).
>>>
>>> Failing that, I think the best next step would be some stack traces and/or profiler output from the supervisor at the time of rollover. It would also be useful to know which metadata storage solution you are using, as the RunNotice interacts with the metadata engine too.
>>>
>>> Thanks
>>> Jason
>>>
>>> On Fri, Dec 3, 2021 at 2:43 PM Jason Koch <jk...@netflix.com> wrote:
>>>
>>>> Gian,
>>>>
>>>> I've submitted a PR to gianm/tq-scale-test that provides a concurrent test (and fixes a concurrency bug I found along the way). The change uses an 8 ms response time for shutdown acknowledgment, and a 2 second time for shutdown completion/notification.
>>>>
>>>> Based on this test,
>>>> - the serial TaskQueue times out after 60 sec for each test, and
>>>> - the concurrent TaskQueue passes in ~10 sec per test.
>>>>
>>>> https://github.com/gianm/druid/pull/3/files
>>>>
>>>> Let me know your thoughts.
>>>>
>>>> Thanks
>>>> Jason
>>>>
>>>> On Fri, Dec 3, 2021 at 11:41 AM Jason Koch <jk...@netflix.com> wrote:
>>>>
>>>>> Hi Gian
>>>>>
>>>>> > Jason, also interesting findings! I took a crack at rebasing your patch on master and adding a scale test for the TaskQueue with 1000 simulated tasks: https://github.com/apache/druid/compare/master...gianm:tq-scale-test. When I run the scale test, "doMassLaunchAndExit" passes quickly but "doMassLaunchAndShutdown" times out. I suppose shutting down lots of tasks is still a bottleneck.
>>>>>
>>>>> Looks good, I'll come back to the test below.
>>>>>
>>>>> > Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be pretty straightforward to make the shutdown API asynchronous, which would help speed up anything that is shutting down lots of tasks all at once. Would that be helpful in your environments? Or are the changes to move shutdown out of critical sections going to be enough?
>>>>>
>>>>> This would be the logical next step for some environments, but I didn't need to go that far. For this particular cluster we are reading from Kafka, which is a SeekableStreamIndexRunner; the /stop call does not stop directly, it only sets a flag, so the response back to the overlord comes back in single-digit milliseconds.
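For illustration, a minimal sketch of that flag-based /stop handling. The names below are made up; this is not the actual SeekableStreamIndexTaskRunner code.

    import java.util.concurrent.atomic.AtomicBoolean;

    class StreamingTaskRunnerSketch
    {
      private final AtomicBoolean stopRequested = new AtomicBoolean(false);

      // Handler behind the task's /stop endpoint: record the request and return
      // immediately, so the overlord's HTTP call completes in milliseconds.
      void requestStop()
      {
        stopRequested.set(true);
      }

      // The ingestion loop checks the flag between batches and winds down cleanly;
      // the actual completion is reported later, asynchronously (e.g. via a ZK status update).
      void runLoop()
      {
        while (!stopRequested.get()) {
          pollAndPersistNextBatch();
        }
        publishSegmentsAndReportSuccess();
      }

      private void pollAndPersistNextBatch()
      {
        // placeholder: read a batch from the stream and persist it
      }

      private void publishSegmentsAndReportSuccess()
      {
        // placeholder: asynchronous completion path back to the overlord
      }
    }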
>>>>> Some extra detail on the specific interaction as related to Kafka might make the problem/fix more clear ...
>>>>>
>>>>> ==
>>>>>
>>>>> Suppose for simplicity we have 500 tasks to roll, and each takes 2ms to acknowledge a stop request. TaskQueue#L322-L332 is going to issue 500x2ms requests to stop all of them, which will take approx 1 second to complete.
>>>>>
>>>>> Notice it is doing this whilst holding the lock.
>>>>>
>>>>> After cleanup those tasks will issue a status update via ZK -> RemoteTaskRunner::taskComplete that they have completed. That taskComplete fires the future completion which lands back in TaskQueue::notifyStatus, where the TaskQueue can now update state that the task has finished.
>>>>>
>>>>> But - notifyStatus can *only* proceed once the lock has been released, and then it claims the lock and calls removeTaskInternal. At this point the lock is released, and maybe a few more concurrent ZK -> notifyStatus() calls proceed. Let's suppose we got lucky, and we processed 10 requests, which have now been removed from the TaskQueue.
>>>>>
>>>>> At some point though, the TaskQueue manage loop is going to get that lock, and we are now at 490 tasks that the queue believes are running, which we expected to be stopped, so we issue another 490*2ms=980ms of HTTP stop requests. And then, maybe, we get another 10 notifyStatus completions ... and we issue 480, and so on.
>>>>>
>>>>> Evidently, this is going to take a long time for the TaskQueue to quiesce to the correct state, and things are a little confused across the cluster until that happens. And, to top it off, tasks and supervisor get confused as to progress, so the task is marked failed and put back in the queue to restart, which means it takes longer.
>>>>>
>>>>> The fix is basically to make sure that TaskQueue::notifyStatus can proceed to update the task state without blocking. Following that we get a flood of ZK updates in short order, so making the logging & ZK processing more efficient significantly reduces the time for the quiesce to complete.
>>>>>
>>>>> ==
>>>>>
>>>>> So back to the test, it looks good, and I think some tweaks need to happen to replicate the above:
>>>>> (1) the taskQueue.add() and shutdown() calls should be run concurrently from lots of threads
>>>>> (2) the TestTaskRunner::shutdown(taskId) call should respond in ~5ms, and concurrently sleep some time, followed by a knownTasks.get(taskId).setResult(...), which I think will trigger the notification loop.
>>>>>
>>>>> I'll take a shot at this after lunch today.
>>>>>
>>>>> Sample logfile of a quick clean shutdown of a kafka task, from the overlord view:
>>>>> https://gist.githubusercontent.com/jasonk000/40c11dce3faed44d3a89c27e0227b982/raw/a456a4cddd31f508e618321e03ef8b5241931904/druid.log
>>>>>
>>>>> Thanks
>>>>> Jason
>>>>>
>>>>> On Thu, Dec 2, 2021 at 4:58 AM Gian Merlino <g...@apache.org> wrote:
>>>>>
>>>>>> Harini, those are interesting findings. I'm not sure if the two pauses are necessary, but my thought is that it ideally shouldn't matter because the supervisor shouldn't be taking that long to handle its notices.
>>>>>> A couple things come to mind about that:
>>>>>>
>>>>>> 1) Did you see what specifically the supervisor is doing when it's handling the notices? Maybe from a stack trace? We should look into optimizing it, or making it asynchronous or something, depending on what it is.
>>>>>> 2) Although, there isn't really a need to trigger a run for every single task status change anyway; I think it's ok to coalesce them. This patch would do it: https://github.com/apache/druid/pull/12018
>>>>>>
>>>>>> Jason, also interesting findings! I took a crack at rebasing your patch on master and adding a scale test for the TaskQueue with 1000 simulated tasks: https://github.com/apache/druid/compare/master...gianm:tq-scale-test. When I run the scale test, "doMassLaunchAndExit" passes quickly but "doMassLaunchAndShutdown" times out. I suppose shutting down lots of tasks is still a bottleneck.
>>>>>>
>>>>>> Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be pretty straightforward to make the shutdown API asynchronous, which would help speed up anything that is shutting down lots of tasks all at once. Would that be helpful in your environments? Or are the changes to move shutdown out of critical sections going to be enough?
>>>>>>
>>>>>> On Wed, Dec 1, 2021 at 1:27 PM Jason Koch <jk...@netflix.com.invalid> wrote:
>>>>>>
>>>>>> > Hi Harini,
>>>>>> >
>>>>>> > We have seen issues like this related to task roll time, related to task queue notifications on overlord instances; I have a patch running internally that resolves this.
>>>>>> >
>>>>>> > These are my internal triage notes:
>>>>>> > ======
>>>>>> > - Whenever task scheduling is happening (startup, ingest segment task rollover, redeployment of a datasource), the Overlord takes a long time to assign workers. This compounds because tasks sit so long before deployment that it starts failing tasks and having to relaunch them.
>>>>>> >
>>>>>> > - TaskQueue: notifyStatus(), which receives updates from the middle managers, and the manage() loop, which controls services, run through a single lock. For example, the shutdown request involves submitting downstream HTTP requests synchronously (while holding the lock).
>>>>>> >   - This means for a cluster with ~700 tasks that the lock is held for nearly 1 second, and only after each 1-second pass around the manage loop can 1-2 notifications be processed. For a new startup, with 700 tasks and a 1 sec delay, that is 300-600-or-more seconds for the overlord to realise all the tasks are started by the middle manager.
>>>>>> >   - Similar delays happen for any other operations.
>>>>>> > - Sub-optimal logging code path (double-concatenating very long log entries).
>>>>>> > - ZkWorker: the worker fully deserializes all ZK payload data every time it looks up task IDs, rather than only looking at the ID fields. Similarly, it repeatedly fetches data on task assignment.
>>>>>> >
>>>>>> > =====
>>>>>> >
>>>>>> > The patch I have is here:
>>>>>> > https://github.com/jasonk000/druid/pull/7/files
>>>>>> >
>>>>>> > It fixes a couple of things, most importantly the task queue notification system.
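For illustration, a minimal sketch of the general "do the slow work outside the lock" shape of such a fix. The names below are made up; this is not the actual TaskQueue code or the patch linked above.

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.locks.ReentrantLock;

    class TaskQueueSketch
    {
      interface TaskShutdownClient
      {
        void shutdownAsync(String taskId);
      }

      private final ReentrantLock giant = new ReentrantLock();
      private final Set<String> activeTaskIds = new HashSet<>();

      // Problematic shape: issuing one synchronous HTTP shutdown per task while holding
      // "giant" means 500 tasks x 2ms blocks every status notification for ~1 second.
      // Here the manage pass only snapshots state under the lock, then does I/O outside it.
      void manageOnce(TaskShutdownClient client)
      {
        final List<String> toShutDown;
        giant.lock();
        try {
          toShutDown = new ArrayList<>(activeTaskIds);  // cheap work only, under the lock
        }
        finally {
          giant.unlock();
        }
        for (String taskId : toShutDown) {
          client.shutdownAsync(taskId);                 // slow I/O outside the lock
        }
      }

      // Status notifications now only need the lock for a quick state update, so they
      // are no longer starved behind a long manage pass.
      void notifyStatusComplete(String taskId)
      {
        giant.lock();
        try {
          activeTaskIds.remove(taskId);
        }
        finally {
          giant.unlock();
        }
      }
    }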
>>>>>> > The system is much more stable with high task counts and will easily restart many tasks concurrently.
>>>>>> >
>>>>>> > I have other perf issues I want to look at first before I can document it fully, build a test case, rebase it on apache/master, etc. If you test it out, and it works, we could submit a PR that would resolve it.
>>>>>> >
>>>>>> > PS - I have a queue of similar fixes I'd like to submit, but I need some time to do the documentation, build test cases, upstreaming, etc. If anyone wants to collaborate, I could open some Issues and share my partial notes.
>>>>>> >
>>>>>> > Thanks
>>>>>> > Jason
>>>>>> >
>>>>>> > On Wed, Dec 1, 2021 at 12:59 PM Harini Rajendran <hrajend...@confluent.io.invalid> wrote:
>>>>>> >
>>>>>> > > Hi all,
>>>>>> > >
>>>>>> > > I have been investigating this in the background for a few days now and need some help from the community.
>>>>>> > >
>>>>>> > > We noticed that every hour, when the tasks roll, we see a spike in the ingestion lag for about 2-4 minutes. We have 180 tasks running on this datasource.
>>>>>> > > [image: Screen Shot 2021-12-01 at 9.14.23 AM.png]
>>>>>> > >
>>>>>> > > On further debugging of the task logs, we found that around the time the ingestion lag spikes up, *the gap between the pause and resume commands in the task logs during checkpointing is wide, ranging from a few seconds to a couple of minutes*. For example, in the following task logs you can see that it was about 1.5 minutes.
>>>>>> > > {"@timestamp":"2021-11-18T*20:06:58.513Z*", "log.level":"DEBUG", "message":"Received pause command, *pausing* ingestion until resumed.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>>>> > > {"@timestamp":"2021-11-18T*20:08:26.326Z*", "log.level":"DEBUG", "message":"Received pause command, *pausing* ingestion until resumed.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>>>> > > {"@timestamp":"2021-11-18T*20:08:26.329Z*", "log.level":"DEBUG", "message":"Received resume command, *resuming* ingestion.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>>>> > > So this explains why ingestion is lagging: the *tasks are paused for a long time*.
>>>>>> > >
>>>>>> > > *Why are there 2 pauses during checkpointing and why such a huge gap?*
>>>>>> > > As a next step, I wanted to see why there is such a wide gap.
>>>>>> > > Then we realized that the first pause is when the task pauses itself here <https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java#L728> while requesting the supervisor for a checkpoint. And the second pause is when the supervisor actually handles the checkpoint notice here <https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L2548>. And since the supervisor thread for this data source takes such a long time to process all the notices in the queue before this checkpoint notice, the ingestion task ends up being in the paused state for a long time.
>>>>>> > >
>>>>>> > > *Why does the supervisor thread take such a long time to get to this checkpoint notice?*
>>>>>> > > That was my next step in debugging.
>>>>>> > > Proving our theory, we noticed that the *noticesQueue in the supervisor does get backed up with 100s of notices every hour when tasks roll*.
>>>>>> > > [image: Screen Shot 2021-12-01 at 9.32.59 AM.png]
>>>>>> > > And we saw that *run_notice takes between 5s and 7s during task rolls*. This backs up the noticesQueue, which leaves the checkpoint notice sitting in the queue for a long time, leading to the ingestion lag spike whenever tasks roll.
>>>>>> > > [image: Screen Shot 2021-12-01 at 9.34.29 AM.png]
>>>>>> > >
>>>>>> > > *Why does run_notice take 5-7s to finish?*
>>>>>> > > When the task starts, it takes about 5s for its HTTP server to come up. Until then, the supervisor thread is in a loop trying to get the task status, and this causes run_notice to take about 5-7s to finish.
>>>>>> > >
>>>>>> > > *Questions to the community*
>>>>>> > > Do we need 2 pauses during checkpointing? Should the task pause itself before requesting a checkpoint notice, given that the supervisor is anyway going to pause the task while handling the notice? Or is it okay to remove the pause in the TaskRunner before it sends a checkpoint notice request to the supervisor? This would immediately solve the ingestion lag issue completely, as there won't be two pauses with such a huge gap in between.
>>>>>> > >
>>>>>> > > Harini
>>>>>> > > Software Engineer, Confluent
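As context for the noticesQueue backlog described above, here is a minimal sketch of the notice-coalescing direction Gian mentions further up in the thread (his point 2, referencing https://github.com/apache/druid/pull/12018). The names below are made up and this is not the actual patch: the idea is only that a burst of identical "run" notices collapses into one pending entry, so a checkpoint notice is not stuck behind hundreds of duplicates.

    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.LinkedBlockingQueue;

    class CoalescingNoticeQueueSketch
    {
      interface Notice
      {
        // Notices sharing the same non-null key are treated as duplicates while queued.
        String coalesceKey();

        void handle();
      }

      private final BlockingQueue<Notice> queue = new LinkedBlockingQueue<>();
      private final Set<String> pendingKeys = ConcurrentHashMap.newKeySet();

      // Only enqueue if an equivalent notice is not already waiting.
      void submit(Notice notice)
      {
        final String key = notice.coalesceKey();
        if (key == null || pendingKeys.add(key)) {
          queue.add(notice);
        }
      }

      // The single supervisor thread drains notices one at a time; clearing the key
      // before handling lets the next burst enqueue a fresh run.
      void processNext() throws InterruptedException
      {
        final Notice notice = queue.take();
        final String key = notice.coalesceKey();
        if (key != null) {
          pendingKeys.remove(key);
        }
        notice.handle();
      }
    }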