Hi,

I am not sure I fully understand, so let me check my understanding with
an example: suppose we have a simple query whose result we want to
write somewhere:

SELECT t1.b * t2.b FROM t1 JOIN t2 ON t1.a = t2.a

At the physical plan level, we need to:

1. read each file in batches
2. join the batches
3. iterate over results and write them in partitions

In principle, we can multi-thread each of these steps (a rough C++
sketch follows this list):

1. multi-threaded scan
2. multi-threaded hash join (e.g. with a shared map)
3. multi-threaded write (e.g. 1 file per partition)
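
For concreteness, here is a minimal sketch of that uncoordinated model.
ScanNode and the thread counts are made up, purely for illustration:

#include <thread>
#include <vector>

// Hypothetical physical node: it picks its own parallelism and blocks
// on its own workers, with no global view of CPU demand.
struct ScanNode {
  int num_threads;
  void Execute() {
    std::vector<std::thread> workers;
    for (int i = 0; i < num_threads; ++i) {
      workers.emplace_back([] { /* read and decode one batch */ });
    }
    for (auto& w : workers) w.join();  // uncoordinated thread.join
  }
};

int main() {
  // On a 12-core machine, scan_t1 can saturate every core while
  // scan_t2 -- equally required by the join -- makes no progress.
  ScanNode scan_t1{12};
  ScanNode scan_t2{12};
  scan_t1.Execute();
  scan_t2.Execute();
  return 0;
}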

The issue is that when we schedule this, the physical nodes themselves
control how they perform their own operations; there is no
orchestration of what resources are available or what should be
prioritized. Consequently, the scan of table t1 may be running with 12
threads while the scan of table t2 is still waiting for a thread to
become available. This stalls the computation, as both scans are
required for step 2 to proceed. OTOH, if we have no multithreaded
scans, then multithreading seldom helps, as we are bottlenecked by the
scans' throughput. Is this the gist of the problem?

If yes: the core issue here seems to be that there is no orchestrator
to re-prioritize CPU to where it is needed (the scan of t2 in the
example above), because each physical node has a thread.join that is
not coordinated with its downstream dependencies (and so on). Isn't
this a natural candidate for futures/async? We seem to need some
coordination across the DAG.
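
To make that concrete, here is a minimal sketch of the futures-based
alternative, using std::async as a stand-in for a real task scheduler
(the operator names are made up):

#include <future>
#include <vector>

using Batch = std::vector<double>;  // stand-in for a record batch

// Hypothetical operators, just to show the shape of the DAG.
Batch Scan(const char* table) { return Batch{}; }
Batch HashJoin(const Batch& left, const Batch& right) { return Batch{}; }
void WritePartitions(const Batch& joined) {}

int main() {
  // Both scans are tasks handed to a scheduler, not threads owned by
  // each node, so neither scan can starve the other of cores.
  auto t1 = std::async(std::launch::async, Scan, "t1");
  auto t2 = std::async(std::launch::async, Scan, "t2");

  // The join declares its dependencies by awaiting both inputs; with a
  // real async runtime this would be a continuation (then/when_all)
  // rather than a blocking get(), so the scheduler could shift CPU to
  // whichever scan is behind.
  Batch joined = HashJoin(t1.get(), t2.get());
  WritePartitions(joined);
  return 0;
}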

If not: could someone offer an example of how a multi-threaded scan can
cause a deadlock? My best guess, extrapolating from Weston's N+1
description quoted below, is something like the following sketch.
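
(The Pool class here is made up and deliberately tiny, with no shutdown
logic; the program intentionally never terminates, which is the point.)

#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Minimal fixed-size thread pool, just enough to show the failure mode.
class Pool {
 public:
  explicit Pool(int n) {
    for (int i = 0; i < n; ++i) threads_.emplace_back([this] { Worker(); });
  }
  void Submit(std::function<void()> f) {
    { std::lock_guard<std::mutex> g(mu_); q_.push(std::move(f)); }
    cv_.notify_one();
  }
 private:
  void Worker() {
    for (;;) {
      std::function<void()> f;
      {
        std::unique_lock<std::mutex> g(mu_);
        cv_.wait(g, [this] { return !q_.empty(); });
        f = std::move(q_.front());
        q_.pop();
      }
      f();  // while f runs, this thread cannot pick up other tasks
    }
  }
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> q_;
  std::vector<std::thread> threads_;
};

int main() {
  Pool pool(1);  // pretend all but one core are already busy
  std::promise<void> done;
  pool.Submit([&] {
    // The "scan" task spawns its worker onto the same pool...
    std::promise<void> batch_read;
    pool.Submit([&] { batch_read.set_value(); });
    // ...and blocks waiting for it. Deadlock: the pool's only thread
    // is stuck here, so the worker task can never be scheduled.
    batch_read.get_future().wait();
    done.set_value();
  });
  done.get_future().wait();
  return 0;
}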

Best,
Jorge

On Wed, Sep 16, 2020 at 4:16 PM Wes McKinney <wesmck...@gmail.com> wrote:

> hi Jacob,
>
> The approach taken in Julia strikes me as being motivated by the same
> problems that we have in this project. It would be interesting if
> partr could be used as the basis of our nested parallelism runtime.
> How does Julia handle IO calls within spawned tasks? In other words,
> if we have a function like:
>
> void MyTask() {
>   DoCPUWork();
>   DoSomeIO();
>   DoMoreCPUWork();
>   DoAdditionalIO();
> }
>
> (or maybe you just aren't supposed to do that)
>
> The biggest question would be the C++ programming model (in other
> words, how we have to change our approach to writing code) that we use
> throughout the Arrow libraries. What I'm getting at is to figure out
> how to minimize the amount of code that needs to be significantly
> altered to fit in with the new approach to work scheduling. For
> example, it doesn't strike me that the API that we are using to
> parallelize reading Parquet files at the column level is going to work
> because there are various IO calls within the tasks that are being
> submitted to the thread pool:
>
>
> https://github.com/apache/arrow/blob/apache-arrow-1.0.1/cpp/src/parquet/arrow/reader.cc#L859-L875
>
> - Wes
>
> On Wed, Sep 16, 2020 at 1:37 AM Jacob Quinn <quinn.jac...@gmail.com> wrote:
> >
> > My immediate thought reading the discussion points was Julia's task-based
> > multithreading model that has been part of the language for over a year
> > now. An announcement blogpost for Julia 1.3 laid out some of the details
> > and high-level approach: https://julialang.org/blog/2019/07/multithreading/,
> > and the multithreading code was marked stable in the recent 1.5 release.
> >
> > Kiran, one of the main contributors to the threading model in Julia,
> > worked on a separate C-based repo for the core functionality
> > (https://github.com/kpamnany/partr), but I think the latest code is
> > embedded in the Julia source code now.
> >
> > Anyway, probably most useful as a reference, but Jameson (cc'd) also does
> > weekly multithreading chats (on Wednesdays), so I imagine he wouldn't mind
> > chatting about things if desired.
> >
> > -Jacob
> >
> > On Tue, Sep 15, 2020 at 8:17 PM Weston Pace <weston.p...@gmail.com> wrote:
> >
> > > My C++ is pretty rusty but I'll see if I can come up with a concrete
> > > CSV example / experiment / proof of concept on Friday when I have a
> > > break from work.
> > >
> > > > On Tue, Sep 15, 2020 at 3:47 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > > >
> > > > > On Tue, Sep 15, 2020 at 7:54 PM Weston Pace <weston.p...@gmail.com> wrote:
> > > > >
> > > > > Yes.  Thank you.  I am in agreement with you and futures/callbacks are
> > > > > one such "richer programming model for hierarchical work scheduling".
> > > > >
> > > > > A scan task with a naive approach is:
> > > > >
> > > > >     workers = partition_files_list(files_list)
> > > > >     for worker in workers:
> > > > >         start_thread(worker)
> > > > >     for worker in workers:
> > > > >         join_thread(worker)
> > > > >     return aggregate_results()
> > > > >
> > > > > You have N+1 threads because you have N worker threads and 1 scan
> > > > > thread.  There is the potential for deadlock if your thread pool only
> > > > > has one remaining spot and it is given to the scan thread.
> > > > >
> > > > > On the other hand, with a futures-based approach you have:
> > > > >
> > > > >     futures = partition_files_list(files_list)
> > > > >     return when_all(futures).do(aggregate_results)
> > > > >
> > > > > There are only N threads.  The scan thread goes away.  In fact, if all
> > > > > of your underlying OS/FS libraries are non-blocking then you can
> > > > > completely eliminate threads in the waiting state, and an entire
> > > > > category of deadlocks is no longer a possibility.
> > > >
> > > > I don't quite follow. I think it would be most helpful to focus on a
> > > > concrete practical matter like reading Parquet or CSV files in
> > > > parallel (which can go faster through parallelism at the single
> > > > file level) and devise a programming model in C++ that is different
> > > > from what we are currently doing that results in superior CPU
> > > > utilization.
> > > >
> > > >
> > > > >
> > > > > -Weston
> > > > >
> > > > > On Tue, Sep 15, 2020 at 1:21 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > > > > >
> > > > > > hi Weston,
> > > > > >
> > > > > > We've discussed some of these problems in the past -- I was
> > > > > > enumerating some of these issues to highlight the problems that are
> > > > > > resulting from an absence of a richer programming model for
> > > > > > hierarchical work scheduling. Parallel tasks originating in each
> > > > > > workload are submitted to a global thread pool where they are
> > > > > > commingled with the tasks coming from other workloads.
> > > > > >
> > > > > > As an example of how this can go wrong, suppose we have a static
> > > > > > thread pool with 4 executors. If we submit 4 long-running tasks to
> > > > > > the pool, and then each of these tasks spawns additional tasks that
> > > > > > go into the thread pool, a deadlock can occur, because the thread
> > > > > > pool thinks that it's executing tasks when in fact those tasks are
> > > > > > waiting on their dependent tasks to complete.
> > > > > >
> > > > > > A similar resource underutilization occurs when we do
> > > > > > pool->Submit(ReadFile), where ReadFile needs to do some IO -- from
> > > > > > the thread pool's perspective, the task is "working" even though it
> > > > > > may wait for one or more IO calls to complete.
> > > > > >
> > > > > > In the Datasets API in C++ we have both of these problems: file scan
> > > > > > tasks are being pushed onto the global thread pool, and so to prevent
> > > > > > deadlocks multithreaded file parsing has been disabled. Additionally,
> > > > > > the scan tasks do IO, resulting in suboptimal performance (the
> > > > > > problems caused by this will be especially exacerbated when running
> > > > > > against slower filesystems like Amazon S3).
> > > > > >
> > > > > > Hopefully the issues are more clear.
> > > > > >
> > > > > > Thanks
> > > > > > Wes
> > > > > >
> > > > > > On Tue, Sep 15, 2020 at 2:57 PM Weston Pace <weston.p...@gmail.com> wrote:
> > > > > > >
> > > > > > > It sounds like you are describing two problems.
> > > > > > >
> > > > > > > 1) Idleness - Tasks are holding threads in the thread pool while
> > > > > > > they wait for IO or some long-running non-CPU task to complete.
> > > > > > > These threads are often in a "wait" state or something similar.
> > > > > > > 2) Fairness - The ordering of tasks causes short tasks that could
> > > > > > > be completed quickly to get stuck behind longer-running tasks.
> > > > > > > Fairness can be an issue even if all tasks are always in the
> > > > > > > active state consuming CPU time.
> > > > > > >
> > > > > > > Are both of these issues a problem?  Are you looking to address
> > > > > > > both of them?
> > > > > > >
> > > > > > > I doubt it's much help, as it is probably a more substantial
> > > > > > > change than what you were looking for, but the popular solution to
> > > > > > > #1 these days seems to be moving toward non-blocking IO with
> > > > > > > promises/callbacks/async.  That way threads are never in the
> > > > > > > waiting state (unless sitting idle in the pool).
> > > > > > >
> > > > > > > -Weston
> > > > > > >
> > > > > > > On Tue, Sep 15, 2020 at 7:00 AM Wes McKinney <wesmck...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > In light of ARROW-9924, I wanted to rekindle the discussion
> > > > > > > > about our approach to multithreading (especially the
> > > > > > > > _programming model_) in C++. We had some discussions about this
> > > > > > > > about 6 months ago and there were more discussions as I recall
> > > > > > > > in summer 2019.
> > > > > > > >
> > > > > > > > Realistically, we are going to be consistently dealing with
> > > > > > > > independent concurrent in-process workloads that can each go
> > > > > > > > faster by multithreading. These could be things like:
> > > > > > > >
> > > > > > > > * Reading file formats (CSV, Parquet, etc.) that benefit from
> > > > > > > > multithreaded parsing/decoding
> > > > > > > > * Reading one or more files in parallel using the Datasets API
> > > > > > > > * Executing any number of multithreaded analytical workloads
> > > > > > > >
> > > > > > > > One obvious issue with our thread scheduling is the FIFO nature
> > > > > > > > of the global thread pool. If a new independent multithreaded
> > > > > > > > workload shows up, it has to wait for other workloads to
> > > > > > > > complete before the new work will be scheduled. Think about a
> > > > > > > > Flight server serving queries to users -- is it fair for one
> > > > > > > > query to "hog" the thread pool and force other requests to wait
> > > > > > > > until they can get access to some CPU resources? You could
> > > > > > > > imagine a workload that spawns 10 minutes worth of CPU work,
> > > > > > > > where a new workload has to wait for all of that work to
> > > > > > > > complete before having any tasks scheduled for execution.
> > > > > > > >
> > > > > > > > The approach that's been taken in the Datasets API to avoid
> > > > > > > > problems with nested parallelism (file-specific operations
> > > > > > > > spawning multiple tasks onto the global thread pool) is simply
> > > > > > > > to disable multithreading at the level of a single file. This is
> > > > > > > > clearly suboptimal.
> > > > > > > >
> > > > > > > > We have additional problems in that some file-loading related
> > > > > > > > tasks do a mixture of CPU work and IO work, and once a thread
> > > > > > > > has been dispatched to execute one of these tasks, when IO takes
> > > > > > > > place, a CPU core may sit underutilized while the IO is waiting.
> > > > > > > >
> > > > > > > > There are more aspects we can discuss, but in general I think
> > > > > > > > we need to come up with a programming model for building our C++
> > > > > > > > system components with the following requirements:
> > > > > > > >
> > > > > > > > * Deadlocks not possible by design
> > > > > > > > * Any component can safely use "nested parallelism" without the
> > > > > > > > programmer having to worry about deadlocks or one task "hogging"
> > > > > > > > the thread pool. So in other words, if there's only a single
> > > > > > > > multithreading-capable workload running, we "let it rip"
> > > > > > > > * Resources can be reasonably fairly allocated amongst
> > > > > > > > concurrent workloads (think: independent requests coming in
> > > > > > > > through Flight, or scan tasks on different Parquet files in the
> > > > > > > > Datasets API). Limit scenarios where a new workload is blocked
> > > > > > > > altogether on the completion of other workloads
> > > > > > > > * A well-defined programming pattern for tasks that do a mixture
> > > > > > > > of CPU work and IO work that allows CPU cores to be used when a
> > > > > > > > task is waiting on IO
> > > > > > > >
> > > > > > > > We can't be the only project that has these problems, so I'm
> > > > > > > > interested to see what solutions have been successfully employed
> > > > > > > > by others. For example, it strikes me as similar to concurrency
> > > > > > > > issues inside an analytic database. How are they preventing
> > > > > > > > concurrent workload starvation problems or handling CPU/IO task
> > > > > > > > scheduling to avoid CPU underutilization?
> > > > > > > >
> > > > > > > > Choices of which threading libraries we might use to implement
> > > > > > > > a viable solution (e.g. TBB) seem secondary to the programming
> > > > > > > > model that we use to implement our components.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Wes
> > >
>
