So this may be a return to the details, I think the larger discussion is a good discussion to have but I don't know enough of the code base to comment further.
I finished playing around with the CSV reader. The code for this experiment can be found here (https://github.com/westonpace/arrow/tree/feature/composable-futures), it is pretty rough as I was just trying to get things working well enough to run some experiments. In particular the futures stuff is not fleshed out well and does not handle invalid statuses correctly. Most of the experiment is in nested-read-table-deadlock-example.cc. # Key Observations * To keep I/O waits off the thread pool you can use a dedicated thread for I/O instead of moving to non-blocking I/O (Matthias did mention this and the CSV reader was already doing this somewhat). However, without futures / asynchronous, you can still have a thread pool task wait on the I/O thread to populate the channel with another block of data. So this doesn't prevent I/O on the thread pool all by itself. * Since arrow already has futures and a thread pool it isn't too much additional work to add continuations / promises. Although this may be something of a sunk cost fallacy. * The current CSV reader implementation does not do well with a high latency filesystem, an asynchronous solution can work around this * The current thread pool implementation deadlocks when used in a "nested" case, an asynchronous solution can work around this * Work still needs to be done so that the asynchronous solution works as well as the synchronous multi threaded solution in low latency environments # Description I created 20 CSV files, each which has 1,000,000 rows and 4 columns (1 integral and 3 decimal). The files are each around 63MB. Rather than use the dataset stuff directly I simulated a dataset scan by reading the files in a loop. So this experiment only directly involved the csv reader. The CSV reader supports parallelism in a few places. First, both the serial and threaded CSV readers have a dedicated thread for I/O. So in the serial case, while one thread is computing the results of a chunk, the I/O thread is fetching the next block. The threaded CSV reader will process up to X blocks at once where X is the capacity of the thread pool. Also, when processing a block, the threaded CSV reader will launch a task for converting each column. These conversion tasks may fail and need to be rerun. So it is possible for there to be more than one task per column per block. My tests ran on an m5.large EC2 instance connected to EBS storage (where the CSV files were stored) so disk latency was pretty low (e.g. compared to something like S3) and there were two dedicated cores available. So my thread pool was of size 2. Converting the CSV columns was fairly processor intensive. Each file took about ~0.5 seconds to process and the experiment ran in about 10 seconds for all 20 files. The experiment was CPU bound and very little time was spent waiting on I/O. The serial implementation performed about 20% worse than the threaded implementation. Top reported the CPU at pretty much full capacity when the threaded implementation was running although this isn't exactly an accurate measurement. The threaded implementation added ~6600 tasks to the thread pool for a rate of about 660 tasks/second which doesn't seem like it would be especially taxing to a thread pool. I then added composability to arrow futures (a very rough implementation that would need considerable polish before being used for anything). Using this I created an asynchronous CSV reader. This CSV reader acted the same as the multithreaded CSV reader but it broke tasks into smaller pieces and used callbacks so that it would not block on I/O and, as an added bonus, could be nested with an asynchronous scan. The asynchronous reader performed about 10% less than the multithreaded reader and about 10% better than the serial reader. I wouldn't expect it to be faster than the threaded reader (since the task was not I/O bound) but I didn't expect it to be slower. Right now my leading theories are: * The asynchronous approach read from all 20 files at the same time instead of one file at a time. It's possible this slowed down the I/O, some sort of throttled filesystem could help if this were the issue. * My implementation of continuations was pretty sloppy and probably had too many copies of various data fields (my currently leading hypothesis) * The asynchronous implementation created nearly twice as many tasks as the synchronous implementation so maybe it was putting stress on the thread pool * There is a fair amount of locking in the threaded CSV reader algorithm, it's possible the asynchronous implementation interacted with this locking less favorably. When I added latency to the I/O the serial and threaded implementations quickly fell behind the asynchronous implementation in terms of performance. At about 6-7ms of latency the task started to become I/O bound. At 10ms of latency the two readers performed the same. At 20ms latency (e.g. S3 from a different region or outside the cloud) the threaded reader was twice as slow as the asynchronous reader. These numbers may be different on a machine with a higher number of cores which is likely to become I/O bound at lower latencies. # Next Steps If people think there is merit in this kind of approach I'd be happy to clean up my continuations API (simplify the API, reduce excess copies, more automated tests) and investigate further why the asynchronous reader is slower than the threaded read and, if I can fix those issues, potentially add it as the default threaded reader. The actual changes would be additions/backwards compatible... * ConcreteFutureImpl objects would be a little bit larger (to keep track of the callbacks) but these objects aren't usually copied anyways (the future is copied and it moves its pointer to the impl). * Then and WhenAll would be added to ThreadPool (e.g. thread_pool->Then(some_future, what_to_do_after)) * I'd extend TaskGroup to support appending promises (in addition to the functions/delegates it supports today) and has a FinishAsync method which returns a promise instead of blocking (the serial and threaded task groups would simply block and return a finished promise). * I'd add an asynchronous peer to ReadaheadIterator (AsyncReadaheadIterator). * I'd create an AsynchronousTableReader (peer to SerialTableReader and ThreadedTableReader) * I'd add ReadAsync to the TableReader interface. The serial and threaded implementations would block and return a finished promise. I'd guess it would take about two months for me to find enough free time blocks to finish all this work. Although it is just the CSV reader (and not the parquet reader) it would put the continuations API into place which could be used to tackle other "potentially I/O bound" tasks (e.g. parquet reader) as well. On Thu, Sep 24, 2020 at 10:36 AM Wes McKinney <wesmck...@gmail.com> wrote: > > Not C++, but I found the discussion about the Rust's tokio project's > scheduler to be interesting / relevant > > https://tokio.rs/blog/2019-10-scheduler > > On Tue, Sep 22, 2020 at 4:54 PM Wes McKinney <wesmck...@gmail.com> wrote: > > > > Thanks for the pointer to CAF. It reminds me a bit of libprocess which > > is a part of Apache Mesos, which also provides the actor model > > > > https://github.com/apache/mesos/tree/master/3rdparty/libprocess > > > > We'll have to determine a solution that is compatible with our > > spectrum of compiler toolchain support (e.g. we are still technically > > supporting gcc 4.8, but may drop it in the future). Things will get > > easier once we can move to C++14, but C++17 is probably not in the > > cards for quite some time. > > > > On Tue, Sep 22, 2020 at 2:18 PM Matthias Vallentin > > <matth...@vallentin.net> wrote: > > > > > > We are building a highly concurrent database for security data with Arrow > > > as data plane (VAST <https://github.com/tenzir/vast>), so I thought I'll > > > share our view on this since we went over pretty much all of the above > > > mentioned questions. I'm not trying to say "you should do it this way" but > > > instead share our journey, in the hope that you can draw some insight from > > > it. > > > > > > It sounds like there are several challenges to be solved, ranging from > > > non-blocking I/O to efficient task-based scheduling - for some notion of > > > task. We found that the actor model > > > <https://docs.tenzir.com/vast/architecture/actor-model/> solved all of > > > these challenges. In particular, we rely on CAF > > > <https://github.com/actor-framework/actor-framework>, the C++ Actor > > > Framework as concrete implementation. The basic abstraction is that an > > > actor, which is effectively a heavy-weight task (100-200 bytes) that can > > > be > > > scheduled in two ways: on a dedicated thread or in a thread pool. The > > > thread pool is driven either by a work-stealing or work-sharing scheduler, > > > based on the deployment environment. Here's how we solve some concrete > > > problems with these abstractions: > > > > > > 1. *Asynchronous I/O*: we have one "detached" filesystem actor that > > > lives in its own thread that does all I/O. All reads and writes (mmap > > > too, > > > but let's take that aside) go through this actor. You'd request a write > > > with a contiguous chunk of data, and get a response back when the > > > operation > > > succeeded. Any actor can interact with the filesystem actor, also > > > actors > > > that are scheduled in the pool. The point of this abstraction is that > > > I/O > > > operations can block, which is why you never want to execute them in > > > the > > > thread pool. Otherwise all other tasks/actors that are scheduled right > > > behind the I/O operation might get stalled. Sure, work-stealing will > > > alleviate this, but not when steals occur frequently. > > > 2. *Concurrent task execution*: if the work can be *overdecomposed* > > > into > > > smaller chunks such that there are more chunks than CPU cores, a thread > > > pool plus scheduler will do a good job at exploiting the available > > > system > > > concurrency. In our use case, we build dozens of indexes in parallel, > > > with > > > one actor being responsible for one column. They all run in parallel > > > and > > > independent of each other, but operate on the same (immutable) data. So > > > there are no data races by design. Once an indexer actor completes a > > > set of > > > record batches, it builds a flatbuffer and ships it to the I/O actor to > > > persist it. At that point we're in point (1), asynchronous I/O. This > > > plays > > > together nicely. > > > 3. *Network transparency*: Since the actor model communication > > > abstraction is message passing, it's easy to hide the actor location, > > > be it > > > in memory or remotely available via IPC. The actor runtime takes care > > > of > > > either just passing a pointer with the message contents or doing the > > > transparent serialization. This makes it very easy to build a > > > distributed > > > system if need be, or run everything in a single process. > > > > > > We could have gone with lower-level abstraction, e.g., thread pool > > > with coroutines, but decided that we get more mileage from an actor > > > runtime. We see coroutines just as the syntactic sugar that make the > > > inversion of control more reasonable to understand through straight-line > > > code, but it's not a new messaging capability in the context of the actor > > > model implementation we use, which allows for arbitrary messaging and > > > synchronization patterns - all fully asynchronous, i.e., non-blocking by > > > yielding to the scheduler. Calling .then(...) on when a message arrives is > > > effectively a future, and we frequently create response promises and > > > message delegation patterns, often implicitly through the runtime simply > > > by > > > returning a value in a lambda. We also challenged the overhead of an actor > > > compared to a light-weight task, but found that even creating millions of > > > actors in parallel in CAF still doesn't cause memory pressure or > > > substantially more cache misses. At the end, we could not find a reason to > > > *not* go with CAF, and we don't regret this choice to date. To date, we > > > work with the experimental credit-based streaming feature of CAF that > > > gives > > > Flink-like streaming semantics though actor-based backpressure. Once > > > again, > > > a single powerful abstraction to pretty much address all our needs in > > > scalable distributed systems that is close to the hardware as well. > > > > > > I hope this helps making better decisions in finding the right > > > abstractions > > > for you. I've used the actor model as a vehicle for my arguments, but > > > there > > > are other isomorphic models and vocabulary (e.g., CSP). The main point I > > > want to get across is that thinking too low-level and single-solution > > > (only > > > threapools, just coroutines, etc.) may result in a local instead of global > > > optimum. > > > > > > Matthias > > > > > > > > > On Mon, Sep 21, 2020 at 9:38 PM Ben Kietzman <b...@ursacomputing.com> > > > wrote: > > > > > > > FWIW boost.coroutine and boost.asio provide composable coroutines, > > > > non blocking IO, and configurable scheduling for CPU work out of the > > > > box. > > > > > > > > The boost libraries are not lightweight but they are robust and > > > > cross platform, so I think asio is worth consideration. > > > > > > > > On Sat, Sep 19, 2020 at 8:22 PM Wes McKinney <wesmck...@gmail.com> > > > > wrote: > > > > > > > > > I took a look at https://github.com/kpamnany/partr and Julia's > > > > > production iteration of that -- kpamnany/partr depends on > > > > > libconcurrent's coroutine implementation which does not work on > > > > > Windows. It appears that Julia is using libuv instead. If we're > > > > > looking for a lighter-weight C coroutine implementation, there is > > > > > http://software.schmorp.de/pkg/libcoro.html, but either way there is > > > > > quite a bit of systems work to create something that can work for > > > > > Arrow. > > > > > > > > > > I don't have an intuition whether depth-first scheduling (what Julia > > > > > is doing) or breadth-first scheduling (aka "work stealing" -- which is > > > > > what Intel's TBB library does [1]) will work better for our use cases. > > > > > But I believe that we need to figure out a programming model (probably > > > > > based on composable futures and continuations given what we are > > > > > already doing) that hides the details of which coroutine/threading > > > > > runtime. > > > > > > > > > > A follow-on project would likely be to define a non-blocking API for > > > > > our various IO interfaces that composes with the rest of the thread > > > > > scheduling machinery. > > > > > > > > > > Either way, this problem is definitely non-trivial so we should figure > > > > > out what "default" approach we can implement that is compatible with > > > > > our "minimal dependency core build" approach in C++ (which may involve > > > > > vendoring some third party code, but not sure if vendoring TBB is a > > > > > good idea) and go and do that. If anyone would like to be funded to > > > > > work on this problem, please get in touch with me offline. > > > > > > > > > > Thanks > > > > > Wes > > > > > > > > > > [1]: > > > > > > > > > https://software.intel.com/content/www/us/en/develop/blogs/the-work-isolation-functionality-in-intel-threading-building-blocks-intel-tbb.html > > > > > > > > > > On Sat, Sep 19, 2020 at 5:21 PM Weston Pace <weston.p...@gmail.com> > > > > wrote: > > > > > > > > > > > > Ok, my skill with C++ got in the way of my ability to put something > > > > > > together. First, I did not realize that C++ futures were a little > > > > > > different than the definition I'm used to for futures. By default, > > > > > > C++ futures are not composable, you can't add continuations with > > > > > > `then`, `when_all` or `when_any`. There is an extension for this > > > > > > (not > > > > > > sure if it will make it even in C++20) and there are continuations > > > > > > for > > > > > > futures in boost's futures. However, since arrow is currently using > > > > > > its own future implementation I could not use either of these > > > > > > libraries. I spent a bit trying to add continuations to arrow's > > > > > > future implementation but my lack of skill with C++ got in the way. > > > > > > I > > > > > > want to keep working on it but it may be a few days. In the > > > > > > meantime > > > > > > I will try and type up something more complete (with a few diagrams) > > > > > > to explain what I'm intending. > > > > > > > > > > > > Having looked at the code for a while I do have a better sense of > > > > > > what > > > > > > is involved. I think it would be a pretty extensive set of changes. > > > > > > Also, it looks like C++20 is planning on adopting co-routines which > > > > > > they will be using for sequential async. So perhaps it makes more > > > > > > sense to go directly to coroutines instead of moving to composable > > > > > > futures and then later to coroutines at some point in the future. > > > > > > > > > > > > Also, re: Julia, I looked into it a bit further and Julia is using > > > > > > libuv under the hood for all file I/O (which is non-blocking I/O). > > > > > > Also async/await are built into the bones of Julia. As far as I can > > > > > > tell from my brief examination is that there is no way to have a > > > > > > Julia > > > > > > task that is performing blocking I/O (in the sense that a "thread > > > > > > pool > > > > > > thread" is blocked on I/O. You can have blocking I/O in the > > > > > > async/await sense where you are awaiting on I/O to maintain > > > > > > sequential > > > > > > semantics. > > > > > > > > > > > > On Wed, Sep 16, 2020 at 8:10 AM Weston Pace <weston.p...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > > If you want to specifically look at the problem of dataset > > > > > > > scanning, > > > > > > > file scanning, and nested parallelism then probably the lowest > > > > > > > effort > > > > > > > improvement would be to eliminate the whole idea of "scan > > > > > > > threads". > > > > > > > You currently have... > > > > > > > > > > > > > > for (size_t i = 0; i < readers.size(); ++i) { > > > > > > > ARROW_ASSIGN_OR_RAISE(futures[i], > > > > pool->Submit(ReadColumnFunc, > > > > > i)); > > > > > > > } > > > > > > > Status final_status; > > > > > > > for (auto& fut : futures) { > > > > > > > final_status &= fut.status(); > > > > > > > } > > > > > > > // Hiding some follow-up aggregation and the next line is a > > > > > > > bit > > > > > abbreviated > > > > > > > return Validate(); > > > > > > > > > > > > > > You're already using futures so it would be pretty > > > > > > > straightforward to > > > > > > > change that to > > > > > > > > > > > > > > for (size_t i = 0; i < readers.size(); ++i) { > > > > > > > ARROW_ASSIGN_OR_RAISE(futures[i], > > > > pool->Submit(ReadColumnFunc, > > > > > i)); > > > > > > > } > > > > > > > // Hiding some follow-up aggregation and the next line is a > > > > > > > bit > > > > > abbreviated > > > > > > > return > > > > > > > > > std::experimental::when_all(futures).then(FollowUpAggregation).then(Validate); > > > > > > > > > > > > > > Dataset scans are currently using a threaded task group. Those > > > > > > > would > > > > > > > change to std::experimental::when_all instead. So now the dataset > > > > > > > scan is not creating N threads but again just returning a composed > > > > > > > future. So if you have one dataset scan across 4 files and each > > > > > > > file > > > > > > > kicks off 10 column reader tasks then you have 40 "threads" > > > > > > > submitted > > > > > > > to your thread pool and the main calling thread waiting on the > > > > future. > > > > > > > All of these thread pool threads are inner worker threads. None > > > > > > > of > > > > > > > these thread pool threads have to wait on other threads. There > > > > > > > is no > > > > > > > possibility of deadlock. > > > > > > > > > > > > > > You can do this at each level of nesting so that only your inner > > > > > > > most > > > > > > > worker threads are actually calling `pool->Submit`. There is then > > > > > > > just one outer main thread (presumably not a thread pool thread) > > > > > > > that > > > > > > > is waiting on the future. It's not a super small change because > > > > > > > now > > > > > > > FileReaderImpl::ReadRowGroups returns a future. That would have > > > > > > > to > > > > > > > propagate all the way up so that your dataset scan itself is > > > > returning > > > > > > > a future (you can safely synchronize it at this point so your > > > > > > > public > > > > > > > API remains synchronous because no public API call is going to be > > > > > > > arriving on a thread pool thread). > > > > > > > > > > > > > > That at least solves the deadlock problem. It also starts to > > > > > > > propagate futures throughout the code base which could be good or > > > > > > > bad > > > > > > > depending on your view of such things. It does not solve the > > > > > > > under-utilization problem because you still have threads sitting > > > > > > > in > > > > > > > the thread pool waiting on blocking I/O. > > > > > > > > > > > > > > The next step would be to move to non-blocking I/O. At this point > > > > you > > > > > > > have quite a few choices. > > > > > > > > > > > > > > On Wed, Sep 16, 2020 at 7:26 AM Wes McKinney <wesmck...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > > > > On Wed, Sep 16, 2020 at 10:31 AM Jorge Cardoso Leitão > > > > > > > > <jorgecarlei...@gmail.com> wrote: > > > > > > > > > > > > > > > > > > Hi, > > > > > > > > > > > > > > > > > > I am not sure I fully understand, so I will try to give an > > > > example > > > > > to > > > > > > > > > check: we have a simple query that we want to write the > > > > > > > > > result to > > > > > some > > > > > > > > > place: > > > > > > > > > > > > > > > > > > SELECT t1.b * t2.b FROM t1 JOIN ON t2 WHERE t1.a = t2.a > > > > > > > > > > > > > > > > > > At the physical plane, we need to > > > > > > > > > > > > > > > > > > 1. read each file in batches > > > > > > > > > 2. join the batches > > > > > > > > > 3. iterate over results and write them in partitions > > > > > > > > > > > > > > > > > > In principle, we can multi-thread them > > > > > > > > > > > > > > > > > > 1. multi-threaded scan > > > > > > > > > 2. multi-threaded hash join (e.g. with a shared map) > > > > > > > > > 3. multi-threaded write (e.g. 1 file per partition) > > > > > > > > > > > > > > > > > > The issue is that when we schedule this, the physical nodes > > > > > themselves > > > > > > > > > control how they perform their own operations, and there is no > > > > > > > > > orchestration as to what resources are available and what > > > > > > > > > should > > > > be > > > > > > > > > prioritized. Consequently, we may have a scan of table t1 > > > > > > > > > that is > > > > > running > > > > > > > > > with 12 threads, while the scan of table t2 is waiting for a > > > > > thread to be > > > > > > > > > available. This causes the computation to stall as both are > > > > > required for > > > > > > > > > step 2 to proceed. OTOH, if we have no multithreaded scans, > > > > > > > > > then > > > > > > > > > multithreading seldom helps, as we are bottlenecked by the > > > > > > > > > scans' > > > > > > > > > throughput. Is this the gist of the problem? > > > > > > > > > > > > > > > > > > If yes: the core issue here seems to be that there is no > > > > > orchestrator to > > > > > > > > > re-prioritize CPU to where it is needed (the scan of t2 in the > > > > > example > > > > > > > > > above), because each physical node has a thread.join that is > > > > > > > > > not > > > > > > > > > coordinated with their downstream dependencies (and so on). > > > > > > > > > Isn't > > > > > this a > > > > > > > > > natural candidate for futures/async? We seem to need some > > > > > coordination > > > > > > > > > across the DAG. > > > > > > > > > > > > > > > > > > If not: could someone offer an example describing how the > > > > > multi-threaded > > > > > > > > > scan can cause a deadlock? > > > > > > > > > > > > > > > > Suppose that we have 4 large CSV files in Amazon S3 and a static > > > > > > > > thread pool with 4 threads. If we use the thread pool to execute > > > > scan > > > > > > > > tasks for all 4 files in parallel, then if any of those scan > > > > > > > > tasks > > > > > > > > internally try to spawn tasks in the same thread pool (before > > > > > > > > other > > > > > > > > tasks have finished) to parallelize some of their computational > > > > work > > > > > > > > -- i.e. "nested parallelism" is what we call this -- then you > > > > > > > > have > > > > a > > > > > > > > deadlock because our current thread pool implementation cannot > > > > > > > > distinguish between task interdependencies / does not understand > > > > > > > > nested parallelism. > > > > > > > > > > > > > > > > > Best, > > > > > > > > > Jorge > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Sep 16, 2020 at 4:16 PM Wes McKinney < > > > > wesmck...@gmail.com> > > > > > wrote: > > > > > > > > > > > > > > > > > > > hi Jacob, > > > > > > > > > > > > > > > > > > > > The approach taken in Julia strikes me as being motivated by > > > > the > > > > > same > > > > > > > > > > problems that we have in this project. It would be > > > > > > > > > > interesting > > > > if > > > > > > > > > > partr could be used as the basis of our nested parallelism > > > > > runtime. > > > > > > > > > > How does Julia handle IO calls within spawned tasks? In > > > > > > > > > > other > > > > > words, > > > > > > > > > > if we have a function like: > > > > > > > > > > > > > > > > > > > > void MyTask() { > > > > > > > > > > DoCPUWork(); > > > > > > > > > > DoSomeIO(); > > > > > > > > > > DoMoreCPUWork(); > > > > > > > > > > DoAdditionalIO(); > > > > > > > > > > } > > > > > > > > > > > > > > > > > > > > (or maybe you just aren't supposed to do that) > > > > > > > > > > > > > > > > > > > > The biggest question would be the C++ programming model (in > > > > other > > > > > > > > > > words, how we have to change our approach to writing code) > > > > > > > > > > that > > > > > we use > > > > > > > > > > throughout the Arrow libraries. What I'm getting at is to > > > > figure > > > > > out > > > > > > > > > > how to minimize the amount of code that needs to be > > > > significantly > > > > > > > > > > altered to fit in with the new approach to work scheduling. > > > > > > > > > > For > > > > > > > > > > example, it doesn't strike me that the API that we are > > > > > > > > > > using to > > > > > > > > > > parallelize reading Parquet files at the column level is > > > > > > > > > > going > > > > > to work > > > > > > > > > > because there are various IO calls within the tasks that are > > > > > being > > > > > > > > > > submitted to the thread pool > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://github.com/apache/arrow/blob/apache-arrow-1.0.1/cpp/src/parquet/arrow/reader.cc#L859-L875 > > > > > > > > > > > > > > > > > > > > - Wes > > > > > > > > > > > > > > > > > > > > On Wed, Sep 16, 2020 at 1:37 AM Jacob Quinn < > > > > > quinn.jac...@gmail.com> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > My immediate thought reading the discussion points was > > > > Julia's > > > > > task-based > > > > > > > > > > > multithreading model that has been part of the language > > > > > > > > > > > for > > > > > over a year > > > > > > > > > > > now. An announcement blogpost for Julia 1.3 laid out some > > > > > > > > > > > of > > > > > the details > > > > > > > > > > > and high-level approach: > > > > > > > > > > https://julialang.org/blog/2019/07/multithreading/, > > > > > > > > > > > and the multithreading code was marked stable in the > > > > > > > > > > > recent > > > > > 1.5 release. > > > > > > > > > > > > > > > > > > > > > > Kiran, one of the main contributors to the threading > > > > > > > > > > > model in > > > > > Julia, > > > > > > > > > > worked > > > > > > > > > > > on a separate C-based repo for the core functionality ( > > > > > > > > > > > https://github.com/kpamnany/partr), but I think the latest > > > > > code is > > > > > > > > > > embedded > > > > > > > > > > > in the Julia source code now. > > > > > > > > > > > > > > > > > > > > > > Anyway, probably most useful as a reference, but Jameson > > > > > (cc'd) also does > > > > > > > > > > > weekly multithreading chats (on Wednesdays), so I imagine > > > > > > > > > > > he > > > > > wouldn't > > > > > > > > > > mind > > > > > > > > > > > chatting about things if desired. > > > > > > > > > > > > > > > > > > > > > > -Jacob > > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 15, 2020 at 8:17 PM Weston Pace < > > > > > weston.p...@gmail.com> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > My C++ is pretty rusty but I'll see if I can come up > > > > > > > > > > > > with a > > > > > concrete > > > > > > > > > > > > CSV example / experiment / proof of concept on Friday > > > > > > > > > > > > when > > > > I > > > > > have a > > > > > > > > > > > > break from work. > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 15, 2020 at 3:47 PM Wes McKinney < > > > > > wesmck...@gmail.com> > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 15, 2020 at 7:54 PM Weston Pace < > > > > > weston.p...@gmail.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > Yes. Thank you. I am in agreement with you and > > > > > futures/callbacks > > > > > > > > > > are > > > > > > > > > > > > > > one such "richer programming model for > > > > > > > > > > > > > > hierarchical work scheduling". > > > > > > > > > > > > > > > > > > > > > > > > > > > > A scan task with a naive approach is: > > > > > > > > > > > > > > > > > > > > > > > > > > > > workers = partition_files_list(files_list) > > > > > > > > > > > > > > for worker in workers: > > > > > > > > > > > > > > start_thread(worker) > > > > > > > > > > > > > > for worker in workers: > > > > > > > > > > > > > > join_thread(worker) > > > > > > > > > > > > > > return aggregate_results() > > > > > > > > > > > > > > > > > > > > > > > > > > > > You have N+1 threads because you have N worker > > > > > > > > > > > > > > threads > > > > > and 1 scan > > > > > > > > > > > > > > thread. There is the potential for deadlock if your > > > > > thread pool > > > > > > > > > > only > > > > > > > > > > > > > > has one remaining spot and it is given to the scan > > > > > thread. > > > > > > > > > > > > > > > > > > > > > > > > > > > > On the other hand, with a futures based approach you > > > > > have: > > > > > > > > > > > > > > > > > > > > > > > > > > > > futures = partition_files_list(files_list) > > > > > > > > > > > > > > return when_all(futures).do(aggregate_results) > > > > > > > > > > > > > > > > > > > > > > > > > > > > There are only N threads. The scan thread goes > > > > > > > > > > > > > > away. > > > > > In fact, if > > > > > > > > > > all > > > > > > > > > > > > > > of your underlying OS/FS libraries are non-blocking > > > > then > > > > > you can > > > > > > > > > > > > > > completely eliminate threads in the waiting state > > > > > > > > > > > > > > and > > > > an > > > > > entire > > > > > > > > > > > > > > category of deadlocks are no longer a possibility. > > > > > > > > > > > > > > > > > > > > > > > > > > I don't quite follow. I think it would be most > > > > > > > > > > > > > helpful to > > > > > focus on a > > > > > > > > > > > > > concrete practical matter like reading Parquet or CSV > > > > > files in > > > > > > > > > > > > > parallel (which can be go faster through parallelism > > > > > > > > > > > > > at > > > > > the single > > > > > > > > > > > > > file level) and devise a programming model in C++ > > > > > > > > > > > > > that is > > > > > different > > > > > > > > > > > > > from what we are currently doing that results in > > > > > > > > > > > > > superior > > > > > CPU > > > > > > > > > > > > > utilization. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -Weston > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 15, 2020 at 1:21 PM Wes McKinney < > > > > > wesmck...@gmail.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > hi Weston, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > We've discussed some of these problems in the > > > > > > > > > > > > > > > past -- > > > > > I was > > > > > > > > > > > > > > > enumerating some of these issues to highlight the > > > > > problems that > > > > > > > > > > are > > > > > > > > > > > > > > > resulting from an absence of a richer programming > > > > > model for > > > > > > > > > > > > > > > hierarchical work scheduling. Parallel tasks > > > > > originating in each > > > > > > > > > > > > > > > workload are submitted to a global thread pool > > > > > > > > > > > > > > > where > > > > > they are > > > > > > > > > > > > > > > commingled with the tasks coming from other > > > > workloads. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > As an example of how this can go wrong, suppose we > > > > > have a static > > > > > > > > > > > > > > > thread pool with 4 executors. If we submit 4 > > > > > long-running tasks > > > > > > > > > > to > > > > > > > > > > > > the > > > > > > > > > > > > > > > pool, and then each of these tasks spawn > > > > > > > > > > > > > > > additional > > > > > tasks that go > > > > > > > > > > > > into > > > > > > > > > > > > > > > the thread pool, a deadlock can occur, because the > > > > > thread pool > > > > > > > > > > thinks > > > > > > > > > > > > > > > that it's executing tasks when in fact those tasks > > > > are > > > > > waiting on > > > > > > > > > > > > > > > their dependent tasks to complete. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > A similar resource underutilization occurs when > > > > > > > > > > > > > > > we do > > > > > > > > > > > > > > > pool->Submit(ReadFile), where ReadFile needs to do > > > > > some IO -- > > > > > > > > > > from > > > > > > > > > > > > the > > > > > > > > > > > > > > > thread pool's perspective, the task is "working" > > > > > > > > > > > > > > > even > > > > > though it > > > > > > > > > > may > > > > > > > > > > > > > > > wait for one or more IO calls to complete. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > In the Datasets API in C++ we have both of these > > > > > problems: file > > > > > > > > > > scan > > > > > > > > > > > > > > > tasks are being pushed onto the global thread > > > > > > > > > > > > > > > pool, > > > > > and so to > > > > > > > > > > prevent > > > > > > > > > > > > > > > deadlocks multithreaded file parsing has been > > > > disabled. > > > > > > > > > > Additionally, > > > > > > > > > > > > > > > the scan tasks do IO, resulting in suboptimal > > > > > performance (the > > > > > > > > > > > > > > > problems caused by this will be especially > > > > exacerbated > > > > > when > > > > > > > > > > running > > > > > > > > > > > > > > > against slower filesystems like Amazon S3) > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hopefully the issues are more clear. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks > > > > > > > > > > > > > > > Wes > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 15, 2020 at 2:57 PM Weston Pace < > > > > > > > > > > weston.p...@gmail.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > It sounds like you are describing two problems. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1) Idleness - Tasks are holding threads in the > > > > > thread pool > > > > > > > > > > while > > > > > > > > > > > > they > > > > > > > > > > > > > > > > wait for IO or some long running non-CPU task to > > > > > complete. > > > > > > > > > > These > > > > > > > > > > > > > > > > threads are often in a "wait" state or something > > > > > similar. > > > > > > > > > > > > > > > > 2) Fairness - The ordering of tasks is causing > > > > short > > > > > tasks that > > > > > > > > > > > > could > > > > > > > > > > > > > > > > be completed quickly from being stuck behind > > > > > > > > > > > > > > > > longer > > > > > term tasks. > > > > > > > > > > > > > > > > Fairness can be an issue even if all tasks are > > > > > always in the > > > > > > > > > > active > > > > > > > > > > > > > > > > state consuming CPU time. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Are both of these issues a problem? Are you > > > > looking > > > > > to address > > > > > > > > > > > > both of them? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I doubt it's much help as it is probably a more > > > > > substantial > > > > > > > > > > change > > > > > > > > > > > > > > > > than what you were looking for but the popular > > > > > solution to #1 > > > > > > > > > > these > > > > > > > > > > > > > > > > days seems to be moving toward non blocking IO > > > > > > > > > > > > > > > > with > > > > > > > > > > > > > > > > promises/callbacks/async. That way threads are > > > > > never in the > > > > > > > > > > > > waiting > > > > > > > > > > > > > > > > state (unless sitting idle in the pool). > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -Weston > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Sep 15, 2020 at 7:00 AM Wes McKinney < > > > > > > > > > > wesmck...@gmail.com> > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > In light of ARROW-9924, I wanted to rekindle > > > > > > > > > > > > > > > > > the > > > > > discussion > > > > > > > > > > > > about our > > > > > > > > > > > > > > > > > approach to multithreading (especially the > > > > > _programming > > > > > > > > > > model_) > > > > > > > > > > > > in > > > > > > > > > > > > > > > > > C++. We had some discussions about this about > > > > > > > > > > > > > > > > > 6 > > > > > months ago > > > > > > > > > > and > > > > > > > > > > > > there > > > > > > > > > > > > > > > > > were more discussions as I recall in summer > > > > > > > > > > > > > > > > > 2019. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Realistically, we are going to be consistently > > > > > dealing with > > > > > > > > > > > > > > > > > independent concurrent in-process workloads > > > > > > > > > > > > > > > > > that > > > > > each > > > > > > > > > > > > respectively can > > > > > > > > > > > > > > > > > go faster by multithreading. These could be > > > > things > > > > > like: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > * Reading file formats (CSV, Parquet, etc.) > > > > > > > > > > > > > > > > > that > > > > > benefit from > > > > > > > > > > > > > > > > > multithreaded parsing/decoding > > > > > > > > > > > > > > > > > * Reading one or more files in parallel using > > > > > > > > > > > > > > > > > the > > > > > Datasets > > > > > > > > > > API > > > > > > > > > > > > > > > > > * Executing any number of multithreaded > > > > analytical > > > > > workloads > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > One obvious issue with our thread scheduling > > > > > > > > > > > > > > > > > is > > > > > the FIFO > > > > > > > > > > nature > > > > > > > > > > > > of the > > > > > > > > > > > > > > > > > global thread pool. If a new independent > > > > > multithreaded > > > > > > > > > > workload > > > > > > > > > > > > shows > > > > > > > > > > > > > > > > > up, it has to wait for other workloads to > > > > complete > > > > > before the > > > > > > > > > > > > new work > > > > > > > > > > > > > > > > > will be scheduled. Think about a Flight server > > > > > serving > > > > > > > > > > queries to > > > > > > > > > > > > > > > > > users -- is it fair for one query to "hog" the > > > > > thread pool > > > > > > > > > > and > > > > > > > > > > > > force > > > > > > > > > > > > > > > > > other requests to wait until they can get > > > > > > > > > > > > > > > > > access > > > > > to some CPU > > > > > > > > > > > > > > > > > resources? You could imagine a workload that > > > > > spawns 10 > > > > > > > > > > minutes > > > > > > > > > > > > worth > > > > > > > > > > > > > > > > > of CPU work, where a new workload has to wait > > > > > > > > > > > > > > > > > for > > > > > all of that > > > > > > > > > > > > work to > > > > > > > > > > > > > > > > > complete before having any tasks scheduled for > > > > > execution. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > The approach that's been taken in the Datasets > > > > API > > > > > to avoid > > > > > > > > > > > > problems > > > > > > > > > > > > > > > > > with nested parallelism (file-specific > > > > > > > > > > > > > > > > > operations > > > > > spawning > > > > > > > > > > > > multiple > > > > > > > > > > > > > > > > > tasks onto the global thread pool) is simply > > > > > > > > > > > > > > > > > to > > > > > disable > > > > > > > > > > > > multithreading > > > > > > > > > > > > > > > > > at the level of a single file. This is clearly > > > > > suboptimal. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > We have additional problems in that some > > > > > file-loading related > > > > > > > > > > > > tasks do > > > > > > > > > > > > > > > > > a mixture of CPU work and IO work, and once a > > > > > thread has been > > > > > > > > > > > > > > > > > dispatched to execute one of these tasks, > > > > > > > > > > > > > > > > > when IO > > > > > takes > > > > > > > > > > place, a > > > > > > > > > > > > CPU > > > > > > > > > > > > > > > > > core may sit underutilized while the IO is > > > > waiting. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > There's more aspects we can discuss, but in > > > > > general I think > > > > > > > > > > we > > > > > > > > > > > > need to > > > > > > > > > > > > > > > > > come up with a programming model for building > > > > > > > > > > > > > > > > > our > > > > > C++ system > > > > > > > > > > > > > > > > > components with the following requirements: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > * Deadlocks not possible by design > > > > > > > > > > > > > > > > > * Any component can safely use "nested > > > > > parallelism" without > > > > > > > > > > the > > > > > > > > > > > > > > > > > programmer having to worry about deadlocks or > > > > > > > > > > > > > > > > > one > > > > > task > > > > > > > > > > "hogging" > > > > > > > > > > > > the > > > > > > > > > > > > > > > > > thread pool. So in other words, if there's > > > > > > > > > > > > > > > > > only a > > > > > single > > > > > > > > > > > > > > > > > multithreading-capable workload running, we > > > > > > > > > > > > > > > > > "let > > > > > it rip" > > > > > > > > > > > > > > > > > * Resources can be reasonably fairly allocated > > > > > amongst > > > > > > > > > > concurrent > > > > > > > > > > > > > > > > > workloads (think: independent requests coming > > > > > > > > > > > > > > > > > in > > > > > through > > > > > > > > > > Flight, > > > > > > > > > > > > or > > > > > > > > > > > > > > > > > scan tasks on different Parquet files in the > > > > > Datasets API). > > > > > > > > > > Limit > > > > > > > > > > > > > > > > > scenarios where a new workload is blocked > > > > > altogether on the > > > > > > > > > > > > completion > > > > > > > > > > > > > > > > > of other workloads > > > > > > > > > > > > > > > > > * A well-defined programming pattern for tasks > > > > > that do a > > > > > > > > > > mixture > > > > > > > > > > > > of > > > > > > > > > > > > > > > > > CPU work and IO work that allows CPU cores to > > > > > > > > > > > > > > > > > be > > > > > used when a > > > > > > > > > > > > task is > > > > > > > > > > > > > > > > > waiting on IO > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > We can't be the only project that has these > > > > > problems, so I'm > > > > > > > > > > > > > > > > > interested to see what solutions have been > > > > > successfully > > > > > > > > > > employed > > > > > > > > > > > > by > > > > > > > > > > > > > > > > > others. For example, it strikes me as similar > > > > > > > > > > > > > > > > > to > > > > > concurrency > > > > > > > > > > > > issues > > > > > > > > > > > > > > > > > inside an analytic database. How are they > > > > > preventing > > > > > > > > > > concurrent > > > > > > > > > > > > > > > > > workload starvation problems or handling > > > > > > > > > > > > > > > > > CPU/IO > > > > > task > > > > > > > > > > scheduling > > > > > > > > > > > > to > > > > > > > > > > > > > > > > > avoid CPU underutilization? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Choices of which threading libraries we might > > > > > > > > > > > > > > > > > use > > > > > to > > > > > > > > > > implement a > > > > > > > > > > > > > > > > > viable solution (e.g. TBB) seem secondary to > > > > > > > > > > > > > > > > > the > > > > > programming > > > > > > > > > > > > model > > > > > > > > > > > > > > > > > that we use to implement our components. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > Wes > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > >