hi Jacob, The approach taken in Julia strikes me as being motivated by the same problems that we have in this project. It would be interesting if partr could be used as the basis of our nested parallelism runtime. How does Julia handle IO calls within spawned tasks? In other words, if we have a function like:
void MyTask() { DoCPUWork(); DoSomeIO(); DoMoreCPUWork(); DoAdditionalIO(); } (or maybe you just aren't supposed to do that) The biggest question would be the C++ programming model (in other words, how we have to change our approach to writing code) that we use throughout the Arrow libraries. What I'm getting at is to figure out how to minimize the amount of code that needs to be significantly altered to fit in with the new approach to work scheduling. For example, it doesn't strike me that the API that we are using to parallelize reading Parquet files at the column level is going to work because there are various IO calls within the tasks that are being submitted to the thread pool https://github.com/apache/arrow/blob/apache-arrow-1.0.1/cpp/src/parquet/arrow/reader.cc#L859-L875 - Wes On Wed, Sep 16, 2020 at 1:37 AM Jacob Quinn <quinn.jac...@gmail.com> wrote: > > My immediate thought reading the discussion points was Julia's task-based > multithreading model that has been part of the language for over a year > now. An announcement blogpost for Julia 1.3 laid out some of the details > and high-level approach: https://julialang.org/blog/2019/07/multithreading/, > and the multithreading code was marked stable in the recent 1.5 release. > > Kiran, one of the main contributors to the threading model in Julia, worked > on a separate C-based repo for the core functionality ( > https://github.com/kpamnany/partr), but I think the latest code is embedded > in the Julia source code now. > > Anyway, probably most useful as a reference, but Jameson (cc'd) also does > weekly multithreading chats (on Wednesdays), so I imagine he wouldn't mind > chatting about things if desired. > > -Jacob > > On Tue, Sep 15, 2020 at 8:17 PM Weston Pace <weston.p...@gmail.com> wrote: > > > My C++ is pretty rusty but I'll see if I can come up with a concrete > > CSV example / experiment / proof of concept on Friday when I have a > > break from work. > > > > On Tue, Sep 15, 2020 at 3:47 PM Wes McKinney <wesmck...@gmail.com> wrote: > > > > > > On Tue, Sep 15, 2020 at 7:54 PM Weston Pace <weston.p...@gmail.com> > > wrote: > > > > > > > > Yes. Thank you. I am in agreement with you and futures/callbacks are > > > > one such "richer programming model for > > > > hierarchical work scheduling". > > > > > > > > A scan task with a naive approach is: > > > > > > > > workers = partition_files_list(files_list) > > > > for worker in workers: > > > > start_thread(worker) > > > > for worker in workers: > > > > join_thread(worker) > > > > return aggregate_results() > > > > > > > > You have N+1 threads because you have N worker threads and 1 scan > > > > thread. There is the potential for deadlock if your thread pool only > > > > has one remaining spot and it is given to the scan thread. > > > > > > > > On the other hand, with a futures based approach you have: > > > > > > > > futures = partition_files_list(files_list) > > > > return when_all(futures).do(aggregate_results) > > > > > > > > There are only N threads. The scan thread goes away. In fact, if all > > > > of your underlying OS/FS libraries are non-blocking then you can > > > > completely eliminate threads in the waiting state and an entire > > > > category of deadlocks are no longer a possibility. > > > > > > I don't quite follow. I think it would be most helpful to focus on a > > > concrete practical matter like reading Parquet or CSV files in > > > parallel (which can be go faster through parallelism at the single > > > file level) and devise a programming model in C++ that is different > > > from what we are currently doing that results in superior CPU > > > utilization. > > > > > > > > > > > > > > -Weston > > > > > > > > On Tue, Sep 15, 2020 at 1:21 PM Wes McKinney <wesmck...@gmail.com> > > wrote: > > > > > > > > > > hi Weston, > > > > > > > > > > We've discussed some of these problems in the past -- I was > > > > > enumerating some of these issues to highlight the problems that are > > > > > resulting from an absence of a richer programming model for > > > > > hierarchical work scheduling. Parallel tasks originating in each > > > > > workload are submitted to a global thread pool where they are > > > > > commingled with the tasks coming from other workloads. > > > > > > > > > > As an example of how this can go wrong, suppose we have a static > > > > > thread pool with 4 executors. If we submit 4 long-running tasks to > > the > > > > > pool, and then each of these tasks spawn additional tasks that go > > into > > > > > the thread pool, a deadlock can occur, because the thread pool thinks > > > > > that it's executing tasks when in fact those tasks are waiting on > > > > > their dependent tasks to complete. > > > > > > > > > > A similar resource underutilization occurs when we do > > > > > pool->Submit(ReadFile), where ReadFile needs to do some IO -- from > > the > > > > > thread pool's perspective, the task is "working" even though it may > > > > > wait for one or more IO calls to complete. > > > > > > > > > > In the Datasets API in C++ we have both of these problems: file scan > > > > > tasks are being pushed onto the global thread pool, and so to prevent > > > > > deadlocks multithreaded file parsing has been disabled. Additionally, > > > > > the scan tasks do IO, resulting in suboptimal performance (the > > > > > problems caused by this will be especially exacerbated when running > > > > > against slower filesystems like Amazon S3) > > > > > > > > > > Hopefully the issues are more clear. > > > > > > > > > > Thanks > > > > > Wes > > > > > > > > > > On Tue, Sep 15, 2020 at 2:57 PM Weston Pace <weston.p...@gmail.com> > > wrote: > > > > > > > > > > > > It sounds like you are describing two problems. > > > > > > > > > > > > 1) Idleness - Tasks are holding threads in the thread pool while > > they > > > > > > wait for IO or some long running non-CPU task to complete. These > > > > > > threads are often in a "wait" state or something similar. > > > > > > 2) Fairness - The ordering of tasks is causing short tasks that > > could > > > > > > be completed quickly from being stuck behind longer term tasks. > > > > > > Fairness can be an issue even if all tasks are always in the active > > > > > > state consuming CPU time. > > > > > > > > > > > > Are both of these issues a problem? Are you looking to address > > both of them? > > > > > > > > > > > > I doubt it's much help as it is probably a more substantial change > > > > > > than what you were looking for but the popular solution to #1 these > > > > > > days seems to be moving toward non blocking IO with > > > > > > promises/callbacks/async. That way threads are never in the > > waiting > > > > > > state (unless sitting idle in the pool). > > > > > > > > > > > > -Weston > > > > > > > > > > > > On Tue, Sep 15, 2020 at 7:00 AM Wes McKinney <wesmck...@gmail.com> > > wrote: > > > > > > > > > > > > > > In light of ARROW-9924, I wanted to rekindle the discussion > > about our > > > > > > > approach to multithreading (especially the _programming model_) > > in > > > > > > > C++. We had some discussions about this about 6 months ago and > > there > > > > > > > were more discussions as I recall in summer 2019. > > > > > > > > > > > > > > Realistically, we are going to be consistently dealing with > > > > > > > independent concurrent in-process workloads that each > > respectively can > > > > > > > go faster by multithreading. These could be things like: > > > > > > > > > > > > > > * Reading file formats (CSV, Parquet, etc.) that benefit from > > > > > > > multithreaded parsing/decoding > > > > > > > * Reading one or more files in parallel using the Datasets API > > > > > > > * Executing any number of multithreaded analytical workloads > > > > > > > > > > > > > > One obvious issue with our thread scheduling is the FIFO nature > > of the > > > > > > > global thread pool. If a new independent multithreaded workload > > shows > > > > > > > up, it has to wait for other workloads to complete before the > > new work > > > > > > > will be scheduled. Think about a Flight server serving queries to > > > > > > > users -- is it fair for one query to "hog" the thread pool and > > force > > > > > > > other requests to wait until they can get access to some CPU > > > > > > > resources? You could imagine a workload that spawns 10 minutes > > worth > > > > > > > of CPU work, where a new workload has to wait for all of that > > work to > > > > > > > complete before having any tasks scheduled for execution. > > > > > > > > > > > > > > The approach that's been taken in the Datasets API to avoid > > problems > > > > > > > with nested parallelism (file-specific operations spawning > > multiple > > > > > > > tasks onto the global thread pool) is simply to disable > > multithreading > > > > > > > at the level of a single file. This is clearly suboptimal. > > > > > > > > > > > > > > We have additional problems in that some file-loading related > > tasks do > > > > > > > a mixture of CPU work and IO work, and once a thread has been > > > > > > > dispatched to execute one of these tasks, when IO takes place, a > > CPU > > > > > > > core may sit underutilized while the IO is waiting. > > > > > > > > > > > > > > There's more aspects we can discuss, but in general I think we > > need to > > > > > > > come up with a programming model for building our C++ system > > > > > > > components with the following requirements: > > > > > > > > > > > > > > * Deadlocks not possible by design > > > > > > > * Any component can safely use "nested parallelism" without the > > > > > > > programmer having to worry about deadlocks or one task "hogging" > > the > > > > > > > thread pool. So in other words, if there's only a single > > > > > > > multithreading-capable workload running, we "let it rip" > > > > > > > * Resources can be reasonably fairly allocated amongst concurrent > > > > > > > workloads (think: independent requests coming in through Flight, > > or > > > > > > > scan tasks on different Parquet files in the Datasets API). Limit > > > > > > > scenarios where a new workload is blocked altogether on the > > completion > > > > > > > of other workloads > > > > > > > * A well-defined programming pattern for tasks that do a mixture > > of > > > > > > > CPU work and IO work that allows CPU cores to be used when a > > task is > > > > > > > waiting on IO > > > > > > > > > > > > > > We can't be the only project that has these problems, so I'm > > > > > > > interested to see what solutions have been successfully employed > > by > > > > > > > others. For example, it strikes me as similar to concurrency > > issues > > > > > > > inside an analytic database. How are they preventing concurrent > > > > > > > workload starvation problems or handling CPU/IO task scheduling > > to > > > > > > > avoid CPU underutilization? > > > > > > > > > > > > > > Choices of which threading libraries we might use to implement a > > > > > > > viable solution (e.g. TBB) seem secondary to the programming > > model > > > > > > > that we use to implement our components. > > > > > > > > > > > > > > Thanks, > > > > > > > Wes > >