I can't spend too much time explaining these one by one. Since you want to
know the "details", I strongly encourage you to do a deep dive into the code
instead of just looking around - that's how open source works.

I'll give a general explanation instead of replying inline; if there's no
existing doc (I'd guess there should be one), I'd probably write a blog post
rather than put too much time in here.

In short, the reason Spark has to create these files "per micro-batch" is
to ensure fault-tolerance. For example, if the query fails at batch 5 and
you rerun the query, it should rerun batch 5. How?

Spark should be aware of the offsets the query has read up to batch 4, and
preferably the offsets it planned to read for batch 5. That's what the
offsets and commits directories are for. State is for storing the
accumulated values of stateful operations. Same here - Spark should be able
to read the state as of batch 4 so that it can calculate the new accumulated
values for batch 5. In addition, the number of partitions is the maximum
parallelism (partitions aren't aware of each other, and they shouldn't be),
hence the state for each partition has to be stored individually.
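
To make this concrete, here's a rough sketch (the rate source, console sink
and paths are placeholders, not taken from your app): as long as the
restarted query points at the same checkpointLocation, Spark replays the
first batch that was planned but not committed.

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.streaming.Trigger

  val spark = SparkSession.builder()
    .appName("checkpoint-replay-sketch")
    .getOrCreate()

  // Stand-in source; the point is only the checkpointLocation on the writer.
  val events = spark.readStream
    .format("rate")
    .option("rowsPerSecond", "100")
    .load()

  val query = events.writeStream
    .format("console")
    .option("checkpointLocation", "/tmp/my-spark-checkpoint-dir/writer")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .start()

  // If the app dies while batch 5 is in flight, offsets/5 exists but
  // commits/5 does not; on restart with the same checkpointLocation Spark
  // reruns batch 5 against the recorded offsets before planning batch 6.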

Storing 4 files per partition per micro-batch (in the end we'll only have
"2" files, but here I count temp files and crc files as well, since we're
talking about the performance aspect) is the thing I already explained - I
agree it's not ideal; e.g. I submitted a PR for SPARK-30294 [1] which cuts
the number of files in half. We could probably propose that Hadoop skip
creating the CRC files (I'm not sure it can be done simply as of now), but
Spark is conservative about upgrading dependency versions, so it might not
be available soon even if we addressed it right away.
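
As far as I know Spark doesn't expose this knob for the state store
checkpoint today, but just to illustrate what the Hadoop API itself offers,
a client that owns the FileSystem handle can turn the .crc "shadow files"
off (sketch only; the path is a placeholder):

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.{FileSystem, Path}

  // On a ChecksumFileSystem (e.g. the local FS accessed via the Hadoop API)
  // this stops the creation of the side-car .crc file for new files.
  val fs = FileSystem.get(new Configuration())
  fs.setWriteChecksum(false)
  fs.setVerifyChecksum(false)

  val out = fs.create(new Path("/tmp/example-no-crc"))
  out.writeUTF("no .crc file should appear next to this file")
  out.close()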

As you've found, it's super important to pick the right value for shuffle
partitions. State is partitioned by a hash function over the group key, so
the right value strongly depends on the group key. If the cardinality of the
group key is low, the right number of shuffle partitions is probably fairly
small. Unfortunately, once the query has run you can't change the number of
shuffle partitions, as Spark has no state migration for when the number of
partitions changes. Either you need to predict the overall cardinality ahead
of time and set the right value up front, or try a 3rd-party state tool [2]
(DISCLAIMER: I'm the author.)
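
A minimal sketch of where to set it (the value 10 is just an example tuned
to a low-cardinality group key, not a recommendation):

  import org.apache.spark.sql.SparkSession

  // Pick the shuffle partition count *before* the query ever runs: the state
  // store is laid out with that many partitions, and the layout is then
  // fixed for the lifetime of the checkpoint.
  val spark = SparkSession.builder()
    .appName("low-cardinality-aggregation")
    .config("spark.sql.shuffle.partitions", "10")  // instead of the default 200
    .getOrCreate()

  // Changing this config later has no effect on a query restarted from an
  // existing checkpoint; that's where a state migration/repartition tool
  // comes in.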

1. https://issues.apache.org/jira/browse/SPARK-30294
2. https://github.com/HeartSaVioR/spark-state-tools


On Wed, Oct 7, 2020 at 11:16 PM Sergey Oboguev <obog...@gmail.com> wrote:

> Hi Jungtaek,
>
> *> I meant the subdirectory inside the directory you're providing as
> "checkpointLocation", as there're several directories in that directory...*
>
> There are two:
>
> *my-spark-checkpoint-dir/MainApp*
> created by sparkSession.sparkContext().setCheckpointDir(<checkpoint dir
> for the app>)
> contains only empty subdir with GUID name
>
> *my-spark-checkpoint-dir/writer*
> created by ds.writeStream().option("checkpointLocation", <checkpoint dir
> for writer>)
> contains all the files
>
> Within the latter ("writer") there are four subdirectories: commits,
> metadata, offsets, state.
>
> Breakdown of file creations within them, per 69 microbatches (when shuffle
> partition count = 200) is:
>
> commits = 136
> metadata = 0
> offsets = 138
> state = 56232
>
> (Creation is identified by strace record for "openat" system call with
> O_CREAT flag and file path in the corresponding directory.)
>
> When shuffle partition count is 10, breakdown of file creations within
> them, per 69 microbatches, is:
>
> commits = 136
> metadata = 0
> offsets = 138
> state = 2760
>
> *> The size of the delta file heavily depends on your stateful operation
> and data in each micro-batch. delta file only captures the "changes" of
> state in specific micro-batch, so there're cases you'll have very tiny
> delta files, e.g. cardinality of grouped key is small (hence cardinality of
> KVs is also small), small amount of inputs are provided per micro-batch,
> the overall size of aggregated row is small, there's skew on grouped key
> (hence some partitions get no input or small inputs), etc.*
>
>
> In my case there is no key in the Row object (unless the bucketized
> "timestamp" for 2-min windows buckets becomes a key), and the microbatch is
> large enough: the whole problem is that Spark does not want to save the
> microbatch as a single file. Even after I reduce the number of shuffle
> partitions (see below), the number of files per microbatch remains
> significantly larger than the number of shuffle partitions.
>
> ..........
>
> When the number of shuffle partitions is 200, Spark creates 816 files (per
> microbatch) in checkpoint store and 202 files in Spark local-dir.
>
> Of checkpoint files: 24 per microbatch are snapshot files, and 788 are
> delta files.
> The same per microbatch per partition: 0.12 snapshot files, 4 delta files.
> Of local-dir files: 200 temp_shuffle files per microbatch (as expected)
> and 2 other files (shuffle.data+shuffle.index).
>
> If I reduce the number of shuffle partitions, two things happen:
> - Throughput of a single pipeline improves.
> - CPU usage by the pipeline is reduced (allowing a single node to co-run
> larger number of pipelines).
> Most of the improvements are gained by the time the number of partitions
> is reduced to 5-10.
> Going below that, further improvements are marginal.
>
> When reducing the number of shuffle partitions from 200 to 10, physical
> latency of data ingestion into the checkpoint is reduced 1.9 times, and CPU
> usage is reduced 2.6 times.
>
> When reducing the number of shuffle partitions from 200 to 5, physical
> latency of data ingestion into the checkpoint is reduced 2.1 times, and CPU
> usage is reduced 4.5 times.
>
> Still, latency remains high, because the number of created files per
> microbatch remains high.
>
> ..........
>
> With 5 shuffle partitions, Spark creates 23.9 files (per microbatch) in
> checkpoint store and 6.9 files in Spark local-dir.
> Of checkpoint files: 0.15 per microbatch are snapshot files, and 19.7 are
> delta files.
> Of local-dir files: 4.93 temp_shuffle files per microbatch (as expected)
> and 2 other files (shuffle.data+shuffle.index).
>
> Why would Spark need to create 20 delta files per microbatch, or to put it
> another way: 4 delta files per microbatch per shuffle partition?
>
> One could try to guess this could be due to changing "timestamp", but this
> does not bear out. In my produced stream (69 microbatches) there are only
> 46 distinct values for timestamp, consecutively increasing from first
> timestamp to last. Thus lots of microbatches will have just one timestamp
> value. On average, a microbatch will have 1.5 distinct timestamp values. But
> it would, of course, be terribly wrong for Spark to use the raw timestamp
> value as a key, as in the real world almost every event would have a unique
> timestamp, so the number of files required for saving by timestamp as a key
> would be insane; hopefully Spark does not attempt to do that. But perhaps
> it may use the index of 2-minute window bucket as a key. If so, there are
> only 6 distinct values per the whole event set (I have window size set to 2
> minutes, watermark 10 minutes, and event set spans 11.5 minutes). Thus, 90%
> of microbatches will fall wholly in just one window bucket, and 10% in two
> buckets. So why 4 delta files per microbatch per shuffle partition?
>
> ..........
>
> For the completeness of the picture, if I run the test with the shuffle
> partition count set to 1, then:
> Spark creates 8 files (per microbatch) in the checkpoint store and 3 files
> in Spark local-dir.
> Of checkpoint files: 0.03 per microbatch are snapshot files (only 2
> snapshot files in the whole run), and 4 delta files per microbatch.
> Of local-dir files: 1 temp_shuffle files per microbatch (as expected) and
> 2 other files (shuffle.data+shuffle.index).
>
> ..........
>
> Thus Spark SS seems to keep 4 delta files per microbatch per shuffle
> partition, no matter what the number of shuffle partitions is.
> Why would it have to do this?
>
> Also, I am unsure why Spark has to create local-dir files for every
> microbatch, rather than keeping them open across microbatches and re-using
> them from one microbatch to another (writing the data over, but without
> having to go through file creation)...
>
> *> Spark leverages HDFS API which is configured to create crc file per
> file by default. *
>
>
> This is unfortunate. It would be much better to create checkpoint files with
> the HDFS CRC "shadow file" disabled, and instead keep the CRC (if desired)
> right inside the main file itself, rather than in a separate file.
>
> * * *
>
> While we are at it, I wanted to ask a related question. Suppose I create
> multiple parallel streaming pipelines in the application, where by pipeline
> I mean the whole data stream from the initial Dataset/DatasetReader to the
> output of the Spark query. Suppose there is a large number of such parallel
> pipelines in the application, let us say dozens or hundreds.
>
> How would Spark process them in terms of threading model? Will there be a
> separate thread per active stream/query or does Spark use a bounded thread
> pool? Do many streams/queries result in many threads, or a limited number
> of threads?
>
> Thanks,
> Sergey
>
