Thanks.  This is a very helpful reproduction.

I was able to reproduce and diagnose the problem.  There is a bug on our
end and I have filed [1] to address it.  There are a lot more details in
the ticket if you are interested.  In the meantime, the only workaround I
can think of is probably to slow down the data source enough that the queue
doesn't fill up.

[1] https://github.com/apache/arrow/issues/36951
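
If it helps, here is a minimal sketch of that kind of throttling applied to
the generator from your reproduction script (the sleep interval is an
arbitrary placeholder that would need tuning):

```
import time

def throttled_rb_generator(buckets, rows, batches, delay_s=0.05):
    # Same batches as rb_generator, but paced so the dataset writer's
    # queue has time to drain between batches.
    for batch in rb_generator(buckets, rows, batches):
        yield batch
        time.sleep(delay_s)
```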


On Sun, Jul 30, 2023 at 8:15 PM Wenbo Hu <huwenbo1...@gmail.com> wrote:

> Hi,
>     The following code should reproduce the problem.
>
> ```
>
> import pyarrow as pa
> import pyarrow.fs, pyarrow.dataset
>
> schema = pa.schema([("id", pa.utf8()), ("bucket", pa.uint8())])
>
>
> def rb_generator(buckets, rows, batches):
>     batch = pa.record_batch(
>         [[f"id-{i}" for i in range(rows)], [i % buckets for i in
> range(rows)]],
>         schema=schema,
>     )
>
>     for i in range(batches):
>         yield batch
>         print(f"yielding {i}")
>
>
> if __name__ == "__main__":
>     pa.set_io_thread_count(1)
>     reader = pa.RecordBatchReader.from_batches(
>         schema, rb_generator(64, 32768, 1000000)
>     )
>     local_fs = pa.fs.LocalFileSystem()
>
>     pa.dataset.write_dataset(
>         reader,
>         "/tmp/data_f",
>         format="feather",
>         partitioning=["bucket"],
>         filesystem=local_fs,
>         existing_data_behavior="overwrite_or_ignore"
>     )
>
> ```
>
> Wenbo Hu <huwenbo1...@gmail.com> wrote on Sun, Jul 30, 2023 at 15:30:
> >
> > Hi,
> >    Then another question: why is backpressure not working on the
> > input stream of the write_dataset API? If backpressure happens at the
> > end of the Acero stream for some reason (at the queue stage or the
> > write stage), shouldn't the input stream be backpressured as well?
> > That should keep memory at a stable level so that the input speed
> > matches the output speed.
> >
> >     Then I ran some other experiments with various io_thread_count
> > values and bucket_size values (partitions / open files).
> >
> > 1. With bucket_size set to 64 and io_thread_count/cpu_count set to 1,
> > the CPU stays at 100% after the transfer is done, but there is a very
> > interesting observation:
> >     * The transfer from client to server is slow for the very first few
> > batches, less than 0.01M rows/s, then it speeds up to over 4M rows/s
> > very quickly.
> >     * I think backpressure works fine at the very beginning, until at
> > some point, as in the previous experiments, the backpressure turns the
> > stream into a black hole and the I/O-thread input stream gets stuck at
> > some slow speed. (It is still writing, but spends a lot of time waiting
> > for the upstream CPU partitioning threads to push batches?)
> >     * From iotop, the total disk write rate drops very slowly after the
> > transfer is done, though this can vary between runs with the same
> > configuration. I think the upstream backpressure is not working as
> > expected, which keeps the downstream writer polling. Maybe at some
> > point the slow writing increases the backpressure on the whole process
> > (the write speed drops slowly), but the writing is slow because the
> > upstream has already slowed down.
> >
> > 2. Then I set cpu_count to 64.
> > 2.1 io_thread_count set to 4:
> > 2.1.1 For bucket_size 2/4/6, the system works fine. CPU is less than
> > 100%. Backpressure works fine; memory does not accumulate much before
> > the flow speed becomes stable.
> > 2.1.2 For bucket_size 8, the bug comes back. After the transfer is
> > done, CPU is about 350% (only the I/O threads are running?) and the
> > write rate from iotop is about 40M/s, then drops very slowly.
> >
> > 2.2 Then with io_thread_count set to 6:
> > 2.2.1 For bucket_size 6/8/16, the system works fine. CPU is about
> > 100%, like 2.1.1.
> > 2.2.2 For bucket_size 32, the bug comes back. CPU halts at 550%.
> >
> > 2.3 io_thread_count set to 8:
> > 2.3.1 For bucket_size 16, it partially fails. After the transfer is
> > done, memory has accumulated to over 3G, but the write speed is about
> > 60M/s, which makes it feasible to wait. CPU is about 600~700%. After
> > the accumulated memory is written out, CPU returns to normal.
> > 2.3.2 For bucket_size 32, it still fails. CPU halts at 800%. The
> > transfer is very fast (over 14M rows/s); backpressure is not working
> > at all.
> >
> >
> > Weston Pace <weston.p...@gmail.com> wrote on Sat, Jul 29, 2023 at 01:08:
> > >
> > > > How many io threads are writing concurrently in a single
> > > > write_dataset call?
> > >
> > > With the default options, and no partitioning, it will only use 1 I/O
> > > thread.  This is because we do not write to a single file in parallel.
> > > If you change FileSystemDatasetWriteOptions::max_rows_per_file then you
> > > may see more than 1 I/O thread because we will start new files and
> > > write to each one in parallel.
> > > If you have partitioning then you may see more than 1 I/O thread
> > > because we will be writing to multiple files.
> > >
> > > We use, at most, 1 I/O thread per file being written.
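> > >
> > > For illustration, a minimal sketch (assuming a recent pyarrow where
> > > max_rows_per_file is exposed on write_dataset; the row limits here are
> > > arbitrary placeholders):
> > >
> > > ```
> > > import pyarrow.dataset as ds
> > >
> > > # Capping rows per file makes the writer roll over to new files, so
> > > # several files (and therefore several I/O threads) can be active at
> > > # once even without partitioning.
> > > ds.write_dataset(
> > >     reader,                      # a RecordBatchReader or Table to write
> > >     "/tmp/data_capped",
> > >     format="feather",
> > >     max_rows_per_file=1_000_000,
> > >     max_rows_per_group=100_000,  # must not exceed max_rows_per_file
> > > )
> > > ```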
> > >
> > > > How do they schedule?
> > >
> > > There are 3 stages.
> > >
> > > Arrival stage: Data arrives on an Acero worker thread (CPU thread).  We
> > > will partition the data at this point.  For each batch we schedule a
> > > Queue Batch task.
> > > Queue stage: This stage runs on the CPU thread.  It finds the correct
> > > file queue for the batch and adds the batch to the file queue.  It may
> > > split the batch if max_rows_per_file is set.  It may trigger
> > > backpressure if there are too many rows queued on files.  This stage
> > > runs serially, on the CPU thread.  There is never more than one queue
> > > task running.
> > > Write stage: Each file has a number of write tasks.  These run in
> > > parallel across files but serially within a file.  These are I/O tasks.
> > >
> > > > The throttle code seems to show only one task running?
> > >
> > > Yes, there is a throttled scheduler used for the queue stage (we only
> > > run one queue task at a time).  There is a throttled scheduler per file
> > > used for the write stage.  All of these are configured to only allow
> > > one task at a time to run.
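> > >
> > > As a rough analogy only (plain Python asyncio, not the actual C++
> > > scheduler): each file behaves like a single serial worker, while
> > > workers for different files run concurrently:
> > >
> > > ```
> > > import asyncio
> > >
> > > async def file_write_worker(name, n_batches):
> > >     # Writes for one file happen strictly one at a time
> > >     # ("serially within a file").
> > >     for i in range(n_batches):
> > >         await asyncio.sleep(0.01)  # stand-in for one I/O write task
> > >         print(f"{name}: wrote batch {i}")
> > >
> > > async def main():
> > >     # Workers for different files make progress concurrently
> > >     # ("in parallel across files").
> > >     await asyncio.gather(
> > >         file_write_worker("part=0", 3),
> > >         file_write_worker("part=1", 3),
> > >     )
> > >
> > > asyncio.run(main())
> > > ```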
> > >
> > > > What else can I do to inspect the problem?
> > >
> > > I think we need to find out why the CPU is still 800% after the
> > > transfer is done when partitioning is enabled.  I would expect the CPU
> > > to drop to 0% even if it takes several seconds (or longer) for the
> > > cached data to flush to the disk.  The stack trace you shared is
> > > helpful but I don't know the root cause yet.  All of the threads are
> > > stuck on locking / unlocking in FutureImpl::TryAddCallback but that
> > > critical section is very small.  So it seems like there is some kind
> > > of task storm.  I think this is similar to a condition_variable that
> > > has thousands of waiters and is constantly doing a notify_all.
> > >
> > > I think we will need to figure out some kind of reproducible test
> > > case.  I will try and find some time to run some experiments on
> > > Monday.  Maybe I can reproduce this by setting the backpressure limit
> > > to a very small amount.
> > >
> > > On Fri, Jul 28, 2023 at 6:48 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >    Thanks for your detailed explanation. I ran some experiments
> > > > today.
> > > >
> > > > Before the experiments:
> > > > 1. To limit the resources used by the server, I use docker, which
> > > > uses cgroups. But "free" does not respect the resource limit inside
> > > > the container.
> > > > 2. I measured the write speed on the host with "dd if=/dev/zero
> > > > of=./test.img bs=1G count=45 oflag=dsync"; the output is
> > > > "48318382080 bytes (48 GB) copied, 132.558 s, 365 MB/s".
> > > > 3. I do not limit the memory of the container (over 140GB
> > > > available), but the CPU is still limited to 8. According to your
> > > > explanation, the write process should never slow down, since it
> > > > writes to the "memory cached", which is accounted as used memory
> > > > until it is flushed to storage by the OS.
> > > >
> > > > Additionally, the file format is "feather"; writing with vs. without
> > > > partitioning leads to different results.
> > > >
> > > > ## write without partitioning
> > > > Everything works fine, no matter what value I set for
> > > > io_thread_count or cpu_count.
> > > > The performance of the same configuration varies a lot; the initial
> > > > peak speed may result in different memory usage, but the maximum
> > > > average flow speed does not vary much.
> > > > Some records are below:
> > > > 1. With cpu_count and io_thread_count set to 128, CPU is less than
> > > > 100% and RES is less than 1G (after the initial peak speed); the
> > > > average flow speed is 6.83M rows/s (45 bytes per row).
> > > > 2. With cpu_count set to 1 and io_thread_count to 16, CPU is a
> > > > little over 100%, RES is about 3G at max, and the average flow
> > > > speed is 4.64M rows/s, but it takes an additional 6s to complete
> > > > writing after the transfer is done.
> > > > 3. With cpu_count set to 1 and io_thread_count to 128, it performs
> > > > almost the same as record 2.
> > > >
> > > > ## write with partitioning
> > > > Writing with partitioning fails most of the time; setting a lower
> > > > cpu_count does not help.
> > > > 1. With cpu_count and io_thread_count set to 128, CPU is at 800%
> > > > from the beginning, RES grows slowly to 40.7G by the end of the
> > > > transfer, and the average flow speed is 3.24M rows/s. After that,
> > > > CPU is still 800%, but RES goes down very slowly at 200MB/minute.
> > > > The write speed does not recover.
> > > > 2. With cpu_count set to 1 and io_thread_count to 16, CPU climbs to
> > > > 800% more slowly than in record 1, RES grows to 44.1G by the end of
> > > > the transfer, and the average flow speed is 6.75M rows/s. The same
> > > > happens as in record 1 after the transfer is done.
> > > > 3. With cpu_count set to 1 and io_thread_count to 128, CPU climbs
> > > > to 800% much more slowly than in record 2 (due to the slower flow
> > > > speed?), RES grows to 30G by the end of the transfer, and the
> > > > average flow speed is 1.62M rows/s. The same happens as in record 1
> > > > after the transfer is done.
> > > >
> > > > Then I tried to limit the flow speed before the writing queue got
> > > > full with custom flow control (sleeping on each reader iteration
> > > > based on available memory, as sketched below). But the sleep-time
> > > > curve is not accurate; sometimes the flow slows down, but the queue
> > > > gets full anyway.
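> > > >
> > > > Roughly something like this (a simplified sketch; it assumes psutil
> > > > is available and the threshold and sleep values are placeholders):
> > > >
> > > > ```
> > > > import time
> > > > import psutil
> > > >
> > > > def throttled(batch_iter, min_available_bytes=2 * 1024**3):
> > > >     # Sleep between batches whenever available memory drops below a
> > > >     # threshold, so the writer has a chance to drain its queue.
> > > >     for batch in batch_iter:
> > > >         while psutil.virtual_memory().available < min_available_bytes:
> > > >             time.sleep(0.1)
> > > >         yield batch
> > > > ```
> > > >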
> > > > Then the interesting thing happens: before the queue is full
> > > > (memory grows up quickly), the CPU is not fully used. When memory
> > > > grows quickly, the CPU goes up as well, to 800%.
> > > > 1. Sometimes the writing queue can recover; the CPU goes down after
> > > > the memory accumulation. The writing speed recovers and memory goes
> > > > back to normal.
> > > > 2. Sometimes it can't. IO BPS drops sharply, and the CPU never goes
> > > > down after that.
> > > >
> > > > How many io threads are writing concurrently in a single
> > > > write_dataset call? How do they schedule? The throttle code seems
> > > > to show only one task running?
> > > > What else can I do to inspect the problem?
> > > >
> > > > Weston Pace <weston.p...@gmail.com> wrote on Fri, Jul 28, 2023 at 00:33:
> > > > >
> > > > > You'll need to measure more but generally the bottleneck for
> > > > > writes is usually going to be the disk itself.  Unfortunately,
> > > > > standard OS buffered I/O has some pretty negative behaviors in this
> > > > > case.  First I'll describe what I generally see happen (the last
> > > > > time I profiled this was a while back but I don't think anything
> > > > > substantial has changed).
> > > > >
> > > > > * Initially, writes are very fast.  The OS `write` call is simply
> > > > > a memcpy from user space into kernel space.  The actual flushing of
> > > > > the data from kernel space to disk happens asynchronously unless
> > > > > you are using direct I/O (which is not currently supported).
> > > > > * Over time, assuming the data arrival rate is faster than the data
> > > > > write rate, the data will accumulate in kernel memory.  For
> > > > > example, if you continuously run the Linux `free` program you will
> > > > > see the `free` column decrease and the `buff/cache` column
> > > > > increase.  The `available` column should generally stay consistent
> > > > > (kernel memory that is in use but can technically be flushed to
> > > > > disk if needed is still considered "available" but not "free").
> > > > > * Once the `free` column reaches 0 then a few things happen.
> > > > > First, the calls to `write` are no longer fast (the write cannot
> > > > > complete until some existing data has been flushed to disk).
> > > > > Second, other processes that aren't in use might start swapping
> > > > > their data to disk (you will generally see the entire system, if it
> > > > > is interactive, grind to a halt).  Third, if you have an OOM-killer
> > > > > active, it may start to kill running applications.  It isn't
> > > > > supposed to do so but there are sometimes bugs[1].
> > > > > * Assuming the OOM killer does not kill your application then,
> > > > > because the `write` calls slow down, the number of rows in the
> > > > > dataset writer's queue will start to fill up (this is captured by
> > > > > the variable `rows_in_flight_throttle`).
> > > > > * Once the rows_in_flight_throttle is full it will pause and the
> > > > > dataset writer will return an unfinished future (asking the caller
> > > > > to back off).
> > > > > * Once this happens the caller will apply backpressure (if being
> > > > > used in Acero) which will pause the reader.  This backpressure is
> > > > > not instant and generally each running CPU thread still delivers
> > > > > whatever batch it is working on.  These batches essentially get
> > > > > added to an asynchronous condition variable waiting on the dataset
> > > > > writer queue to free up.  This is the spot where the
> > > > > ThrottledAsyncTaskScheduler is used.
> > > > >
> > > > > The stack dump that you reported is not exactly what I would have
> > > > > expected but it might still match the above description.  At this
> > > > > point I am just sort of guessing.  When the dataset writer frees up
> > > > > enough to receive another batch it will do what is effectively a
> > > > > "notify all" and all of the compute threads are waking up and
> > > > > trying to add their batch to the dataset writer.  One of these gets
> > > > > through, gets added to the dataset writer, and then backpressure is
> > > > > applied again and all the requests pile up once again.  It's
> > > > > possible that a "resume sending" signal is sent and this might
> > > > > actually lead to RAM filling up more.  We could probably mitigate
> > > > > this by adding a low water mark to the dataset writer's
> > > > > backpressure throttle (so it doesn't send the resume signal as soon
> > > > > as the queue has room but waits until the queue is half full).
> > > > >
> > > > > I'd recommend watching the output of `free` (or monitoring memory
> > > > > in some other way) and verifying the above.  I'd also suggest
> > > > > lowering the number of CPU threads and seeing how that affects
> > > > > performance.  If you lower the CPU threads enough then you should
> > > > > eventually get it to a point where your supply of data is slower
> > > > > than your writer and I wouldn't expect memory to accumulate.  These
> > > > > things are solutions but might give us more clues into what is
> > > > > happening.
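> > > > >
> > > > > A minimal sketch of that experiment (the thread counts are just
> > > > > example values to try):
> > > > >
> > > > > ```
> > > > > import pyarrow as pa
> > > > >
> > > > > # Reduce parallelism so the data supply becomes slower than the
> > > > > # writer; watch `free` in another terminal while the write runs.
> > > > > pa.set_cpu_count(2)
> > > > > pa.set_io_thread_count(4)
> > > > > ```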
> > > > >
> > > > > [1]
> > > > > https://unix.stackexchange.com/questions/300106/why-is-the-oom-killer-killing-processes-when-swap-is-hardly-used
> > > > >
> > > > > On Thu, Jul 27, 2023 at 4:53 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > > > >
> > > > > > Hi,
> > > > > >     I'm using Flight to receive streams from a client and write
> > > > > > them to storage with the Python `pa.dataset.write_dataset` API.
> > > > > > The whole dataset is 1 billion rows, over 40GB, with one
> > > > > > partition column that ranges from 0~63. The container runs with
> > > > > > an 8-core CPU and 4GB of RAM.
> > > > > >     After setting io_thread_count to 128, transferring and
> > > > > > writing complete almost synchronously, in about 160s (6M rows/s;
> > > > > > each record batch is about 32K rows).
> > > > > >      Then I'd like to find out the bottleneck of the system:
> > > > > > CPU, RAM, or storage?
> > > > > >     1. I extended the RAM to 32GB; then the server consumes more
> > > > > > RAM. The writing progresses at the beginning, then suddenly
> > > > > > slows down and the data accumulates in RAM until OOM.
> > > > > >     2. Then I set the RAM to 64GB, so that the server is not
> > > > > > killed by OOM. The same happens; also, after all the data is
> > > > > > transferred (in memory), the server consumes all CPU shares
> > > > > > (800%) but is still very slow at writing (not totally stopped,
> > > > > > but about 100MB/minute).
> > > > > >     2.1 I'm wondering whether the I/O threads are stuck or the
> > > > > > computation tasks are stuck. After setting both io_thread_count
> > > > > > and cpu_count to 32, I wrapped the input stream of write_dataset
> > > > > > with a callback on each record batch, and I can tell that all
> > > > > > the record batches are consumed by the write_dataset API.
> > > > > >     2.2 I dumped all thread stack traces and grabbed a
> > > > > > flamegraph. See
> > > > > > https://gist.github.com/hu6360567/e21ce04e7f620fafb5500cd93d44d3fb.
> > > > > >
> > > > > >      It seems that all threads are stuck at
> > > > > > ThrottledAsyncTaskSchedulerImpl.
> > > > > >
> > > > > > --
> > > > > > ---------------------
> > > > > > Best Regards,
> > > > > > Wenbo Hu,
> > > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > ---------------------
> > > > Best Regards,
> > > > Wenbo Hu,
> > > >
> >
> >
> >
> > --
> > ---------------------
> > Best Regards,
> > Wenbo Hu,
>
>
>
> --
> ---------------------
> Best Regards,
> Wenbo Hu,
>
