Hi Jason,

I will take a look at these three PRs and see if we can try them out in our
test environment.

Also, we use AWS RDS as the metadata engine.

Harini
Software Engineer, Observability
+1 412 708 3872



On Fri, Dec 31, 2021 at 3:22 PM Jason Koch <jk...@netflix.com> wrote:

> Hi Harini,
>
> I had a chance to look at the checkpoint behaviour you mentioned in more
> detail, and found two codepaths where the RunNotice code ends up in the
> TaskQueue, and hits the same locks. I'd be interested if you want to try
> the three related PRs I have submitted. (I added more detail to the issue
> https://github.com/apache/druid/issues/11414).
>
> Failing that, I think the best next step would be some stack traces
> and/or profiler output from the supervisor at the time of rollover. It
> would also be useful to know which metadata storage solution you are using,
> as the RunNotice interacts with the metadata engine too.
>
> Thanks
> Jason
>
> On Fri, Dec 3, 2021 at 2:43 PM Jason Koch <jk...@netflix.com> wrote:
>
>> Gian,
>>
>> I've submitted a PR to gianm/tq-scale-test that provides a concurrent
>> test (and fixes a concurrency bug I found along the way). The change uses
>> an 8 ms response time for shutdown acknowledgment, and a 2 second time
>> for shutdown completion/notification.
>>
>> Based on this test:
>> - the serial TaskQueue times out after 60 sec for each test, and
>> - the concurrent TaskQueue passes in ~10 sec per test.
>>
>> https://github.com/gianm/druid/pull/3/files
>>
>> Let me know your thoughts.
>>
>> Thanks
>> Jason
>>
>> On Fri, Dec 3, 2021 at 11:41 AM Jason Koch <jk...@netflix.com> wrote:
>>
>>> Hi Gian
>>>
>>> > Jason, also interesting findings! I took a crack at rebasing your patch on
>>> > master and adding a scale test for the TaskQueue with 1000 simulated tasks:
>>> > https://github.com/apache/druid/compare/master...gianm:tq-scale-test. When
>>> > I run the scale test, "doMassLaunchAndExit" passes quickly but
>>> > "doMassLaunchAndShutdown" times out. I suppose shutting down lots of tasks
>>> > is still a bottleneck.
>>>
>>> Looks good, I'll come back to the test below.
>>>
>>> > Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be pretty
>>> > straightforward to make the shutdown API asynchronous, which would help
>>> > speed up anything that is shutting down lots of tasks all at once. Would
>>> > that be helpful in your environments? Or are the changes to move shutdown
>>> > out of critical sections going to be enough?
>>>
>>> This would be the logical next step for some environments, but I didn't
>>> need to go that far. For this particular cluster we are reading from Kafka,
>>> which uses a SeekableStreamIndexTaskRunner. The /stop call does not stop the
>>> task directly; it only sets a flag, so the response back to the overlord
>>> comes back in single-digit milliseconds.
>>>
>>> Some extra detail on the specific interaction as related to Kafka might make
>>> the problem/fix more clear ...
>>>
>>> ==
>>>
>>> Suppose for simplicity we have 500 tasks to roll, and each takes 2ms to
>>> acknowledge a stop request. TaskQueue#L322-L332 is going to issue 500x2ms
>>> requests to stop all of them, which will take approx 1 second to complete.
>>>
>>> Notice it is doing this whilst holding the lock.
>>>
>>> After cleanup those tasks will issue a status update via ZK ->
>>> RemoteTaskRunner::taskComplete that they have completed. That taskComplete
>>> fires the future completion, which lands back in TaskQueue::notifyStatus,
>>> where the TaskQueue can now update state to record that the task has
>>> finished.
>>>
>>> But notifyStatus can *only* proceed once the lock has been released. It
>>> then claims the lock and calls removeTaskInternal. At this point the lock
>>> is released, and maybe a few more concurrent ZK->notifyStatus() calls
>>> proceed. Let's suppose we got lucky and processed 10 requests, which have
>>> now been removed from the TaskQueue.
>>>
>>> At some point, though, the TaskQueue manage loop is going to get that lock,
>>> and we are now at 490 tasks that the queue believes are running but which
>>> we expected to be stopped, so we issue another 490*2ms=980ms of HTTP stop
>>> requests. And then, maybe, we get another 10 notifyStatus completions, and
>>> we issue 480, and so on.
>>>
>>> Evidently, it is going to take a long time for the TaskQueue to quiesce to
>>> the correct state, and things are a little confused across the cluster until
>>> that happens. To top it off, tasks and the supervisor get confused as to
>>> progress, so the task is marked failed and put back in the queue to restart,
>>> which means it takes even longer.
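To put rough numbers on the loop described above, here is a back-of-envelope sketch in Java. It assumes, as in the example, 500 tasks, 2 ms per stop acknowledgment, and exactly 10 completions recorded between manage passes. The class and method names are made up for illustration and are not Druid code.

```java
// Back-of-envelope model of the manage loop described above.
// All names here are hypothetical; this is not Druid code.
final class QuiesceEstimate {

    /** Total stop requests issued until the queue believes no tasks remain. */
    static long totalStopRequests(int tasks, int retiredPerPass) {
        if (tasks < 0 || retiredPerPass <= 0) {
            throw new IllegalArgumentException("tasks >= 0 and retiredPerPass > 0 required");
        }
        long total = 0;
        for (int remaining = tasks; remaining > 0; remaining -= retiredPerPass) {
            // Each pass re-issues a stop to every task still believed to be running.
            total += remaining;
        }
        return total;
    }

    /** Time spent issuing those requests one by one while holding the lock. */
    static long lockHeldMillis(int tasks, int retiredPerPass, long ackMillis) {
        return totalStopRequests(tasks, retiredPerPass) * ackMillis;
    }

    public static void main(String[] args) {
        System.out.println(totalStopRequests(500, 10) + " requests, "
            + lockHeldMillis(500, 10, 2) + " ms under lock");
    }
}
```

Under these assumptions the queue issues 12,750 stop requests and spends about 25.5 seconds under the lock, compared with 500 requests and 1 second if every completion were recorded before the next pass.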
>>>
>>> The fix is basically to make sure that TaskQueue::notifyStatus can proceed
>>> to update the task state without blocking. Following that we get a flood of
>>> ZK updates in short order, so making the logging & ZK processing more
>>> efficient significantly reduces the time for quiesce to complete.
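The general shape of that kind of fix can be sketched as follows. This is a minimal illustration, assuming a queue that snapshots its state under the lock and issues the slow stop requests after releasing it. SketchTaskQueue and its methods are hypothetical and do not reflect the actual TaskQueue code or the patch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

// Hypothetical sketch, not the actual TaskQueue: the lock guards only
// in-memory state, and slow stop requests are issued after it is released.
final class SketchTaskQueue {
    private final ReentrantLock lock = new ReentrantLock();
    private final Set<String> active = new HashSet<>();
    private final Consumer<String> stopRequest; // stands in for the HTTP stop call

    SketchTaskQueue(Consumer<String> stopRequest) {
        this.stopRequest = stopRequest;
    }

    void add(String taskId) {
        lock.lock();
        try { active.add(taskId); } finally { lock.unlock(); }
    }

    /** Completion callback; it only ever waits behind short in-memory sections. */
    void notifyStatus(String taskId) {
        lock.lock();
        try { active.remove(taskId); } finally { lock.unlock(); }
    }

    /** One pass: snapshot under the lock, then do the slow work outside it. */
    void shutdownAll() {
        List<String> toStop;
        lock.lock();
        try { toStop = new ArrayList<>(active); } finally { lock.unlock(); }
        for (String taskId : toStop) {
            stopRequest.accept(taskId); // the lock is not held here
        }
    }

    boolean isLockHeldByCurrentThread() {
        return lock.isHeldByCurrentThread();
    }

    int activeCount() {
        lock.lock();
        try { return active.size(); } finally { lock.unlock(); }
    }
}
```

The trade-off in this sketch is that the snapshot can be slightly stale by the time a stop request goes out, so the stop call has to tolerate a task that has already finished.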
>>>
>>> ==
>>>
>>> So back to the test: it looks good, and I think some tweaks need to happen
>>> to replicate the above:
>>> (1) the taskQueue.add() and shutdown() calls should be run concurrently from
>>> lots of threads
>>> (2) the TestTaskRunner::shutdown(taskId) call should respond in ~5ms, and
>>> concurrently sleep some time, followed by a
>>> knownTasks.get(taskId).setResult(...), which I think will trigger the
>>> notification loop.
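Tweak (2) could look roughly like the sketch below. It is an assumption-laden stand-in rather than the TestTaskRunner from the scale-test branch: it uses a plain CompletableFuture where the text mentions setResult(...), and the class name, result string, and delays are all invented for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a test runner: shutdown() acknowledges quickly,
// and the task's result future completes later on another thread.
final class DelayedShutdownRunner implements AutoCloseable {
    private final Map<String, CompletableFuture<String>> knownTasks = new ConcurrentHashMap<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
    private final long ackMillis;
    private final long completionMillis;

    DelayedShutdownRunner(long ackMillis, long completionMillis) {
        this.ackMillis = ackMillis;
        this.completionMillis = completionMillis;
    }

    /** Registers a task and returns the future that reports its final status. */
    CompletableFuture<String> run(String taskId) {
        return knownTasks.computeIfAbsent(taskId, id -> new CompletableFuture<>());
    }

    /** Blocks for the short acknowledgment delay, then schedules the completion. */
    void shutdown(String taskId) {
        try {
            Thread.sleep(ackMillis); // simulated stop acknowledgment
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        CompletableFuture<String> result = knownTasks.get(taskId);
        if (result != null) {
            scheduler.schedule(() -> { result.complete("SUCCESS"); },
                completionMillis, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A test would then call shutdown() from many threads at once and wait on the futures returned by run(), which is where any listener registered by the queue would fire.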
>>>
>>> I'll take a shot at this after lunch today.
>>>
>>> Sample logfile of a quick clean shutdown of a Kafka task, from the
>>> overlord's view:
>>>
>>> https://gist.githubusercontent.com/jasonk000/40c11dce3faed44d3a89c27e0227b982/raw/a456a4cddd31f508e618321e03ef8b5241931904/druid.log
>>>
>>> Thanks
>>> Jason
>>>
>>>
>>> On Thu, Dec 2, 2021 at 4:58 AM Gian Merlino <g...@apache.org> wrote:
>>>
>>>> Harini, those are interesting findings. I'm not sure if the two pauses are
>>>> necessary, but my thought is that it ideally shouldn't matter because the
>>>> supervisor shouldn't be taking that long to handle its notices. A couple
>>>> things come to mind about that:
>>>>
>>>> 1) Did you see what specifically the supervisor is doing when it's handling
>>>> the notices? Maybe from a stack trace? We should look into optimizing it,
>>>> or making it asynchronous or something, depending on what it is.
>>>> 2) Although, there isn't really a need to trigger a run for every single
>>>> task status change anyway; I think it's ok to coalesce them. This patch
>>>> would do it: https://github.com/apache/druid/pull/12018
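As a generic illustration of what coalescing means here, and not a description of what the linked PR actually does, the sketch below collapses any number of run requests into a single pending run. CoalescingRunTrigger and its methods are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration of coalescing; not the implementation in the
// patch linked above.
final class CoalescingRunTrigger {
    private final AtomicBoolean pending = new AtomicBoolean(false);

    /**
     * Called on every task status change.
     * Returns true only when this call is the one that marks a run as pending.
     */
    boolean requestRun() {
        return pending.compareAndSet(false, true);
    }

    /**
     * Called by the single processing thread.
     * Executes the run at most once per pending flag and reports whether it ran.
     */
    boolean drain(Runnable run) {
        // Clear the flag before running so that changes arriving during the
        // run schedule a fresh one instead of being lost.
        if (!pending.compareAndSet(true, false)) {
            return false;
        }
        run.run();
        return true;
    }
}
```

With this shape, a burst of 100 status changes results in one run rather than 100 queued notices.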
>>>>
>>>> Jason, also interesting findings! I took a crack at rebasing your patch on
>>>> master and adding a scale test for the TaskQueue with 1000 simulated tasks:
>>>> https://github.com/apache/druid/compare/master...gianm:tq-scale-test. When
>>>> I run the scale test, "doMassLaunchAndExit" passes quickly but
>>>> "doMassLaunchAndShutdown" times out. I suppose shutting down lots of tasks
>>>> is still a bottleneck.
>>>>
>>>> Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be pretty
>>>> straightforward to make the shutdown API asynchronous, which would help
>>>> speed up anything that is shutting down lots of tasks all at once. Would
>>>> that be helpful in your environments? Or are the changes to move shutdown
>>>> out of critical sections going to be enough?
>>>>
>>>> On Wed, Dec 1, 2021 at 1:27 PM Jason Koch <jk...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>> > Hi Harini,
>>>> >
>>>> > We have seen issues like this around task roll time, related to task
>>>> > queue notifications on overlord instances; I have a patch running
>>>> > internally that resolves this.
>>>> >
>>>> > These are my internal triage notes:
>>>> > ======
>>>> > - Whenever task scheduling is happening (startup, ingest segment task
>>>> > rollover, redeployment of datasource) Overlord takes a long time to assign
>>>> > workers. This compounds because tasks sit so long before deployment that it
>>>> > starts failing tasks and having to relaunch them.
>>>> >
>>>> >    - TaskQueue: notifyStatus(), which receives updates from the
>>>> >    middlemanagers, and the manage() loop, which controls services, run
>>>> >    through a single lock. For example, the shutdown request involves
>>>> >    submitting downstream HTTP requests synchronously (while holding the
>>>> >    lock).
>>>> >    - This means for a cluster with ~700 tasks that tasks are held for
>>>> >    nearly 1 second, and only after each 1 second around the manage loop
>>>> >    can 1-2 notifications be processed. For a new startup, with 700 tasks
>>>> >    and a 1 sec delay, that is 300-600-or-more seconds for the overlord to
>>>> >    realise all the tasks are started by the middle manager.
>>>> >    - Similar delays happen for any other operations.
>>>> >    - Sub-optimal logging code path (double-concatenating very long log
>>>> >    entries).
>>>> >    - ZkWorker: Worker fully deserializing all ZK payload data every time
>>>> >    it looks up task IDs rather than only looking at the ID fields.
>>>> >    Similarly, repeat fetching of data on task assignment.
>>>> >
>>>> > =====
>>>> >
>>>> > The patch I have is here:
>>>> > https://github.com/jasonk000/druid/pull/7/files
>>>> >
>>>> > It fixes a couple of things, most importantly the task queue notification
>>>> > system. The system is much more stable with high task counts and will
>>>> > easily restart many tasks concurrently.
>>>> >
>>>> > I have other perf issues I want to look at first before I can document it
>>>> > fully, build a test case, rebase it on apache/master, etc. If you test it
>>>> > out, and it works, we could submit a PR that would resolve it.
>>>> >
>>>> > PS - I have a queue of similar fixes I'd like to submit, but need some time
>>>> > to do the documentation, build test cases, upstreaming, etc. If anyone
>>>> > wants to collaborate, I could open some Issues and share my partial notes.
>>>> >
>>>> > Thanks
>>>> > Jason
>>>> >
>>>> > On Wed, Dec 1, 2021 at 12:59 PM Harini Rajendran
>>>> > <hrajend...@confluent.io.invalid> wrote:
>>>> >
>>>> > > Hi all,
>>>> > >
>>>> > > I have been investigating this in the background for a few days now and
>>>> > > need some help from the community.
>>>> > >
>>>> > > We noticed that every hour, when the tasks roll, we see a spike in the
>>>> > > ingestion lag for about 2-4 minutes. We have 180 tasks running on this
>>>> > > datasource.
>>>> > > [image: Screen Shot 2021-12-01 at 9.14.23 AM.png]
>>>> > >
>>>> > > On further debugging of task logs, we found that around the time the
>>>> > > ingestion lag spikes, *the gap between the pause and resume commands in
>>>> > > the task logs during checkpointing is wide, ranging from a few seconds
>>>> > > to a couple of minutes*. For example, in the following task logs you
>>>> > > can see that it was about 1.5 minutes.
>>>> > > {"@timestamp":"2021-11-18T20:06:58.513Z", "log.level":"DEBUG", "message":"Received pause command, pausing ingestion until resumed.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>> > > {"@timestamp":"2021-11-18T20:08:26.326Z", "log.level":"DEBUG", "message":"Received pause command, pausing ingestion until resumed.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>> > > {"@timestamp":"2021-11-18T20:08:26.329Z", "log.level":"DEBUG", "message":"Received resume command, resuming ingestion.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>> > > So this explains why ingestion is lagging, as the *tasks are paused for
>>>> > > a long time*.
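The "about 1.5 minutes" figure can be checked directly from the three @timestamp values in the log lines above. A small sketch using java.time follows; the PauseGap class name is just for illustration.

```java
import java.time.Duration;
import java.time.Instant;

// Computes the gap between two ISO-8601 timestamps taken from the task log.
final class PauseGap {
    static Duration between(String startIso, String endIso) {
        return Duration.between(Instant.parse(startIso), Instant.parse(endIso));
    }

    public static void main(String[] args) {
        Duration firstToSecondPause =
            between("2021-11-18T20:06:58.513Z", "2021-11-18T20:08:26.326Z");
        Duration secondPauseToResume =
            between("2021-11-18T20:08:26.326Z", "2021-11-18T20:08:26.329Z");
        System.out.println(firstToSecondPause.toMillis() + " ms between the two pauses");
        System.out.println(secondPauseToResume.toMillis() + " ms from second pause to resume");
    }
}
```

The first gap is 87,813 ms (about 1 minute 28 seconds), while the second pause is followed by the resume only 3 ms later, so nearly all of the paused time falls between the two pause commands.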
>>>> > >
>>>> > > *Why are there 2 pauses during checkpointing and why such a huge gap?*
>>>> > > As a next step, I wanted to see why there is such a wide gap. Then we
>>>> > > realized that the first pause is when the task pauses itself here
>>>> > > <https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java#L728>
>>>> > > while requesting the supervisor for a checkpoint. And the second pause is
>>>> > > when the supervisor actually handles the checkpoint notice here
>>>> > > <https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L2548>.
>>>> > > And since the supervisor thread for this data source takes such a long
>>>> > > time to process all the notices in the queue before this checkpoint
>>>> > > notice, the ingestion task ends up being in the paused state for a long
>>>> > > time.
>>>> > >
>>>> > > *Why does the supervisor thread take such a long time to get to this
>>>> > > checkpoint notice?*
>>>> > > That was my next step in debugging.
>>>> > > Proving our theory, we noticed that the *noticesQueue in the supervisor
>>>> > > does get backed up with 100s of notices every hour when tasks roll*.
>>>> > > [image: Screen Shot 2021-12-01 at 9.32.59 AM.png]
>>>> > > And we saw that *run_notice takes between 5s and 7s during task rolls*.
>>>> > > This backs up the noticesQueue, so the checkpoint notice sits in the
>>>> > > queue for a long time, leading to an ingestion lag spike whenever tasks
>>>> > > roll.
>>>> > > [image: Screen Shot 2021-12-01 at 9.34.29 AM.png]
>>>> > >
>>>> > > *Why does run_notice take 5-7s to finish?*
>>>> > > When the task starts, it takes about 5s for the HTTP server to come up.
>>>> > > So, till then the supervisor thread is in a loop trying to get the task
>>>> > > status and this causes run_notice to take about 5-7s to finish.
>>>> > >
>>>> > > *Questions to the community*
>>>> > > Do we need 2 pauses during checkpointing? Should the task pause itself
>>>> > > before requesting a checkpoint notice, given that the supervisor is
>>>> > > going to pause the task anyway while handling the notice? Or is it okay
>>>> > > to remove the pause in the TaskRunner before it sends a checkpoint notice
>>>> > > request to the supervisor? This would immediately solve the ingestion lag
>>>> > > issue completely, as there won't be two pauses with such a huge gap in
>>>> > > between.
>>>> > >
>>>> > > Harini
>>>> > > Software Engineer, Confluent
>>>> > >
>>>> >
>>>>
>>>
