I took a look at https://github.com/kpamnany/partr and Julia's production iteration of that -- kpamnany/partr depends on libconcurrent's coroutine implementation, which does not work on Windows. It appears that Julia is using libuv instead. If we're looking for a lighter-weight C coroutine implementation, there is http://software.schmorp.de/pkg/libcoro.html, but either way there is quite a bit of systems work to create something that can work for Arrow.
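
To give a sense of how low-level that option is: libcoro is essentially just stack allocation plus context switching, so the scheduler, the IO integration, and the Windows story would all be on us. Here is a minimal sketch of driving it directly -- written from memory against the usual coro.h API, so treat the details as approximate:

#include <cstdio>
#include "coro.h"  // http://software.schmorp.de/pkg/libcoro.html

static coro_context main_ctx, task_ctx;

// A task that yields control back to the caller between two chunks of work
static void TaskBody(void*) {
  std::printf("chunk 1\n");
  coro_transfer(&task_ctx, &main_ctx);  // yield
  std::printf("chunk 2\n");
  coro_transfer(&task_ctx, &main_ctx);  // must transfer away; never return
}

int main() {
  static char stack[64 * 1024];
  coro_create(&main_ctx, nullptr, nullptr, nullptr, 0);  // "empty" context for main
  coro_create(&task_ctx, TaskBody, nullptr, stack, sizeof(stack));
  coro_transfer(&main_ctx, &task_ctx);  // runs chunk 1
  coro_transfer(&main_ctx, &task_ctx);  // runs chunk 2
  coro_destroy(&task_ctx);
  return 0;
}

Everything task-aware -- the run queue, depth-first vs. work-stealing ordering, IO readiness -- would have to be layered on top of that.
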
I don't have an intuition whether depth-first scheduling (what Julia is doing) or breadth-first scheduling (aka "work stealing" -- which is what Intel's TBB library does [1]) will work better for our use cases. But I believe that we need to figure out a programming model (probably based on composable futures and continuations, given what we are already doing) that hides the details of which coroutine/threading runtime sits underneath. A follow-on project would likely be to define a non-blocking API for our various IO interfaces that composes with the rest of the thread scheduling machinery.

Either way, this problem is definitely non-trivial, so we should figure out what "default" approach we can implement that is compatible with our "minimal dependency core build" approach in C++ (which may involve vendoring some third-party code, though I'm not sure vendoring TBB is a good idea) and go and do that.

If anyone would like to be funded to work on this problem, please get in touch with me offline.

Thanks
Wes

[1]: https://software.intel.com/content/www/us/en/develop/blogs/the-work-isolation-functionality-in-intel-threading-building-blocks-intel-tbb.html

On Sat, Sep 19, 2020 at 5:21 PM Weston Pace <weston.p...@gmail.com> wrote:
>
> Ok, my skill with C++ got in the way of my ability to put something together. First, I did not realize that C++ futures are a little different from the definition I'm used to for futures. By default, C++ futures are not composable: you can't add continuations with `then`, `when_all`, or `when_any`. There is an extension for this (not sure it will even make it into C++20), and Boost's futures do support continuations. However, since Arrow is currently using its own future implementation, I could not use either of those libraries. I spent a bit of time trying to add continuations to Arrow's future implementation, but my lack of skill with C++ got in the way. I want to keep working on it, but it may be a few days. In the meantime I will try to type up something more complete (with a few diagrams) to explain what I'm intending.
>
> Having looked at the code for a while, I do have a better sense of what is involved. I think it would be a pretty extensive set of changes. Also, it looks like C++20 is planning on adopting coroutines, which will be used for sequential async. So perhaps it makes more sense to go directly to coroutines instead of moving to composable futures now and then to coroutines at some point in the future.
>
> Also, re: Julia, I looked into it a bit further and Julia is using libuv under the hood for all file I/O (which is non-blocking I/O). Also, async/await is built into the bones of Julia. As far as I can tell from my brief examination, there is no way to have a Julia task performing blocking I/O (in the sense that a "thread pool thread" is blocked on I/O). You can have blocking I/O in the async/await sense, where you are awaiting I/O to maintain sequential semantics.
>
> On Wed, Sep 16, 2020 at 8:10 AM Weston Pace <weston.p...@gmail.com> wrote:
> >
> > If you want to specifically look at the problem of dataset scanning, file scanning, and nested parallelism, then probably the lowest-effort improvement would be to eliminate the whole idea of "scan threads". You currently have...
> >
> > for (size_t i = 0; i < readers.size(); ++i) {
> >   ARROW_ASSIGN_OR_RAISE(futures[i], pool->Submit(ReadColumnFunc, i));
> > }
> > Status final_status;
> > for (auto& fut : futures) {
> >   final_status &= fut.status();
> > }
> > // Hiding some follow-up aggregation and the next line is a bit abbreviated
> > return Validate();
> >
> > You're already using futures, so it would be pretty straightforward to change that to
> >
> > for (size_t i = 0; i < readers.size(); ++i) {
> >   ARROW_ASSIGN_OR_RAISE(futures[i], pool->Submit(ReadColumnFunc, i));
> > }
> > // Hiding some follow-up aggregation and the next line is a bit abbreviated
> > return std::experimental::when_all(futures).then(FollowUpAggregation).then(Validate);
> >
> > Dataset scans are currently using a threaded task group. Those would change to std::experimental::when_all instead. So now the dataset scan is not creating N threads but again just returning a composed future. So if you have one dataset scan across 4 files, and each file kicks off 10 column reader tasks, then you have 40 "threads" submitted to your thread pool and the main calling thread waiting on the future. All of these thread pool threads are inner worker threads. None of these thread pool threads have to wait on other threads. There is no possibility of deadlock.
> >
> > You can do this at each level of nesting so that only your innermost worker threads are actually calling `pool->Submit`. There is then just one outer main thread (presumably not a thread pool thread) that is waiting on the future. It's not a super small change, because FileReaderImpl::ReadRowGroups now returns a future. That would have to propagate all the way up so that your dataset scan itself is returning a future (you can safely synchronize it at this point, so your public API remains synchronous, because no public API call is going to be arriving on a thread pool thread).
> >
> > That at least solves the deadlock problem. It also starts to propagate futures throughout the code base, which could be good or bad depending on your view of such things. It does not solve the under-utilization problem, because you still have threads sitting in the thread pool waiting on blocking I/O.
> >
> > The next step would be to move to non-blocking I/O. At this point you have quite a few choices.
> >
> > On Wed, Sep 16, 2020 at 7:26 AM Wes McKinney <wesmck...@gmail.com> wrote:
> > >
> > > On Wed, Sep 16, 2020 at 10:31 AM Jorge Cardoso Leitão <jorgecarlei...@gmail.com> wrote:
> > > >
> > > > Hi,
> > > >
> > > > I am not sure I fully understand, so I will try to give an example to check: we have a simple query whose result we want to write to some place:
> > > >
> > > > SELECT t1.b * t2.b FROM t1 JOIN t2 ON t1.a = t2.a
> > > >
> > > > At the physical plan level, we need to
> > > >
> > > > 1. read each file in batches
> > > > 2. join the batches
> > > > 3. iterate over results and write them in partitions
> > > >
> > > > In principle, we can multi-thread them:
> > > >
> > > > 1. multi-threaded scan
> > > > 2. multi-threaded hash join (e.g. with a shared map)
> > > > 3. multi-threaded write (e.g. 1 file per partition)
> > > >
> > > > The issue is that when we schedule this, the physical nodes themselves control how they perform their own operations, and there is no orchestration as to what resources are available and what should be prioritized.
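> > > >
> > > > In pseudo-C++ (hypothetical names, just to illustrate the lack of coordination), each node does something like:
> > > >
> > > > // Every physical node independently floods the shared pool; nothing
> > > > // tells it that a sibling scan is starved of threads
> > > > Status ScanNode::Execute(ThreadPool* pool) {
> > > >   std::vector<Future<Batch>> futures;
> > > >   for (const auto& file : files_) {
> > > >     // each node submits as many tasks as it likes, as fast as it likes
> > > >     futures.push_back(pool->Submit([file] { return ReadFile(file); }));
> > > >   }
> > > >   for (auto& fut : futures) {
> > > >     fut.Wait();  // a join that downstream nodes know nothing about
> > > >   }
> > > >   return Status::OK();
> > > > }
> > > >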
> > > > Consequently, we may have a scan of table t1 that is running with 12 threads, while the scan of table t2 is waiting for a thread to become available. This causes the computation to stall, as both are required for step 2 to proceed. OTOH, if we have no multithreaded scans, then multithreading seldom helps, as we are bottlenecked by the scans' throughput. Is this the gist of the problem?
> > > >
> > > > If yes: the core issue here seems to be that there is no orchestrator to re-prioritize CPU to where it is needed (the scan of t2 in the example above), because each physical node has a thread.join that is not coordinated with its downstream dependencies (and so on). Isn't this a natural candidate for futures/async? We seem to need some coordination across the DAG.
> > > >
> > > > If not: could someone offer an example describing how the multi-threaded scan can cause a deadlock?
> > >
> > > Suppose that we have 4 large CSV files in Amazon S3 and a static thread pool with 4 threads. If we use the thread pool to execute scan tasks for all 4 files in parallel, and any of those scan tasks internally try to spawn tasks in the same thread pool (before other tasks have finished) to parallelize some of their computational work -- "nested parallelism" is what we call this -- then you have a deadlock, because our current thread pool implementation cannot distinguish between task interdependencies / does not understand nested parallelism.
> > >
> > > > Best,
> > > > Jorge
> > > >
> > > > On Wed, Sep 16, 2020 at 4:16 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > > > >
> > > > > hi Jacob,
> > > > >
> > > > > The approach taken in Julia strikes me as being motivated by the same problems that we have in this project. It would be interesting if partr could be used as the basis of our nested parallelism runtime. How does Julia handle IO calls within spawned tasks? In other words, if we have a function like:
> > > > >
> > > > > void MyTask() {
> > > > >   DoCPUWork();
> > > > >   DoSomeIO();
> > > > >   DoMoreCPUWork();
> > > > >   DoAdditionalIO();
> > > > > }
> > > > >
> > > > > (or maybe you just aren't supposed to do that)
> > > > >
> > > > > The biggest question would be the C++ programming model (in other words, how we have to change our approach to writing code) that we use throughout the Arrow libraries. What I'm getting at is figuring out how to minimize the amount of code that needs to be significantly altered to fit in with the new approach to work scheduling. For example, it doesn't strike me that the API we are using to parallelize reading Parquet files at the column level is going to work, because there are various IO calls within the tasks that are being submitted to the thread pool:
> > > > >
> > > > > https://github.com/apache/arrow/blob/apache-arrow-1.0.1/cpp/src/parquet/arrow/reader.cc#L859-L875
> > > > >
> > > > > - Wes
> > > > >
> > > > > On Wed, Sep 16, 2020 at 1:37 AM Jacob Quinn <quinn.jac...@gmail.com> wrote:
> > > > > >
> > > > > > My immediate thought reading the discussion points was Julia's task-based multithreading model, which has been part of the language for over a year now.
> > > > > > An announcement blogpost for Julia 1.3 laid out some of the details and the high-level approach: https://julialang.org/blog/2019/07/multithreading/, and the multithreading code was marked stable in the recent 1.5 release.
> > > > > >
> > > > > > Kiran, one of the main contributors to the threading model in Julia, worked on a separate C-based repo for the core functionality (https://github.com/kpamnany/partr), but I think the latest code is embedded in the Julia source code now.
> > > > > >
> > > > > > Anyway, it's probably most useful as a reference, but Jameson (cc'd) also does weekly multithreading chats (on Wednesdays), so I imagine he wouldn't mind chatting about things if desired.
> > > > > >
> > > > > > -Jacob
> > > > > >
> > > > > > On Tue, Sep 15, 2020 at 8:17 PM Weston Pace <weston.p...@gmail.com> wrote:
> > > > > > >
> > > > > > > My C++ is pretty rusty, but I'll see if I can come up with a concrete CSV example / experiment / proof of concept on Friday when I have a break from work.
> > > > > > >
> > > > > > > On Tue, Sep 15, 2020 at 3:47 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > On Tue, Sep 15, 2020 at 7:54 PM Weston Pace <weston.p...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Yes. Thank you. I am in agreement with you, and futures/callbacks are one such "richer programming model for hierarchical work scheduling".
> > > > > > > > >
> > > > > > > > > A scan task with a naive approach is:
> > > > > > > > >
> > > > > > > > > workers = partition_files_list(files_list)
> > > > > > > > > for worker in workers:
> > > > > > > > >     start_thread(worker)
> > > > > > > > > for worker in workers:
> > > > > > > > >     join_thread(worker)
> > > > > > > > > return aggregate_results()
> > > > > > > > >
> > > > > > > > > You have N+1 threads, because you have N worker threads and 1 scan thread. There is the potential for deadlock if your thread pool has only one remaining spot and it is given to the scan thread.
> > > > > > > > >
> > > > > > > > > On the other hand, with a futures-based approach you have:
> > > > > > > > >
> > > > > > > > > futures = partition_files_list(files_list)
> > > > > > > > > return when_all(futures).do(aggregate_results)
> > > > > > > > >
> > > > > > > > > There are only N threads. The scan thread goes away. In fact, if all of your underlying OS/FS libraries are non-blocking, then you can completely eliminate threads in the waiting state, and an entire category of deadlocks is no longer a possibility.
> > > > > > > >
> > > > > > > > I don't quite follow. I think it would be most helpful to focus on a concrete practical matter like reading Parquet or CSV files in parallel (which can go faster through parallelism at the single-file level) and devise a programming model in C++ that is different from what we are currently doing and results in superior CPU utilization.
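> > > > > > > >
> > > > > > > > To make the comparison concrete, the shape of what we do today is roughly this (a simplified sketch, not the actual reader code):
> > > > > > > >
> > > > > > > > // One task per column; each task occupies a pool thread for its whole
> > > > > > > > // lifetime, including while it blocks on the Read() call
> > > > > > > > pool->Submit([reader] {
> > > > > > > >   auto buffer = reader->Read();  // pool thread parked on IO here
> > > > > > > >   return Decode(*buffer);        // only this part needs a CPU core
> > > > > > > > });
> > > > > > > >
> > > > > > > > I'd like to see what the non-blocking version of that looks like end to end.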
> > > > > > > > >
> > > > > > > > > -Weston
> > > > > > > > >
> > > > > > > > > On Tue, Sep 15, 2020 at 1:21 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > hi Weston,
> > > > > > > > > >
> > > > > > > > > > We've discussed some of these problems in the past -- I was enumerating some of these issues to highlight the problems that result from the absence of a richer programming model for hierarchical work scheduling. Parallel tasks originating in each workload are submitted to a global thread pool, where they are commingled with the tasks coming from other workloads.
> > > > > > > > > >
> > > > > > > > > > As an example of how this can go wrong, suppose we have a static thread pool with 4 executors. If we submit 4 long-running tasks to the pool, and then each of these tasks spawns additional tasks that go into the thread pool, a deadlock can occur, because the thread pool thinks that it's executing tasks when in fact those tasks are waiting on their dependent tasks to complete.
> > > > > > > > > >
> > > > > > > > > > A similar resource underutilization occurs when we do pool->Submit(ReadFile), where ReadFile needs to do some IO -- from the thread pool's perspective, the task is "working" even though it may wait for one or more IO calls to complete.
> > > > > > > > > >
> > > > > > > > > > In the Datasets API in C++ we have both of these problems: file scan tasks are being pushed onto the global thread pool, and so to prevent deadlocks multithreaded file parsing has been disabled. Additionally, the scan tasks do IO, resulting in suboptimal performance (the problems caused by this will be especially exacerbated when running against slower filesystems like Amazon S3).
> > > > > > > > > >
> > > > > > > > > > Hopefully the issues are more clear.
> > > > > > > > > >
> > > > > > > > > > Thanks
> > > > > > > > > > Wes
> > > > > > > > > >
> > > > > > > > > > On Tue, Sep 15, 2020 at 2:57 PM Weston Pace <weston.p...@gmail.com> wrote:
> > > > > > > > > > >
> > > > > > > > > > > It sounds like you are describing two problems.
> > > > > > > > > > >
> > > > > > > > > > > 1) Idleness - Tasks are holding threads in the thread pool while they wait for IO or some long-running non-CPU task to complete. These threads are often in a "wait" state or something similar.
> > > > > > > > > > > 2) Fairness - The ordering of tasks is keeping short tasks that could be completed quickly stuck behind longer-term tasks. Fairness can be an issue even if all tasks are always in the active state consuming CPU time.
> > > > > > > > > > >
> > > > > > > > > > > Are both of these issues a problem? Are you looking to address both of them?
> > > > > > > > > > >
> > > > > > > > > > > I doubt it's much help, as it is probably a more substantial change than what you were looking for, but the popular solution to #1 these days seems to be moving toward non-blocking IO with promises/callbacks/async. That way threads are never in the waiting state (unless sitting idle in the pool).
> > > > > > > > > > >
> > > > > > > > > > > -Weston
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Sep 15, 2020 at 7:00 AM Wes McKinney <wesmck...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > In light of ARROW-9924, I wanted to rekindle the discussion about our approach to multithreading (especially the _programming model_) in C++. We had some discussions about this about 6 months ago, and there were more discussions, as I recall, in summer 2019.
> > > > > > > > > > > >
> > > > > > > > > > > > Realistically, we are going to be consistently dealing with independent concurrent in-process workloads that can each go faster through multithreading. These could be things like:
> > > > > > > > > > > >
> > > > > > > > > > > > * Reading file formats (CSV, Parquet, etc.) that benefit from multithreaded parsing/decoding
> > > > > > > > > > > > * Reading one or more files in parallel using the Datasets API
> > > > > > > > > > > > * Executing any number of multithreaded analytical workloads
> > > > > > > > > > > >
> > > > > > > > > > > > One obvious issue with our thread scheduling is the FIFO nature of the global thread pool. If a new independent multithreaded workload shows up, it has to wait for other workloads to complete before the new work will be scheduled. Think about a Flight server serving queries to users -- is it fair for one query to "hog" the thread pool and force other requests to wait until they can get access to some CPU resources? You could imagine a workload that spawns 10 minutes' worth of CPU work, where a new workload has to wait for all of that work to complete before having any tasks scheduled for execution.
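> > > > > > > > > > > >
> > > > > > > > > > > > Concretely (a hypothetical sketch, not real task names):
> > > > > > > > > > > >
> > > > > > > > > > > > // Workload A enqueues a large batch of tasks first...
> > > > > > > > > > > > for (int i = 0; i < 10000; ++i) {
> > > > > > > > > > > >   pool->Submit(LongAnalyticTask);
> > > > > > > > > > > > }
> > > > > > > > > > > > // ...so workload B's single small task sits at the back of the
> > > > > > > > > > > > // FIFO queue until nearly all of A has been executed
> > > > > > > > > > > > pool->Submit(SmallFlightRequestTask);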
> > > > > > > > > > > >
> > > > > > > > > > > > The approach that's been taken in the Datasets API to avoid problems with nested parallelism (file-specific operations spawning multiple tasks onto the global thread pool) is simply to disable multithreading at the level of a single file. This is clearly suboptimal.
> > > > > > > > > > > >
> > > > > > > > > > > > We have additional problems in that some file-loading-related tasks do a mixture of CPU work and IO work, and once a thread has been dispatched to execute one of these tasks, a CPU core may sit underutilized while the task waits on IO.
> > > > > > > > > > > >
> > > > > > > > > > > > There are more aspects we can discuss, but in general I think we need to come up with a programming model for building our C++ system components with the following requirements:
> > > > > > > > > > > >
> > > > > > > > > > > > * Deadlocks not possible by design
> > > > > > > > > > > > * Any component can safely use "nested parallelism" without the programmer having to worry about deadlocks or one task "hogging" the thread pool. In other words, if there's only a single multithreading-capable workload running, we "let it rip"
> > > > > > > > > > > > * Resources can be reasonably fairly allocated amongst concurrent workloads (think: independent requests coming in through Flight, or scan tasks on different Parquet files in the Datasets API). Limit scenarios where a new workload is blocked altogether on the completion of other workloads
> > > > > > > > > > > > * A well-defined programming pattern for tasks that do a mixture of CPU work and IO work that allows CPU cores to be used while a task is waiting on IO (see the strawman sketch at the end of this message)
> > > > > > > > > > > >
> > > > > > > > > > > > We can't be the only project that has these problems, so I'm interested to see what solutions have been successfully employed by others. For example, this strikes me as similar to the concurrency issues inside an analytic database. How do they prevent concurrent workload starvation or handle CPU/IO task scheduling to avoid CPU underutilization?
> > > > > > > > > > > >
> > > > > > > > > > > > Choices of which threading libraries we might use to implement a viable solution (e.g. TBB) seem secondary to the programming model that we use to implement our components.
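> > > > > > > > > > > >
> > > > > > > > > > > > As a strawman for that last requirement (an invented API, only to show the shape): the IO layer would return a future rather than blocking, with the CPU work attached as a continuation, so no pool thread is parked while the read is in flight:
> > > > > > > > > > > >
> > > > > > > > > > > > Future<std::shared_ptr<Buffer>> fut = file->ReadAsync(offset, nbytes);
> > > > > > > > > > > > fut.Then([](const std::shared_ptr<Buffer>& buffer) {
> > > > > > > > > > > >   return DecodeOnCpuPool(buffer);  // scheduled only once the bytes exist
> > > > > > > > > > > > });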
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Wes