Gian,

I've submitted a PR to gianm/tq-scale-test that provides a concurrent test,
(and fixes a concurrency bug I found along the way). The change uses an
8millis response time for shutdown acknowledgment, and a 2 second time for
shutdown completion/notification.

Based on this test,
- serial TaskQueue timeout occurs after 60sec for each test, and,
- concurrent TaskQueue passes in ~10sec per test,

https://github.com/gianm/druid/pull/3/files

Let me know your thoughts.

Thanks
Jason

On Fri, Dec 3, 2021 at 11:41 AM Jason Koch <jk...@netflix.com> wrote:

> Hi Gian
>
> > Jason, also interesting findings! I took a crack at rebasing your patch
> on
> > master and adding a scale test for the TaskQueue with 1000 simulated
> tasks:
> > https://github.com/apache/druid/compare/master...gianm:tq-scale-test
> <https://github.com/apache/druid/compare/master...gianm:tq-scale-test>.
> When
> > I run the scale test, "doMassLaunchAndExit" passes quickly but
> > "doMassLaunchAndShutdown" times out. I suppose shutting down lots of
> tasks
> > is still a bottleneck.
>
> Looks good, I'll come back to the test below.
>
> > Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be pretty
> > straightforward to make the shutdown API asynchronous, which would help
> > speed up anything that is shutting down lots of tasks all at once. Would
> > that be helpful in your environments? Or are the changes to move shutdown
> > out of critical sections going to be enough?
>
> This would be the logical next step for some environments, but I didn't
> need to go
> that far. For this particular cluster we are reading from Kafka, which is a
> SeekableStreamIndexRunner, the /stop call does not stop directly, it only
> sets a
> flag, so, the response back to overlord comes back in
> single-digit-milliseconds.
>
> Some extra detail on the specific interaction as related to Kafka might
> make
> the problem/fix more clear ...
>
> ==
>
> Suppose for simplicity we have 500 tasks to roll, and each takes 2ms to
> acknowledge a stop request. TaskQueue#L322-L332 is going to issue 500x2ms
> requests to stop all of them, which will take approx 1 second to complete.
>
> Notice it is doing this whilst holding the lock.
>
> After cleanup those tasks will issue a status update via ZK ->
> RemoteTaskRunner
> ::taskComplete that they have completed. That taskComplete fires the future
> completion which lands back in TaskQueue::notifyStatus where the TaskQueue
> can now update state that the task has finished.
>
> But - notifyStatus can *only* proceed once the lock has been released, and
> then
> it claims the lock, and calls removeTaskInternal. At this point the lock
> is released,
> and, maybe a few more concurrent ZK->notifyStatus() calls proceed. Let's
> suppose we got lucky, and we processed 10 requests, which have now been
> removed from the TaskQueue.
>
> At some point though, TaskQueue manage loop is going to get that lock, and
> we
> are now at 490 tasks that the queue believes are running, which we expected
> to be stopped, so we issue another 490*2ms=980ms of HTTP stop requests.
> And then, maybe, we get another 10 notifyStatus complete .... and we issue
> 480,
> and so on.
>
> Evidently, this is going to take a long time for the TaskQueue to quiesce
> to the
> correct state, and things are a little confused across the cluster until
> that
> happens. And, to top it off, tasks and supervisor get confused as to
> progress,
> so the task is marked failed, and put back in queue to restart, which means
> it takes longer.
>
> The fix is basically to make sure that the TaskQueue::notifyStatus can
> proceed
> to update the task state without blocking. Following that we get a flood
> of ZK
> updates in short order, so making the logging & ZK processing more
> efficient
> significantly reduces the time for quiesce to complete.
>
> ==
>
> So back to the test, it looks good, and I think some tweaks need to happen
> to
> replicate the above:
> (1) the taskQueue.add() and shutdown() calls should be run concurrently
> from lots
> of threads
> (2) the TestTaskRunner::shutdown(taskId) call should respond in ~5ms, and
> concurrently sleep some time, followed by a
> knownTasks.get(taskId).setResult(...), which I think will trigger the
> notification loop.
>
> I'll take a shot at this after lunch today.
>
> Sample logfile of a quick clean shutdown of a kafka task, from overlord
> view:
>
> https://gist.githubusercontent.com/jasonk000/40c11dce3faed44d3a89c27e0227b982/raw/a456a4cddd31f508e618321e03ef8b5241931904/druid.log
>
> Thanks
> Jason
>
>
> On Thu, Dec 2, 2021 at 4:58 AM Gian Merlino <g...@apache.org> wrote:
>
>> Harini, those are interesting findings. I'm not sure if the two pauses are
>> necessary, but my thought is that it ideally shouldn't matter because the
>> supervisor shouldn't be taking that long to handle its notices. A couple
>> things come to mind about that:
>>
>> 1) Did you see what specifically the supervisor is doing when it's
>> handling
>> the notices? Maybe from a stack trace? We should look into optimizing it,
>> or making it asynchronous or something, depending on what it is.
>> 2) Although, there isn't really a need to trigger a run for every single
>> task status change anyway; I think it's ok to coalesce them. This patch
>> would do it: https://github.com/apache/druid/pull/12018
>>
>> Jason, also interesting findings! I took a crack at rebasing your patch on
>> master and adding a scale test for the TaskQueue with 1000 simulated
>> tasks:
>> https://github.com/apache/druid/compare/master...gianm:tq-scale-test.
>> When
>> I run the scale test, "doMassLaunchAndExit" passes quickly but
>> "doMassLaunchAndShutdown" times out. I suppose shutting down lots of tasks
>> is still a bottleneck.
>>
>> Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be pretty
>> straightforward to make the shutdown API asynchronous, which would help
>> speed up anything that is shutting down lots of tasks all at once. Would
>> that be helpful in your environments? Or are the changes to move shutdown
>> out of critical sections going to be enough?
>>
>> On Wed, Dec 1, 2021 at 1:27 PM Jason Koch <jk...@netflix.com.invalid>
>> wrote:
>>
>> > Hi Harini,
>> >
>> > We have seen issues like this related to task roll time, related to task
>> > queue notifications on overlord instances; I have a patch running
>> > internally that resolves this.
>> >
>> > These are my internal triage notes:
>> > ======
>> > - Whenever task scheduling is happening (startup, ingest segment task
>> > rollover, redeployment of datasource) Overlord takes a long time to
>> assign
>> > workers. This compounds because tasks sit so long before deployment
>> that it
>> > starts failing tasks and having to relaunch them.
>> >
>> >    - TaskQueue: notifyStatus() which receives updates from the
>> >    middlemanagers, and the manage() loop which controls services, runs
>> > through
>> >    a single lock. For example, the shutdown request involves submitting
>> >    downstream HTTP requests synchronously (while holding the lock).
>> >    - This means for a cluster with ~700 tasks that tasks are held for
>> >    nearly 1second, and only after each 1 second around the manage loop
>> can
>> > 1-2
>> >    notifications be processed. For a new startup, with 700 tasks, and a
>> > 1sec
>> >    delay, that is 300-600-or-more seconds for the overlord to realise
>> all
>> > the
>> >    tasks are started by the middle manager.
>> >    - Similar delays happen for any other operations.
>> >    - Sub-optimal logging code path (double-concatening very long log
>> >    entries),
>> >    - ZkWorker: Worker fully deserializing all ZK payload data every time
>> >    looking up task IDs rather than only looking at the ID fields.
>> > Similarly,
>> >    repeat fetching data on task assignment.
>> >
>> > =====
>> >
>> > The patch I have is here:
>> > https://github.com/jasonk000/druid/pull/7/files
>> >
>> > It fixes a couple of things, most importantly the task queue
>> notification
>> > system. The system is much more stable with high task counts and will
>> > easily restart many tasks concurrently.
>> >
>> > I have other perf issues I want to look at first before I can document
>> it
>> > fully, build a test case, rebase it on apache/master, etc. If you test
>> it
>> > out, and it works, we could submit a PR that would resolve it.
>> >
>> > PS - I have a queue of similar fixes I'd like to submit, but need some
>> time
>> > to do the documentation, build test cases, upstreaming, etc, if anyone
>> > wants to collaborate, I could open some Issues and share my partial
>> notes.
>> >
>> > Thanks
>> > Jason
>> >
>> > On Wed, Dec 1, 2021 at 12:59 PM Harini Rajendran
>> > <hrajend...@confluent.io.invalid> wrote:
>> >
>> > > Hi all,
>> > >
>> > > I have been investigating this in the background for a few days now
>> and
>> > > need some help from the community.
>> > >
>> > > We noticed that every hour, when the tasks roll, we see a spike in the
>> > > ingestion lag for about 2-4 minutes. We have 180 tasks running on this
>> > > datasource.
>> > > [image: Screen Shot 2021-12-01 at 9.14.23 AM.png]
>> > >
>> > > On further debugging of task logs, we found out that around the
>> duration
>> > > when the ingestion lag spikes up, *the gap between pause and resume
>> > > commands in the task logs during checkpointing are wide ranging from
>> few
>> > > seconds to couple minutes*. For example, in the following task logs
>> you
>> > > can see that it was about 1.5 minutes.
>> > > {"@timestamp":"2021-11-18T*20:06:58.513Z*", "log.level":"DEBUG",
>> > > "message":"Received pause command, *pausing* ingestion until
>> resumed.", "
>> > > service.name
>> > > ":"druid/middleManager","event.dataset":"druid/middleManager.log","
>> > > process.thread.name
>> > >
>> >
>> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>> > > {"@timestamp":"2021-11-18T*20:08:26.326Z*", "log.level":"DEBUG",
>> > > "message":"Received pause command, *pausing* ingestion until
>> resumed.", "
>> > > service.name
>> > > ":"druid/middleManager","event.dataset":"druid/middleManager.log","
>> > > process.thread.name
>> > >
>> >
>> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>> > > {"@timestamp":"2021-11-18T*20:08:26.329Z*", "log.level":"DEBUG",
>> > > "message":"Received resume command, *resuming* ingestion.", "
>> > service.name
>> > > ":"druid/middleManager","event.dataset":"druid/middleManager.log","
>> > > process.thread.name
>> > >
>> >
>> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>> > > So this explains why ingestion is lagging as the *tasks are paused
>> for a
>> > > long time*.
>> > >
>> > > *Why are there 2 pauses during checkpointing and why such a huge gap?*
>> > > As a next step, I wanted to see why there is such a wide gap. Then we
>> > > realized that the first pause is when the task pauses itself here
>> > > <
>> >
>> https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java#L728
>> >
>> > while
>> > > requesting the supervisor for a checkpoint. And the second pause is
>> when
>> > > the supervisor actually handles the checkpoint notice here
>> > > <
>> >
>> https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L2548
>> > >
>> > > .
>> > > And since the supervisor thread for this data source takes such a long
>> > > time to process all the notices in the queue before this checkpoint
>> > notice,
>> > > the ingestion task ends up being in the paused state for a long time.
>> > >
>> > > *Why does the supervisor thread take such a long time to get to this
>> > > checkpoint notice?*
>> > > That was my next step in debugging.
>> > > Proving our theory, we noticed that the *noticesQueue in the
>> supervisor
>> > > does get backed up with 100s of notices every hour when tasks roll*.
>> > > [image: Screen Shot 2021-12-01 at 9.32.59 AM.png]
>> > > And we saw that *run_notice takes between 5s and 7s during task
>> rolls*.
>> > > And this causes backing up of noticesQueue causing checkpoint notice
>> to
>> > be
>> > > in the queue for long leading to ingestion lag spike whenever tasks
>> roll.
>> > > [image: Screen Shot 2021-12-01 at 9.34.29 AM.png]
>> > >
>> > > *Why does run_notice take 5-7s to finish?*
>> > > When the task starts, it takes about 5s for the HTTP server to come
>> up.
>> > > So, till then the supervisor thread is in a loop trying to get the
>> task
>> > > status and this causes run_notice to take about 5-7s to finish.
>> > >
>> > > *Questions to the community*
>> > > Do we need 2 pauses during checkpointing? Should the task pause itself
>> > > before requesting a checkpoint notice given that the supervisor is
>> > anyways
>> > > going to pause the task while handling the notice? Or is it okay to
>> > remove
>> > > the pause in the TaskRunner before it sends a checkpoint notice
>> request
>> > to
>> > > the supervisor? This would immediately solve the ingestion lag issue
>> > > completely as there won't be two pauses with such a huge gap in
>> between.
>> > >
>> > > Harini
>> > > Software Engineer, Confluent
>> > >
>> >
>>
>

Reply via email to