That's great! Thanks. I'll keep an eye on those PRs.

Harini
Software Engineer, Observability
+1 412 708 3872



On Wed, Jan 5, 2022 at 11:00 AM Jason Koch <jk...@netflix.com> wrote:

> Thanks for the prompt - yes, I'll get these fixed. They are code coverage /
> linter fixes; I had mistakenly assumed they were flaky tests. I'll aim to
> fix these today.
>
> On Wed, Jan 5, 2022 at 7:25 AM Harini Rajendran <hrajend...@confluent.io>
> wrote:
>
>> Hi Jason,
>>
>> I was taking a look at your PRs and see that the CI build is failing for 2
>> of them. Do you know why those are failing, and are you planning to fix them?
>>
>> Harini
>> Software Engineer, Observability
>> +1 412 708 3872
>>
>>
>>
>> On Mon, Jan 3, 2022 at 7:59 PM Harini Rajendran <hrajend...@confluent.io>
>> wrote:
>>
>>> Hi Jason,
>>>
>>> I shall take a look at these 3 PRs and see if we can try them out in
>>> our test environment.
>>>
>>> Also, we use AWS RDS as the metadata engine.
>>>
>>> Harini
>>> Software Engineer, Observability
>>> +1 412 708 3872
>>>
>>>
>>>
>>> On Fri, Dec 31, 2021 at 3:22 PM Jason Koch <jk...@netflix.com> wrote:
>>>
>>>> Hi Harini,
>>>>
>>>> I had a chance to look at the checkpoint behaviour you mentioned in more
>>>> detail, and found two codepaths where the RunNotice code ends up in the
>>>> TaskQueue and hits the same locks. I'd be interested to hear how it goes
>>>> if you want to try the three related PRs I have submitted. (I added more
>>>> detail to the issue https://github.com/apache/druid/issues/11414).
>>>>
>>>> Failing that, I think the best next step would be some stack traces
>>>> and/or profiler output from the supervisor at the time of rollover. It
>>>> would also be useful to know which metadata storage solution you are
>>>> using, as the RunNotice interacts with the metadata engine too.
>>>>
>>>> Thanks
>>>> Jason
>>>>
>>>> On Fri, Dec 3, 2021 at 2:43 PM Jason Koch <jk...@netflix.com> wrote:
>>>>
>>>>> Gian,
>>>>>
>>>>> I've submitted a PR to gianm/tq-scale-test that provides a concurrent
>>>>> test (and fixes a concurrency bug I found along the way). The change
>>>>> uses an 8ms response time for shutdown acknowledgment, and a 2-second
>>>>> time for shutdown completion/notification.
>>>>>
>>>>> Based on this test:
>>>>> - the serial TaskQueue times out after 60 sec for each test, and
>>>>> - the concurrent TaskQueue passes in ~10 sec per test.
>>>>>
>>>>> https://github.com/gianm/druid/pull/3/files
>>>>>
>>>>> Let me know your thoughts.
>>>>>
>>>>> Thanks
>>>>> Jason
>>>>>
>>>>> On Fri, Dec 3, 2021 at 11:41 AM Jason Koch <jk...@netflix.com> wrote:
>>>>>
>>>>>> Hi Gian
>>>>>>
>>>>>> > Jason, also interesting findings! I took a crack at rebasing your
>>>>>> > patch on master and adding a scale test for the TaskQueue with 1000
>>>>>> > simulated tasks:
>>>>>> > https://github.com/apache/druid/compare/master...gianm:tq-scale-test.
>>>>>> > When I run the scale test, "doMassLaunchAndExit" passes quickly but
>>>>>> > "doMassLaunchAndShutdown" times out. I suppose shutting down lots of
>>>>>> > tasks is still a bottleneck.
>>>>>>
>>>>>> Looks good, I'll come back to the test below.
>>>>>>
>>>>>> > Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be
>>>>>> > pretty straightforward to make the shutdown API asynchronous, which
>>>>>> > would help speed up anything that is shutting down lots of tasks all
>>>>>> > at once. Would that be helpful in your environments? Or are the
>>>>>> > changes to move shutdown out of critical sections going to be enough?
>>>>>>
>>>>>> This would be the logical next step for some environments, but I
>>>>>> didn't need to go that far. For this particular cluster we are reading
>>>>>> from Kafka, which uses the SeekableStreamIndexTaskRunner; its /stop
>>>>>> call does not stop the task directly, it only sets a flag, so the
>>>>>> response back to the overlord comes back in single-digit milliseconds.
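>>>>>>
>>>>>> To illustrate the shape of that interaction (a hedged sketch with
>>>>>> hypothetical names, not the actual SeekableStreamIndexTaskRunner code):
>>>>>>
>>>>>>   // The /stop handler only flips a flag and returns immediately; the
>>>>>>   // ingest loop notices the flag later and finishes on its own, and
>>>>>>   // completion is reported asynchronously afterwards.
>>>>>>   class FlagBasedTask {
>>>>>>     private volatile boolean stopRequested = false;
>>>>>>
>>>>>>     // handles POST /stop: returns in single-digit milliseconds
>>>>>>     void requestStop() {
>>>>>>       stopRequested = true;
>>>>>>     }
>>>>>>
>>>>>>     void runLoop() {
>>>>>>       while (!stopRequested) {
>>>>>>         pollAndIndexNextBatch();  // the real ingestion work
>>>>>>       }
>>>>>>       publishAndExit();           // status update flows back later
>>>>>>     }
>>>>>>
>>>>>>     private void pollAndIndexNextBatch() { /* ... */ }
>>>>>>     private void publishAndExit() { /* ... */ }
>>>>>>   }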
>>>>>>
>>>>>> Some extra detail on the specific interaction as it relates to Kafka
>>>>>> might make the problem/fix clearer ...
>>>>>>
>>>>>> ==
>>>>>>
>>>>>> Suppose for simplicity we have 500 tasks to roll, and each takes 2ms to
>>>>>> acknowledge a stop request. TaskQueue#L322-L332 is going to issue 500
>>>>>> stop requests at ~2ms each, which will take approximately 1 second to
>>>>>> complete.
>>>>>>
>>>>>> Notice it is doing this whilst holding the lock.
>>>>>>
>>>>>> After cleanup, those tasks will issue a status update via ZK ->
>>>>>> RemoteTaskRunner::taskComplete to report that they have completed.
>>>>>> That taskComplete fires the future completion, which lands back in
>>>>>> TaskQueue::notifyStatus, where the TaskQueue can now record that the
>>>>>> task has finished.
>>>>>>
>>>>>> But - notifyStatus can *only* proceed once the lock has been released;
>>>>>> it then claims the lock and calls removeTaskInternal. At that point the
>>>>>> lock is released again, and maybe a few more concurrent
>>>>>> ZK->notifyStatus() calls proceed. Let's suppose we got lucky and
>>>>>> processed 10 of them, which have now been removed from the TaskQueue.
>>>>>>
>>>>>> At some point though, the TaskQueue manage loop is going to get that
>>>>>> lock. We are now at 490 tasks that the queue believes are running but
>>>>>> that we expect to be stopped, so we issue another 490*2ms=980ms of HTTP
>>>>>> stop requests. Then, maybe, another 10 notifyStatus calls complete ...
>>>>>> and we issue 480, and so on.
>>>>>>
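>>>>>> In pseudocode-ish Java, the shape of the contention looks roughly like
>>>>>> this (a sketch only, with made-up helper names - the real TaskQueue
>>>>>> code is more involved):
>>>>>>
>>>>>>   private final Object giant = new Object();  // the single shared lock
>>>>>>
>>>>>>   void manage() {
>>>>>>     synchronized (giant) {
>>>>>>       // 500 tasks, then 490, then 480, ... each pass re-issues stops
>>>>>>       for (String taskId : idsOfTasksWeStillThinkAreRunning()) {
>>>>>>         taskRunner.shutdown(taskId);  // ~2ms of blocking HTTP per task
>>>>>>       }
>>>>>>     }  // the lock is held for ~1 second per pass
>>>>>>   }
>>>>>>
>>>>>>   void notifyStatus(Task task, TaskStatus status) {  // from ZK callback
>>>>>>     synchronized (giant) {  // has to wait out a whole manage() pass
>>>>>>       removeTaskInternal(task);
>>>>>>     }
>>>>>>   }
>>>>>>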
>>>>>> Evidently, this is going to take a long time for the TaskQueue to
>>>>>> quiesce to the correct state, and things are a little confused across
>>>>>> the cluster until that happens. And, to top it off, the tasks and the
>>>>>> supervisor get confused about progress, so tasks are marked failed and
>>>>>> put back in the queue to restart, which makes it take even longer.
>>>>>>
>>>>>> The fix is basically to make sure that TaskQueue::notifyStatus can
>>>>>> proceed to update the task state without blocking. Following that we
>>>>>> get a flood of ZK updates in short order, so making the logging & ZK
>>>>>> processing more efficient significantly reduces the time it takes to
>>>>>> quiesce.
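>>>>>>
>>>>>> Sketched in the same pseudocode-ish style (the direction of the change,
>>>>>> with hypothetical helper names, not the patch verbatim):
>>>>>>
>>>>>>   void manage() {
>>>>>>     List<String> toStop;
>>>>>>     synchronized (giant) {
>>>>>>       toStop = idsOfTasksToStop();  // decide cheaply under the lock
>>>>>>     }
>>>>>>     for (String taskId : toStop) {
>>>>>>       taskRunner.shutdown(taskId);  // blocking HTTP, lock not held
>>>>>>     }
>>>>>>   }
>>>>>>
>>>>>>   void notifyStatus(Task task, TaskStatus status) {
>>>>>>     completedTasks.add(task);       // e.g. a ConcurrentLinkedQueue
>>>>>>     // manage() drains completedTasks under the lock on its next pass,
>>>>>>     // so the ZK callback never blocks on the giant lock
>>>>>>   }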
>>>>>>
>>>>>> ==
>>>>>>
>>>>>> So back to the test: it looks good, and I think some tweaks need to
>>>>>> happen to replicate the above:
>>>>>> (1) the taskQueue.add() and shutdown() calls should be run concurrently
>>>>>> from lots of threads;
>>>>>> (2) the TestTaskRunner::shutdown(taskId) call should respond in ~5ms
>>>>>> and concurrently sleep for some time, followed by a
>>>>>> knownTasks.get(taskId).setResult(...), which I think will trigger the
>>>>>> notification loop.
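>>>>>>
>>>>>> For (2), something along these lines (a sketch only, names hypothetical
>>>>>> and to be adapted to the test harness; sleepUninterruptibly is the
>>>>>> Guava helper):
>>>>>>
>>>>>>   private final ScheduledExecutorService completionPool =
>>>>>>       Executors.newScheduledThreadPool(8);
>>>>>>
>>>>>>   public void shutdown(String taskId, String reason) {
>>>>>>     // acknowledge quickly, like the real /stop endpoint does
>>>>>>     Uninterruptibles.sleepUninterruptibly(5, TimeUnit.MILLISECONDS);
>>>>>>     // complete the task's future a couple of seconds later, off-thread,
>>>>>>     // which should drive the TaskQueue notification path
>>>>>>     completionPool.schedule(
>>>>>>         () -> knownTasks.get(taskId).setResult(TaskStatus.success(taskId)),
>>>>>>         2, TimeUnit.SECONDS);
>>>>>>   }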
>>>>>>
>>>>>> I'll take a shot at this after lunch today.
>>>>>>
>>>>>> Sample logfile of a quick clean shutdown of a kafka task, from
>>>>>> overlord view:
>>>>>>
>>>>>> https://gist.githubusercontent.com/jasonk000/40c11dce3faed44d3a89c27e0227b982/raw/a456a4cddd31f508e618321e03ef8b5241931904/druid.log
>>>>>>
>>>>>> Thanks
>>>>>> Jason
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 2, 2021 at 4:58 AM Gian Merlino <g...@apache.org> wrote:
>>>>>>
>>>>>>> Harini, those are interesting findings. I'm not sure if the two
>>>>>>> pauses are necessary, but my thought is that it ideally shouldn't
>>>>>>> matter because the supervisor shouldn't be taking that long to handle
>>>>>>> its notices. A couple things come to mind about that:
>>>>>>>
>>>>>>> 1) Did you see what specifically the supervisor is doing when it's
>>>>>>> handling the notices? Maybe from a stack trace? We should look into
>>>>>>> optimizing it, or making it asynchronous or something, depending on
>>>>>>> what it is.
>>>>>>> 2) Although, there isn't really a need to trigger a run for every
>>>>>>> single task status change anyway; I think it's ok to coalesce them.
>>>>>>> This patch would do it: https://github.com/apache/druid/pull/12018
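>>>>>>>
>>>>>>> Conceptually it's just coalescing, something like the following (a
>>>>>>> rough sketch, not the actual patch):
>>>>>>>
>>>>>>>   private final AtomicBoolean runRequested = new AtomicBoolean(false);
>>>>>>>
>>>>>>>   void onTaskStatusChange() {
>>>>>>>     // many status changes collapse into at most one pending run notice
>>>>>>>     if (runRequested.compareAndSet(false, true)) {
>>>>>>>       noticesQueue.add(() -> {
>>>>>>>         runRequested.set(false);  // later changes can enqueue a new run
>>>>>>>         runInternal();            // one pass covers all coalesced changes
>>>>>>>       });
>>>>>>>     }
>>>>>>>   }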
>>>>>>>
>>>>>>> Jason, also interesting findings! I took a crack at rebasing your
>>>>>>> patch on master and adding a scale test for the TaskQueue with 1000
>>>>>>> simulated tasks:
>>>>>>> https://github.com/apache/druid/compare/master...gianm:tq-scale-test.
>>>>>>> When I run the scale test, "doMassLaunchAndExit" passes quickly but
>>>>>>> "doMassLaunchAndShutdown" times out. I suppose shutting down lots of
>>>>>>> tasks is still a bottleneck.
>>>>>>>
>>>>>>> Looking at RemoteTaskRunner and HttpRemoteTaskRunner, it should be
>>>>>>> pretty straightforward to make the shutdown API asynchronous, which
>>>>>>> would help speed up anything that is shutting down lots of tasks all
>>>>>>> at once. Would that be helpful in your environments? Or are the
>>>>>>> changes to move shutdown out of critical sections going to be enough?
>>>>>>>
>>>>>>> On Wed, Dec 1, 2021 at 1:27 PM Jason Koch <jk...@netflix.com.invalid>
>>>>>>> wrote:
>>>>>>>
>>>>>>> > Hi Harini,
>>>>>>> >
>>>>>>> > We have seen issues like this at task roll time, related to task
>>>>>>> > queue notifications on overlord instances; I have a patch running
>>>>>>> > internally that resolves this.
>>>>>>> >
>>>>>>> > These are my internal triage notes:
>>>>>>> > ======
>>>>>>> > - Whenever task scheduling is happening (startup, ingest segment
>>>>>>> > task rollover, redeployment of a datasource), the Overlord takes a
>>>>>>> > long time to assign workers. This compounds because tasks sit so
>>>>>>> > long before deployment that the Overlord starts failing them and
>>>>>>> > having to relaunch them.
>>>>>>> >
>>>>>>> >    - TaskQueue: notifyStatus(), which receives updates from the
>>>>>>> >    middle managers, and the manage() loop, which controls services,
>>>>>>> >    both run through a single lock. For example, a shutdown request
>>>>>>> >    involves submitting downstream HTTP requests synchronously (while
>>>>>>> >    holding the lock).
>>>>>>> >    - This means that for a cluster with ~700 tasks, the lock is held
>>>>>>> >    for nearly 1 second, and only after each 1-second pass of the
>>>>>>> >    manage loop can 1-2 notifications be processed. For a fresh
>>>>>>> >    startup with 700 tasks and a 1-second delay, that is 300-600 or
>>>>>>> >    more seconds for the overlord to realise all the tasks have been
>>>>>>> >    started by the middle manager.
>>>>>>> >    - Similar delays happen for any other operations.
>>>>>>> >    - Sub-optimal logging code path (double-concatenating very long
>>>>>>> >    log entries).
>>>>>>> >    - ZkWorker: the worker fully deserializes all ZK payload data
>>>>>>> >    every time it looks up task IDs, rather than only reading the ID
>>>>>>> >    fields. Similarly, it repeatedly re-fetches data on task
>>>>>>> >    assignment.
>>>>>>> >
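>>>>>>> > (For the ZkWorker item, the idea is roughly the following - a sketch
>>>>>>> > with Jackson, hypothetical names, not the patch as-is:)
>>>>>>> >
>>>>>>> >   // Read only the "id" field from the ZK payload on lookups, instead
>>>>>>> >   // of deserializing the whole announcement object every time.
>>>>>>> >   String taskIdOf(byte[] zkPayload) throws IOException {
>>>>>>> >     return jsonMapper.readTree(zkPayload).get("id").asText();
>>>>>>> >   }
>>>>>>> >
>>>>>>> >   // versus the expensive path, per task, per lookup:
>>>>>>> >   // FullAnnouncement ann = jsonMapper.readValue(zkPayload, FullAnnouncement.class);
>>>>>>> >   // String taskId = ann.getId();
>>>>>>> >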
>>>>>>> > =====
>>>>>>> >
>>>>>>> > The patch I have is here:
>>>>>>> > https://github.com/jasonk000/druid/pull/7/files
>>>>>>> >
>>>>>>> > It fixes a couple of things, most importantly the task queue
>>>>>>> > notification system. The system is much more stable with high task
>>>>>>> > counts and will easily restart many tasks concurrently.
>>>>>>> >
>>>>>>> > I have other perf issues I want to look at first before I can
>>>>>>> > document it fully, build a test case, rebase it on apache/master,
>>>>>>> > etc. If you test it out, and it works, we could submit a PR that
>>>>>>> > would resolve it.
>>>>>>> >
>>>>>>> > PS - I have a queue of similar fixes I'd like to submit, but I need
>>>>>>> > some time to do the documentation, build test cases, upstreaming,
>>>>>>> > etc. If anyone wants to collaborate, I could open some Issues and
>>>>>>> > share my partial notes.
>>>>>>> >
>>>>>>> > Thanks
>>>>>>> > Jason
>>>>>>> >
>>>>>>> > On Wed, Dec 1, 2021 at 12:59 PM Harini Rajendran
>>>>>>> > <hrajend...@confluent.io.invalid> wrote:
>>>>>>> >
>>>>>>> > > Hi all,
>>>>>>> > >
>>>>>>> > > I have been investigating this in the background for a few days
>>>>>>> > > now and need some help from the community.
>>>>>>> > >
>>>>>>> > > We noticed that every hour, when the tasks roll, we see a spike in
>>>>>>> > > the ingestion lag for about 2-4 minutes. We have 180 tasks running
>>>>>>> > > on this datasource.
>>>>>>> > > [image: Screen Shot 2021-12-01 at 9.14.23 AM.png]
>>>>>>> > >
>>>>>>> > > On further debugging of the task logs, we found that around the
>>>>>>> > > time when the ingestion lag spikes up, *the gap between the pause
>>>>>>> > > and resume commands in the task logs during checkpointing is wide,
>>>>>>> > > ranging from a few seconds to a couple of minutes*. For example, in
>>>>>>> > > the following task logs you can see that it was about 1.5 minutes.
>>>>>>> > > {"@timestamp":"2021-11-18T*20:06:58.513Z*", "log.level":"DEBUG",
>>>>>>> > > "message":"Received pause command, *pausing* ingestion until
>>>>>>> resumed.", "
>>>>>>> > > service.name
>>>>>>> > >
>>>>>>> ":"druid/middleManager","event.dataset":"druid/middleManager.log","
>>>>>>> > > process.thread.name
>>>>>>> > >
>>>>>>> >
>>>>>>> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>>>>> > > {"@timestamp":"2021-11-18T*20:08:26.326Z*", "log.level":"DEBUG",
>>>>>>> > > "message":"Received pause command, *pausing* ingestion until
>>>>>>> resumed.", "
>>>>>>> > > service.name
>>>>>>> > >
>>>>>>> ":"druid/middleManager","event.dataset":"druid/middleManager.log","
>>>>>>> > > process.thread.name
>>>>>>> > >
>>>>>>> >
>>>>>>> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>>>>> > > {"@timestamp":"2021-11-18T*20:08:26.329Z*", "log.level":"DEBUG",
>>>>>>> > > "message":"Received resume command, *resuming* ingestion.", "
>>>>>>> > service.name
>>>>>>> > >
>>>>>>> ":"druid/middleManager","event.dataset":"druid/middleManager.log","
>>>>>>> > > process.thread.name
>>>>>>> > >
>>>>>>> >
>>>>>>> ":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
>>>>>>> > > So this explains why ingestion is lagging, as the *tasks are
>>>>>>> > > paused for a long time*.
>>>>>>> > >
>>>>>>> > > *Why are there 2 pauses during checkpointing and why such a huge
>>>>>>> > > gap?*
>>>>>>> > > As a next step, I wanted to see why there is such a wide gap. Then
>>>>>>> > > we realized that the first pause is when the task pauses itself here
>>>>>>> > > <https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java#L728>
>>>>>>> > > while requesting the supervisor for a checkpoint. And the second
>>>>>>> > > pause is when the supervisor actually handles the checkpoint notice
>>>>>>> > > here
>>>>>>> > > <https://github.com/confluentinc/druid/blob/185ab56e42577dad6b077b415959512b0cd96345/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L2548>.
>>>>>>> > > And since the supervisor thread for this data source takes such a
>>>>>>> > > long time to process all the notices in the queue before this
>>>>>>> > > checkpoint notice, the ingestion task ends up being in the paused
>>>>>>> > > state for a long time.
>>>>>>> > >
>>>>>>> > > *Why does the supervisor thread take such a long time to get to
>>>>>>> > > this checkpoint notice?*
>>>>>>> > > That was my next step in debugging.
>>>>>>> > > Confirming our theory, we noticed that the *noticesQueue in the
>>>>>>> > > supervisor does get backed up with 100s of notices every hour when
>>>>>>> > > tasks roll*.
>>>>>>> > > [image: Screen Shot 2021-12-01 at 9.32.59 AM.png]
>>>>>>> > > And we saw that *run_notice takes between 5s and 7s during task
>>>>>>> > > rolls*. This backs up the noticesQueue, causing the checkpoint
>>>>>>> > > notice to sit in the queue for a long time and leading to an
>>>>>>> > > ingestion lag spike whenever tasks roll.
>>>>>>> > > [image: Screen Shot 2021-12-01 at 9.34.29 AM.png]
>>>>>>> > >
>>>>>>> > > *Why does run_notice take 5-7s to finish?*
>>>>>>> > > When the task starts, it takes about 5s for the HTTP server to
>>>>>>> > > come up. Until then, the supervisor thread is in a loop trying to
>>>>>>> > > get the task status, and this causes run_notice to take about
>>>>>>> > > 5-7s to finish.
>>>>>>> > >
>>>>>>> > > *Questions to the community*
>>>>>>> > > Do we need 2 pauses during checkpointing? Should the task pause
>>>>>>> > > itself before requesting a checkpoint notice, given that the
>>>>>>> > > supervisor is going to pause the task anyway while handling the
>>>>>>> > > notice? Or is it okay to remove the pause in the TaskRunner before
>>>>>>> > > it sends a checkpoint notice request to the supervisor? This would
>>>>>>> > > immediately solve the ingestion lag issue completely, as there
>>>>>>> > > won't be two pauses with such a huge gap in between.
>>>>>>> > >
>>>>>>> > > Harini
>>>>>>> > > Software Engineer, Confluent
>>>>>>> > >
>>>>>>> >
>>>>>>>
>>>>>>
