Not C++, but I found the discussion about the Rust's tokio project's
scheduler to be interesting / relevant

https://tokio.rs/blog/2019-10-scheduler

On Tue, Sep 22, 2020 at 4:54 PM Wes McKinney <wesmck...@gmail.com> wrote:
>
> Thanks for the pointer to CAF. It reminds me a bit of libprocess which
> is a part of Apache Mesos, which also provides the actor model
>
> https://github.com/apache/mesos/tree/master/3rdparty/libprocess
>
> We'll have to determine a solution that is compatible with our
> spectrum of compiler toolchain support (e.g. we are still technically
> supporting gcc 4.8, but may drop it in the future). Things will get
> easier once we can move to C++14, but C++17 is probably not in the
> cards for quite some time.
>
> On Tue, Sep 22, 2020 at 2:18 PM Matthias Vallentin
> <matth...@vallentin.net> wrote:
> >
> > We are building a highly concurrent database for security data with Arrow
> > as data plane (VAST <https://github.com/tenzir/vast>), so I thought I'll
> > share our view on this since we went over pretty much all of the above
> > mentioned questions. I'm not trying to say "you should do it this way" but
> > instead share our journey, in the hope that you can draw some insight from
> > it.
> >
> > It sounds like there are several challenges to be solved, ranging from
> > non-blocking I/O to efficient task-based scheduling - for some notion of
> > task. We found that the actor model
> > <https://docs.tenzir.com/vast/architecture/actor-model/> solved all of
> > these challenges. In particular, we rely on CAF
> > <https://github.com/actor-framework/actor-framework>, the C++ Actor
> > Framework as concrete implementation. The basic abstraction is that an
> > actor, which is effectively a heavy-weight task (100-200 bytes) that can be
> > scheduled in two ways: on a dedicated thread or in a thread pool. The
> > thread pool is driven either by a work-stealing or work-sharing scheduler,
> > based on the deployment environment. Here's how we solve some concrete
> > problems with these abstractions:
> >
> >    1. *Asynchronous I/O*: we have one "detached" filesystem actor that
> >    lives in its own thread that does all I/O. All reads and writes (mmap 
> > too,
> >    but let's take that aside) go through this actor. You'd request a write
> >    with a contiguous chunk of data, and get a response back when the 
> > operation
> >    succeeded. Any actor can interact with the filesystem actor, also actors
> >    that are scheduled in the pool. The point of this abstraction is that I/O
> >    operations can block, which is why you never want to execute them in the
> >    thread pool. Otherwise all other tasks/actors that are scheduled right
> >    behind the I/O operation might get stalled. Sure, work-stealing will
> >    alleviate this, but not when steals occur frequently.
> >    2. *Concurrent task execution*: if the work can be *overdecomposed* into
> >    smaller chunks such that there are more chunks than CPU cores, a thread
> >    pool plus scheduler will do a good job at exploiting the available system
> >    concurrency. In our use case, we build dozens of indexes in parallel, 
> > with
> >    one actor being responsible for one column. They all run in parallel and
> >    independent of each other, but operate on the same (immutable) data. So
> >    there are no data races by design. Once an indexer actor completes a set 
> > of
> >    record batches, it builds a flatbuffer and ships it to the I/O actor to
> >    persist it. At that point we're in point (1), asynchronous I/O. This 
> > plays
> >    together nicely.
> >    3. *Network transparency*: Since the actor model communication
> >    abstraction is message passing, it's easy to hide the actor location, be 
> > it
> >    in memory or remotely available via IPC. The actor runtime takes care of
> >    either just passing a pointer with the message contents or doing the
> >    transparent serialization. This makes it very easy to build a distributed
> >    system if need be, or run everything in a single process.
> >
> > We could have gone with lower-level abstraction, e.g., thread pool
> > with coroutines, but decided that we get more mileage from an actor
> > runtime. We see coroutines just as the syntactic sugar that make the
> > inversion of control more reasonable to understand through straight-line
> > code, but it's not a new messaging capability in the context of the actor
> > model implementation we use, which allows for arbitrary messaging and
> > synchronization patterns - all fully asynchronous, i.e., non-blocking by
> > yielding to the scheduler. Calling .then(...) on when a message arrives is
> > effectively a future, and we frequently create response promises and
> > message delegation patterns, often implicitly through the runtime simply by
> > returning a value in a lambda. We also challenged the overhead of an actor
> > compared to a light-weight task, but found that even creating millions of
> > actors in parallel in CAF still doesn't cause memory pressure or
> > substantially more cache misses. At the end, we could not find a reason to
> > *not* go with CAF, and we don't regret this choice to date. To date, we
> > work with the experimental credit-based streaming feature of CAF that gives
> > Flink-like streaming semantics though actor-based backpressure. Once again,
> > a single powerful abstraction to pretty much address all our needs in
> > scalable distributed systems that is close to the hardware as well.
> >
> > I hope this helps making better decisions in finding the right abstractions
> > for you. I've used the actor model as a vehicle for my arguments, but there
> > are other isomorphic models and vocabulary (e.g., CSP). The main point I
> > want to get across is that thinking too low-level and single-solution (only
> > threapools, just coroutines, etc.) may result in a local instead of global
> > optimum.
> >
> >     Matthias
> >
> >
> > On Mon, Sep 21, 2020 at 9:38 PM Ben Kietzman <b...@ursacomputing.com> wrote:
> >
> > > FWIW boost.coroutine and boost.asio provide composable coroutines,
> > > non blocking IO, and configurable scheduling for CPU work out of the box.
> > >
> > > The boost libraries are not lightweight but they are robust and
> > > cross platform, so I think asio is worth consideration.
> > >
> > > On Sat, Sep 19, 2020 at 8:22 PM Wes McKinney <wesmck...@gmail.com> wrote:
> > >
> > > > I took a look at https://github.com/kpamnany/partr and Julia's
> > > > production iteration of that -- kpamnany/partr depends on
> > > > libconcurrent's coroutine implementation which does not work on
> > > > Windows. It appears that Julia is using libuv instead. If we're
> > > > looking for a lighter-weight C coroutine implementation, there is
> > > > http://software.schmorp.de/pkg/libcoro.html, but either way there is
> > > > quite a bit of systems work to create something that can work for
> > > > Arrow.
> > > >
> > > > I don't have an intuition whether depth-first scheduling (what Julia
> > > > is doing) or breadth-first scheduling (aka "work stealing" -- which is
> > > > what Intel's TBB library does [1]) will work better for our use cases.
> > > > But I believe that we need to figure out a programming model (probably
> > > > based on composable futures and continuations given what we are
> > > > already doing) that hides the details of which coroutine/threading
> > > > runtime.
> > > >
> > > > A follow-on project would likely be to define a non-blocking API for
> > > > our various IO interfaces that composes with the rest of the thread
> > > > scheduling machinery.
> > > >
> > > > Either way, this problem is definitely non-trivial so we should figure
> > > > out what "default" approach we can implement that is compatible with
> > > > our "minimal dependency core build" approach in C++ (which may involve
> > > > vendoring some third party code, but not sure if vendoring TBB is a
> > > > good idea) and go and do that. If anyone would like to be funded to
> > > > work on this problem, please get in touch with me offline.
> > > >
> > > > Thanks
> > > > Wes
> > > >
> > > > [1]:
> > > >
> > > https://software.intel.com/content/www/us/en/develop/blogs/the-work-isolation-functionality-in-intel-threading-building-blocks-intel-tbb.html
> > > >
> > > > On Sat, Sep 19, 2020 at 5:21 PM Weston Pace <weston.p...@gmail.com>
> > > wrote:
> > > > >
> > > > > Ok, my skill with C++ got in the way of my ability to put something
> > > > > together.  First, I did not realize that C++ futures were a little
> > > > > different than the definition I'm used to for futures.  By default,
> > > > > C++ futures are not composable, you can't add continuations with
> > > > > `then`, `when_all` or `when_any`.  There is an extension for this (not
> > > > > sure if it will make it even in C++20) and there are continuations for
> > > > > futures in boost's futures.  However, since arrow is currently using
> > > > > its own future implementation I could not use either of these
> > > > > libraries.  I spent a bit trying to add continuations to arrow's
> > > > > future implementation but my lack of skill with C++ got in the way.  I
> > > > > want to keep working on it but it may be a few days.  In the meantime
> > > > > I will try and type up something more complete (with a few diagrams)
> > > > > to explain what I'm intending.
> > > > >
> > > > > Having looked at the code for a while I do have a better sense of what
> > > > > is involved.  I think it would be a pretty extensive set of changes.
> > > > > Also, it looks like C++20 is planning on adopting co-routines which
> > > > > they will be using for sequential async.  So perhaps it makes more
> > > > > sense to go directly to coroutines instead of moving to composable
> > > > > futures and then later to coroutines at some point in the future.
> > > > >
> > > > > Also, re: Julia, I looked into it a bit further and Julia is using
> > > > > libuv under the hood for all file I/O (which is non-blocking I/O).
> > > > > Also async/await are built into the bones of Julia.  As far as I can
> > > > > tell from my brief examination is that there is no way to have a Julia
> > > > > task that is performing blocking I/O (in the sense that a "thread pool
> > > > > thread" is blocked on I/O.  You can have blocking I/O in the
> > > > > async/await sense where you are awaiting on I/O to maintain sequential
> > > > > semantics.
> > > > >
> > > > > On Wed, Sep 16, 2020 at 8:10 AM Weston Pace <weston.p...@gmail.com>
> > > > wrote:
> > > > > >
> > > > > > If you want to specifically look at the problem of dataset scanning,
> > > > > > file scanning, and nested parallelism then probably the lowest 
> > > > > > effort
> > > > > > improvement would be to eliminate the whole idea of "scan threads".
> > > > > > You currently have...
> > > > > >
> > > > > >     for (size_t i = 0; i < readers.size(); ++i) {
> > > > > >         ARROW_ASSIGN_OR_RAISE(futures[i],
> > > pool->Submit(ReadColumnFunc,
> > > > i));
> > > > > >     }
> > > > > >     Status final_status;
> > > > > >     for (auto& fut : futures) {
> > > > > >         final_status &= fut.status();
> > > > > >     }
> > > > > >     // Hiding some follow-up aggregation and the next line is a bit
> > > > abbreviated
> > > > > >     return Validate();
> > > > > >
> > > > > > You're already using futures so it would be pretty straightforward 
> > > > > > to
> > > > > > change that to
> > > > > >
> > > > > >     for (size_t i = 0; i < readers.size(); ++i) {
> > > > > >         ARROW_ASSIGN_OR_RAISE(futures[i],
> > > pool->Submit(ReadColumnFunc,
> > > > i));
> > > > > >     }
> > > > > >     // Hiding some follow-up aggregation and the next line is a bit
> > > > abbreviated
> > > > > >     return
> > > >
> > > std::experimental::when_all(futures).then(FollowUpAggregation).then(Validate);
> > > > > >
> > > > > > Dataset scans are currently using a threaded task group.  Those 
> > > > > > would
> > > > > > change to std::experimental::when_all instead.  So now the dataset
> > > > > > scan is not creating N threads but again just returning a composed
> > > > > > future.  So if you have one dataset scan across 4 files and each 
> > > > > > file
> > > > > > kicks off 10 column reader tasks then you have 40 "threads" 
> > > > > > submitted
> > > > > > to your thread pool and the main calling thread waiting on the
> > > future.
> > > > > > All of these thread pool threads are inner worker threads.  None of
> > > > > > these thread pool threads have to wait on other threads.  There is 
> > > > > > no
> > > > > > possibility of deadlock.
> > > > > >
> > > > > > You can do this at each level of nesting so that only your inner 
> > > > > > most
> > > > > > worker threads are actually calling `pool->Submit`.  There is then
> > > > > > just one outer main thread (presumably not a thread pool thread) 
> > > > > > that
> > > > > > is waiting on the future.  It's not a super small change because now
> > > > > > FileReaderImpl::ReadRowGroups returns a future.  That would have to
> > > > > > propagate all the way up so that your dataset scan itself is
> > > returning
> > > > > > a future (you can safely synchronize it at this point so your public
> > > > > > API remains synchronous because no public API call is going to be
> > > > > > arriving on a thread pool thread).
> > > > > >
> > > > > > That at least solves the deadlock problem.  It also starts to
> > > > > > propagate futures throughout the code base which could be good or 
> > > > > > bad
> > > > > > depending on your view of such things.  It does not solve the
> > > > > > under-utilization problem because you still have threads sitting in
> > > > > > the thread pool waiting on blocking I/O.
> > > > > >
> > > > > > The next step would be to move to non-blocking I/O.  At this point
> > > you
> > > > > > have quite a few choices.
> > > > > >
> > > > > > On Wed, Sep 16, 2020 at 7:26 AM Wes McKinney <wesmck...@gmail.com>
> > > > wrote:
> > > > > > >
> > > > > > > On Wed, Sep 16, 2020 at 10:31 AM Jorge Cardoso Leitão
> > > > > > > <jorgecarlei...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I am not sure I fully understand, so I will try to give an
> > > example
> > > > to
> > > > > > > > check: we have a simple query that we want to write the result 
> > > > > > > > to
> > > > some
> > > > > > > > place:
> > > > > > > >
> > > > > > > > SELECT t1.b * t2.b FROM t1 JOIN ON t2 WHERE t1.a = t2.a
> > > > > > > >
> > > > > > > > At the physical plane, we need to
> > > > > > > >
> > > > > > > > 1. read each file in batches
> > > > > > > > 2. join the batches
> > > > > > > > 3. iterate over results and write them in partitions
> > > > > > > >
> > > > > > > > In principle, we can multi-thread them
> > > > > > > >
> > > > > > > > 1. multi-threaded scan
> > > > > > > > 2. multi-threaded hash join (e.g. with a shared map)
> > > > > > > > 3. multi-threaded write (e.g. 1 file per partition)
> > > > > > > >
> > > > > > > > The issue is that when we schedule this, the physical nodes
> > > > themselves
> > > > > > > > control how they perform their own operations, and there is no
> > > > > > > > orchestration as to what resources are available and what should
> > > be
> > > > > > > > prioritized. Consequently, we may have a scan of table t1 that 
> > > > > > > > is
> > > > running
> > > > > > > > with 12 threads, while the scan of table t2 is waiting for a
> > > > thread to be
> > > > > > > > available. This causes the computation to stall as both are
> > > > required for
> > > > > > > > step 2 to proceed. OTOH, if we have no multithreaded scans, then
> > > > > > > > multithreading seldom helps, as we are bottlenecked by the 
> > > > > > > > scans'
> > > > > > > > throughput. Is this the gist of the problem?
> > > > > > > >
> > > > > > > > If yes: the core issue here seems to be that there is no
> > > > orchestrator to
> > > > > > > > re-prioritize CPU to where it is needed (the scan of t2 in the
> > > > example
> > > > > > > > above), because each physical node has a thread.join that is not
> > > > > > > > coordinated with their downstream dependencies (and so on). 
> > > > > > > > Isn't
> > > > this a
> > > > > > > > natural candidate for futures/async? We seem to need some
> > > > coordination
> > > > > > > > across the DAG.
> > > > > > > >
> > > > > > > > If not: could someone offer an example describing how the
> > > > multi-threaded
> > > > > > > > scan can cause a deadlock?
> > > > > > >
> > > > > > > Suppose that we have 4 large CSV files in Amazon S3 and a static
> > > > > > > thread pool with 4 threads. If we use the thread pool to execute
> > > scan
> > > > > > > tasks for all 4 files in parallel, then if any of those scan tasks
> > > > > > > internally try to spawn tasks in the same thread pool (before 
> > > > > > > other
> > > > > > > tasks have finished) to parallelize some of their computational
> > > work
> > > > > > > -- i.e. "nested parallelism" is what we call this -- then you have
> > > a
> > > > > > > deadlock because our current thread pool implementation cannot
> > > > > > > distinguish between task interdependencies / does not understand
> > > > > > > nested parallelism.
> > > > > > >
> > > > > > > > Best,
> > > > > > > > Jorge
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Sep 16, 2020 at 4:16 PM Wes McKinney <
> > > wesmck...@gmail.com>
> > > > wrote:
> > > > > > > >
> > > > > > > > > hi Jacob,
> > > > > > > > >
> > > > > > > > > The approach taken in Julia strikes me as being motivated by
> > > the
> > > > same
> > > > > > > > > problems that we have in this project. It would be interesting
> > > if
> > > > > > > > > partr could be used as the basis of our nested parallelism
> > > > runtime.
> > > > > > > > > How does Julia handle IO calls within spawned tasks? In other
> > > > words,
> > > > > > > > > if we have a function like:
> > > > > > > > >
> > > > > > > > > void MyTask() {
> > > > > > > > >   DoCPUWork();
> > > > > > > > >   DoSomeIO();
> > > > > > > > >   DoMoreCPUWork();
> > > > > > > > >   DoAdditionalIO();
> > > > > > > > > }
> > > > > > > > >
> > > > > > > > > (or maybe you just aren't supposed to do that)
> > > > > > > > >
> > > > > > > > > The biggest question would be the C++ programming model (in
> > > other
> > > > > > > > > words, how we have to change our approach to writing code) 
> > > > > > > > > that
> > > > we use
> > > > > > > > > throughout the Arrow libraries. What I'm getting at is to
> > > figure
> > > > out
> > > > > > > > > how to minimize the amount of code that needs to be
> > > significantly
> > > > > > > > > altered to fit in with the new approach to work scheduling. 
> > > > > > > > > For
> > > > > > > > > example, it doesn't strike me that the API that we are using 
> > > > > > > > > to
> > > > > > > > > parallelize reading Parquet files at the column level is going
> > > > to work
> > > > > > > > > because there are various IO calls within the tasks that are
> > > > being
> > > > > > > > > submitted to the thread pool
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > >
> > > https://github.com/apache/arrow/blob/apache-arrow-1.0.1/cpp/src/parquet/arrow/reader.cc#L859-L875
> > > > > > > > >
> > > > > > > > > - Wes
> > > > > > > > >
> > > > > > > > > On Wed, Sep 16, 2020 at 1:37 AM Jacob Quinn <
> > > > quinn.jac...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > My immediate thought reading the discussion points was
> > > Julia's
> > > > task-based
> > > > > > > > > > multithreading model that has been part of the language for
> > > > over a year
> > > > > > > > > > now. An announcement blogpost for Julia 1.3 laid out some of
> > > > the details
> > > > > > > > > > and high-level approach:
> > > > > > > > > https://julialang.org/blog/2019/07/multithreading/,
> > > > > > > > > > and the multithreading code was marked stable in the recent
> > > > 1.5 release.
> > > > > > > > > >
> > > > > > > > > > Kiran, one of the main contributors to the threading model 
> > > > > > > > > > in
> > > > Julia,
> > > > > > > > > worked
> > > > > > > > > > on a separate C-based repo for the core functionality (
> > > > > > > > > > https://github.com/kpamnany/partr), but I think the latest
> > > > code is
> > > > > > > > > embedded
> > > > > > > > > > in the Julia source code now.
> > > > > > > > > >
> > > > > > > > > > Anyway, probably most useful as a reference, but Jameson
> > > > (cc'd) also does
> > > > > > > > > > weekly multithreading chats (on Wednesdays), so I imagine he
> > > > wouldn't
> > > > > > > > > mind
> > > > > > > > > > chatting about things if desired.
> > > > > > > > > >
> > > > > > > > > > -Jacob
> > > > > > > > > >
> > > > > > > > > > On Tue, Sep 15, 2020 at 8:17 PM Weston Pace <
> > > > weston.p...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > My C++ is pretty rusty but I'll see if I can come up with 
> > > > > > > > > > > a
> > > > concrete
> > > > > > > > > > > CSV example / experiment / proof of concept on Friday when
> > > I
> > > > have a
> > > > > > > > > > > break from work.
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Sep 15, 2020 at 3:47 PM Wes McKinney <
> > > > wesmck...@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Sep 15, 2020 at 7:54 PM Weston Pace <
> > > > weston.p...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > Yes.  Thank you.  I am in agreement with you and
> > > > futures/callbacks
> > > > > > > > > are
> > > > > > > > > > > > > one such "richer programming model for
> > > > > > > > > > > > > hierarchical work scheduling".
> > > > > > > > > > > > >
> > > > > > > > > > > > > A scan task with a naive approach is:
> > > > > > > > > > > > >
> > > > > > > > > > > > >     workers = partition_files_list(files_list)
> > > > > > > > > > > > >     for worker in workers:
> > > > > > > > > > > > >         start_thread(worker)
> > > > > > > > > > > > >     for worker in workers:
> > > > > > > > > > > > >         join_thread(worker)
> > > > > > > > > > > > >     return aggregate_results()
> > > > > > > > > > > > >
> > > > > > > > > > > > > You have N+1 threads because you have N worker threads
> > > > and 1 scan
> > > > > > > > > > > > > thread.  There is the potential for deadlock if your
> > > > thread pool
> > > > > > > > > only
> > > > > > > > > > > > > has one remaining spot and it is given to the scan
> > > > thread.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On the other hand, with a futures based approach you
> > > > have:
> > > > > > > > > > > > >
> > > > > > > > > > > > > futures = partition_files_list(files_list)
> > > > > > > > > > > > > return when_all(futures).do(aggregate_results)
> > > > > > > > > > > > >
> > > > > > > > > > > > > There are only N threads.  The scan thread goes away.
> > > > In fact, if
> > > > > > > > > all
> > > > > > > > > > > > > of your underlying OS/FS libraries are non-blocking
> > > then
> > > > you can
> > > > > > > > > > > > > completely eliminate threads in the waiting state and
> > > an
> > > > entire
> > > > > > > > > > > > > category of deadlocks are no longer a possibility.
> > > > > > > > > > > >
> > > > > > > > > > > > I don't quite follow. I think it would be most helpful 
> > > > > > > > > > > > to
> > > > focus on a
> > > > > > > > > > > > concrete practical matter like reading Parquet or CSV
> > > > files in
> > > > > > > > > > > > parallel (which can be go faster through parallelism at
> > > > the single
> > > > > > > > > > > > file level) and devise a programming model in C++ that 
> > > > > > > > > > > > is
> > > > different
> > > > > > > > > > > > from what we are currently doing that results in 
> > > > > > > > > > > > superior
> > > > CPU
> > > > > > > > > > > > utilization.
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > -Weston
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Sep 15, 2020 at 1:21 PM Wes McKinney <
> > > > wesmck...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > hi Weston,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > We've discussed some of these problems in the past 
> > > > > > > > > > > > > > --
> > > > I was
> > > > > > > > > > > > > > enumerating some of these issues to highlight the
> > > > problems that
> > > > > > > > > are
> > > > > > > > > > > > > > resulting from an absence of a richer programming
> > > > model for
> > > > > > > > > > > > > > hierarchical work scheduling. Parallel tasks
> > > > originating in each
> > > > > > > > > > > > > > workload are submitted to a global thread pool where
> > > > they are
> > > > > > > > > > > > > > commingled with the tasks coming from other
> > > workloads.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > As an example of how this can go wrong, suppose we
> > > > have a static
> > > > > > > > > > > > > > thread pool with 4 executors. If we submit 4
> > > > long-running tasks
> > > > > > > > > to
> > > > > > > > > > > the
> > > > > > > > > > > > > > pool, and then each of these tasks spawn additional
> > > > tasks that go
> > > > > > > > > > > into
> > > > > > > > > > > > > > the thread pool, a deadlock can occur, because the
> > > > thread pool
> > > > > > > > > thinks
> > > > > > > > > > > > > > that it's executing tasks when in fact those tasks
> > > are
> > > > waiting on
> > > > > > > > > > > > > > their dependent tasks to complete.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > A similar resource underutilization occurs when we 
> > > > > > > > > > > > > > do
> > > > > > > > > > > > > > pool->Submit(ReadFile), where ReadFile needs to do
> > > > some IO --
> > > > > > > > > from
> > > > > > > > > > > the
> > > > > > > > > > > > > > thread pool's perspective, the task is "working" 
> > > > > > > > > > > > > > even
> > > > though it
> > > > > > > > > may
> > > > > > > > > > > > > > wait for one or more IO calls to complete.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > In the Datasets API in C++ we have both of these
> > > > problems: file
> > > > > > > > > scan
> > > > > > > > > > > > > > tasks are being pushed onto the global thread pool,
> > > > and so to
> > > > > > > > > prevent
> > > > > > > > > > > > > > deadlocks multithreaded file parsing has been
> > > disabled.
> > > > > > > > > Additionally,
> > > > > > > > > > > > > > the scan tasks do IO, resulting in suboptimal
> > > > performance (the
> > > > > > > > > > > > > > problems caused by this will be especially
> > > exacerbated
> > > > when
> > > > > > > > > running
> > > > > > > > > > > > > > against slower filesystems like Amazon S3)
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Hopefully the issues are more clear.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks
> > > > > > > > > > > > > > Wes
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Tue, Sep 15, 2020 at 2:57 PM Weston Pace <
> > > > > > > > > weston.p...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > It sounds like you are describing two problems.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1) Idleness - Tasks are holding threads in the
> > > > thread pool
> > > > > > > > > while
> > > > > > > > > > > they
> > > > > > > > > > > > > > > wait for IO or some long running non-CPU task to
> > > > complete.
> > > > > > > > > These
> > > > > > > > > > > > > > > threads are often in a "wait" state or something
> > > > similar.
> > > > > > > > > > > > > > > 2) Fairness - The ordering of tasks is causing
> > > short
> > > > tasks that
> > > > > > > > > > > could
> > > > > > > > > > > > > > > be completed quickly from being stuck behind 
> > > > > > > > > > > > > > > longer
> > > > term tasks.
> > > > > > > > > > > > > > > Fairness can be an issue even if all tasks are
> > > > always in the
> > > > > > > > > active
> > > > > > > > > > > > > > > state consuming CPU time.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Are both of these issues a problem?  Are you
> > > looking
> > > > to address
> > > > > > > > > > > both of them?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I doubt it's much help as it is probably a more
> > > > substantial
> > > > > > > > > change
> > > > > > > > > > > > > > > than what you were looking for but the popular
> > > > solution to #1
> > > > > > > > > these
> > > > > > > > > > > > > > > days seems to be moving toward non blocking IO 
> > > > > > > > > > > > > > > with
> > > > > > > > > > > > > > > promises/callbacks/async.  That way threads are
> > > > never in the
> > > > > > > > > > > waiting
> > > > > > > > > > > > > > > state (unless sitting idle in the pool).
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > -Weston
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Sep 15, 2020 at 7:00 AM Wes McKinney <
> > > > > > > > > wesmck...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > In light of ARROW-9924, I wanted to rekindle the
> > > > discussion
> > > > > > > > > > > about our
> > > > > > > > > > > > > > > > approach to multithreading (especially the
> > > > _programming
> > > > > > > > > model_)
> > > > > > > > > > > in
> > > > > > > > > > > > > > > > C++. We had some discussions about this about 6
> > > > months ago
> > > > > > > > > and
> > > > > > > > > > > there
> > > > > > > > > > > > > > > > were more discussions as I recall in summer 
> > > > > > > > > > > > > > > > 2019.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Realistically, we are going to be consistently
> > > > dealing with
> > > > > > > > > > > > > > > > independent concurrent in-process workloads that
> > > > each
> > > > > > > > > > > respectively can
> > > > > > > > > > > > > > > > go faster by multithreading. These could be
> > > things
> > > > like:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > * Reading file formats (CSV, Parquet, etc.) that
> > > > benefit from
> > > > > > > > > > > > > > > > multithreaded parsing/decoding
> > > > > > > > > > > > > > > > * Reading one or more files in parallel using 
> > > > > > > > > > > > > > > > the
> > > > Datasets
> > > > > > > > > API
> > > > > > > > > > > > > > > > * Executing any number of multithreaded
> > > analytical
> > > > workloads
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > One obvious issue with our thread scheduling is
> > > > the FIFO
> > > > > > > > > nature
> > > > > > > > > > > of the
> > > > > > > > > > > > > > > > global thread pool. If a new independent
> > > > multithreaded
> > > > > > > > > workload
> > > > > > > > > > > shows
> > > > > > > > > > > > > > > > up, it has to wait for other workloads to
> > > complete
> > > > before the
> > > > > > > > > > > new work
> > > > > > > > > > > > > > > > will be scheduled. Think about a Flight server
> > > > serving
> > > > > > > > > queries to
> > > > > > > > > > > > > > > > users -- is it fair for one query to "hog" the
> > > > thread pool
> > > > > > > > > and
> > > > > > > > > > > force
> > > > > > > > > > > > > > > > other requests to wait until they can get access
> > > > to some CPU
> > > > > > > > > > > > > > > > resources? You could imagine a workload that
> > > > spawns 10
> > > > > > > > > minutes
> > > > > > > > > > > worth
> > > > > > > > > > > > > > > > of CPU work, where a new workload has to wait 
> > > > > > > > > > > > > > > > for
> > > > all of that
> > > > > > > > > > > work to
> > > > > > > > > > > > > > > > complete before having any tasks scheduled for
> > > > execution.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > The approach that's been taken in the Datasets
> > > API
> > > > to avoid
> > > > > > > > > > > problems
> > > > > > > > > > > > > > > > with nested parallelism (file-specific 
> > > > > > > > > > > > > > > > operations
> > > > spawning
> > > > > > > > > > > multiple
> > > > > > > > > > > > > > > > tasks onto the global thread pool) is simply to
> > > > disable
> > > > > > > > > > > multithreading
> > > > > > > > > > > > > > > > at the level of a single file. This is clearly
> > > > suboptimal.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > We have additional problems in that some
> > > > file-loading related
> > > > > > > > > > > tasks do
> > > > > > > > > > > > > > > > a mixture of CPU work and IO work, and once a
> > > > thread has been
> > > > > > > > > > > > > > > > dispatched to execute one of these tasks, when 
> > > > > > > > > > > > > > > > IO
> > > > takes
> > > > > > > > > place, a
> > > > > > > > > > > CPU
> > > > > > > > > > > > > > > > core may sit underutilized while the IO is
> > > waiting.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > There's more aspects we can discuss, but in
> > > > general I think
> > > > > > > > > we
> > > > > > > > > > > need to
> > > > > > > > > > > > > > > > come up with a programming model for building 
> > > > > > > > > > > > > > > > our
> > > > C++ system
> > > > > > > > > > > > > > > > components with the following requirements:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > * Deadlocks not possible by design
> > > > > > > > > > > > > > > > * Any component can safely use "nested
> > > > parallelism" without
> > > > > > > > > the
> > > > > > > > > > > > > > > > programmer having to worry about deadlocks or 
> > > > > > > > > > > > > > > > one
> > > > task
> > > > > > > > > "hogging"
> > > > > > > > > > > the
> > > > > > > > > > > > > > > > thread pool. So in other words, if there's only 
> > > > > > > > > > > > > > > > a
> > > > single
> > > > > > > > > > > > > > > > multithreading-capable workload running, we "let
> > > > it rip"
> > > > > > > > > > > > > > > > * Resources can be reasonably fairly allocated
> > > > amongst
> > > > > > > > > concurrent
> > > > > > > > > > > > > > > > workloads (think: independent requests coming in
> > > > through
> > > > > > > > > Flight,
> > > > > > > > > > > or
> > > > > > > > > > > > > > > > scan tasks on different Parquet files in the
> > > > Datasets API).
> > > > > > > > > Limit
> > > > > > > > > > > > > > > > scenarios where a new workload is blocked
> > > > altogether on the
> > > > > > > > > > > completion
> > > > > > > > > > > > > > > > of other workloads
> > > > > > > > > > > > > > > > * A well-defined programming pattern for tasks
> > > > that do a
> > > > > > > > > mixture
> > > > > > > > > > > of
> > > > > > > > > > > > > > > > CPU work and IO work that allows CPU cores to be
> > > > used when a
> > > > > > > > > > > task is
> > > > > > > > > > > > > > > > waiting on IO
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > We can't be the only project that has these
> > > > problems, so I'm
> > > > > > > > > > > > > > > > interested to see what solutions have been
> > > > successfully
> > > > > > > > > employed
> > > > > > > > > > > by
> > > > > > > > > > > > > > > > others. For example, it strikes me as similar to
> > > > concurrency
> > > > > > > > > > > issues
> > > > > > > > > > > > > > > > inside an analytic database. How are they
> > > > preventing
> > > > > > > > > concurrent
> > > > > > > > > > > > > > > > workload starvation problems or handling CPU/IO
> > > > task
> > > > > > > > > scheduling
> > > > > > > > > > > to
> > > > > > > > > > > > > > > > avoid CPU underutilization?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Choices of which threading libraries we might 
> > > > > > > > > > > > > > > > use
> > > > to
> > > > > > > > > implement a
> > > > > > > > > > > > > > > > viable solution (e.g. TBB) seem secondary to the
> > > > programming
> > > > > > > > > > > model
> > > > > > > > > > > > > > > > that we use to implement our components.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > Wes
> > > > > > > > > > >
> > > > > > > > >
> > > >
> > >

Reply via email to