Replied inline.

On Tue, Oct 6, 2020 at 6:07 AM Sergey Oboguev <obog...@gmail.com> wrote:
> Hi Jungtaek,
>
> Thanks for your response.
>
> *> you'd want to dive inside the checkpoint directory and have separate
> numbers per top-subdirectory*
>
> All the checkpoint store numbers are solely for the subdirectory set by
> option("checkpointLocation", .. checkpoint dir for writer ... )
>
> Other subdirectories are empty or nearly-empty.

I meant the subdirectories inside the directory you're providing as
"checkpointLocation": there are several directories in there, and they exist
for different purposes. It'd be nice if we could determine whether the issue
is spread across all of these directories or specific to one of them.

> *> First of all, you'd want to divide these numbers by the number of
> micro-batches, as file creations in checkpoint directory would occur
> similarly per micro-batch*
>
> There were 69 microbatches, each containing 13,334 events.
>
> An event's Row object size in the Java heap is 620 bytes, so the total
> amount of data in a microbatch (in terms of aggregate Java-heap object
> sizes) is 8.3 MB.
>
> The average number of system calls per microbatch was:
>
> For the query (writer) checkpoint directory:
>
> create/open file = 877
> mkdir = 826
> readlink = 855
> unlink = 607
> rename = 213
> execve readlink = 5116
> execve chmod = 4937
>
> For the Spark local directory:
>
> create/open file = 797
> unlink = 200
> mmap = 197
> stat = 2391
>
> (The number for local.stat in the previous message was incorrect.)
>
> Physical processing time per microbatch was 3.4 seconds.
>
> That's to store a mere 8.3 MB of uncompressed (Java-heap) data!
>
> Most created "delta" files are on the order of 1 KB or less in size.
> "Snapshot" files are several KB in size.
> One would think that the tiny size of the created files is one key factor
> in the dismal performance. It causes a very high number of system calls and
> also hugely fragments the actual data IO.

The size of the delta files heavily depends on your stateful operation and on
the data in each micro-batch. A delta file only captures the "changes" of
state in a specific micro-batch, so there are cases where you'll have very
tiny delta files, e.g. the cardinality of the grouping key is small (hence
the cardinality of KVs is also small), a small number of inputs is provided
per micro-batch, the overall size of an aggregated row is small, there's skew
on the grouping key (hence some partitions get no input or little input),
etc. The snapshot files will keep getting bigger, as they contain all of the
state KVs as of the specific micro-batch, so you may not want to worry about
those being small.

> As a result, using iostat, the typical disk write rate observed was only
> ~ 100 KB/s.
> (The read rate was near-zero, presumably because all data was in the Linux
> block cache.)
>
> Average CPU usage when ingesting data was on the order of 600% (i.e. 6
> cores busy), I presume chiefly for serialization/deserialization, even
> though Kryo was enabled. But the machine has 16 cores (VCPUs), so the most
> immediate limiting factor must have been not CPU saturation but IO latency
> (unless there is some obscure setting limiting the number of
> reading/writing threads). The latency arises, fundamentally, from the very
> large number of tiny files.
>
> Is there a way to control the size of the checkpoint "delta" and "snapshot"
> files Spark creates, to make them larger?

Unfortunately no - they're used for the fault-tolerance guarantee (stateful
exactly-once) per micro-batch. Every stateful operation writes a delta file
per shuffle partition (spark.sql.shuffle.partitions) per micro-batch. The
default value of shuffle partitions is 200, hence in each microbatch the
query will create 200 files for each state store by default. (You can reduce
this value at the start of the streaming query, so that's one thing you can
tweak.) In reality you still need to multiply it by 4, as there's also a crc
file per file if the HDFS API treats the filesystem as a checksum filesystem,
and Spark creates two files (read/write) for streaming aggregation. (I hope
SPARK-30294 will address the latter - after that we will no longer need to
multiply by 2 for the read/write purpose.)

The only way for now to have fewer small files is to increase the micro-batch
interval, which brings other concerns: batch size (and output size) and
output latency. That is a downside compared to what streaming frameworks
provide - in a streaming framework, a longer checkpoint interval only affects
the amount of data to restore on failure. If you expect end-to-end
exactly-once then output latency is also affected, but that's an option end
users can choose to tolerate.

Micro-batch processing could probably also decouple the micro-batch interval
from the checkpoint interval to provide flexibility - say, "I can tolerate
reprocessing up to 10 minutes of data when a failure occurs, but due to
output latency I need a micro-batch interval of 30 seconds" (in other words,
checkpoint once per roughly 20 micro-batches). That is actually a bit tricky
to implement, and I haven't seen any request for it, so it's just a sketched
idea.
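As a rough illustration of the two knobs mentioned above, here is a minimal
sketch against the Java program quoted further down in this thread; the
partition count and trigger interval are arbitrary example values, not
recommendations, and "ds" is the aggregated Dataset from that program:

    import org.apache.spark.sql.streaming.DataStreamWriter;
    import org.apache.spark.sql.streaming.OutputMode;
    import org.apache.spark.sql.streaming.Trigger;

    sparkSession = SparkSession.builder()
            .appName(....)
            .master("local[*]")
            // Default is 200. Each stateful operator writes one delta file per
            // shuffle partition per micro-batch, so lowering this directly
            // lowers the number of checkpoint files. It must be set before the
            // query first runs; once a checkpoint exists, the value recorded
            // there is reused.
            .config("spark.sql.shuffle.partitions", "8")
            .getOrCreate();

    // With the defaults (200 partitions, x2 for the read/write temp file of
    // streaming aggregation, x2 for .crc sidecars) that is roughly 800 file
    // creations per micro-batch for the state store alone, consistent with the
    // ~877 create/open calls per micro-batch measured above.

    DataStreamWriter<Row> dsw = ds.writeStream()
            // Fewer, larger micro-batches mean fewer checkpoint files overall,
            // at the cost of batch size and output latency, as noted above.
            .trigger(Trigger.ProcessingTime("1 minute"))
            .option("checkpointLocation", .. checkpoint dir for writer ... );

    dsw.outputMode(OutputMode.Append())
            .format("console")
            .start();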
> And the same also for the files in the Spark local directory?
>
> * * *
>
> The numbers for the checkpoint directory are, of course, captured when it
> was set to a local drive (or Lustre/NFS).
>
> For HDFS there are obviously no local file system calls for the checkpoint
> store, as HDFS does not present itself as an OS-level file system.
> Nevertheless, the name of the checkpoint directory was transmitted over the
> HDFS connection socket 1,675 times per microbatch, so the number of
> high-level HDFS file operations must have been at least that high.
>
> * * *
>
> On a related note, for 920,000 events Spark made 700,000 attempts to
> execute the chmod or readlink program, i.e. to launch an external
> subprocess with an executable in order to perform a file operation. Those
> 700,000 attempts actually represent 150,000 cycles, and in each cycle Spark
> tried to launch the program from 6 different locations (/usr/local/sbin ->
> /usr/local/bin -> /usr/sbin -> /usr/bin -> /sbin -> /bin), until it finally
> found it in the last one. But then on the next cycle Spark/Hadoop does not
> reuse the knowledge of the previously found utility location, and repeats
> the search from the very start, causing useless file system search
> operations over and over again.
>
> This may or may not matter when HDFS is used for the checkpoint store
> (depending on how the HDFS server implements the calls), but it does matter
> when a file system like Lustre or NFS is used for checkpoint storage.
> (Not to mention that spawning readlink and chmod does not seem like a
> bright idea in the first place, although perhaps there is a reason why the
> Hadoop layer does it this way.)
>
> Thanks,
> Sergey
>
> On Mon, Oct 5, 2020 at 5:45 AM Jungtaek Lim <kabhwan.opensou...@gmail.com>
> wrote:
>
>> First of all, you'd want to divide these numbers by the number of
>> micro-batches, as file creations in the checkpoint directory would occur
>> similarly per micro-batch.
>> Second, you'd want to dive inside the checkpoint directory and have
>> separate numbers per top-subdirectory.
>> After that we can see whether the values would make sense or not.
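For a local-drive (or Lustre/NFS) checkpoint, a minimal stand-alone sketch
along these lines could produce those per-subdirectory numbers; the class
name and default path are made up for illustration, and on HDFS the rough
equivalent would be "hdfs dfs -count" against the checkpoint directory:

    import java.io.IOException;
    import java.nio.file.DirectoryStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.stream.Stream;

    public final class CheckpointDirStats {
        public static void main(String[] args) throws IOException {
            // Root passed as option("checkpointLocation", ...) to the writer.
            Path root = Paths.get(args.length > 0 ? args[0] : "/tmp/checkpoint-writer");

            // Count regular files under each top-level entry (offsets/,
            // commits/, state/, sources/, ...) so totals can be attributed
            // per subdirectory.
            try (DirectoryStream<Path> top = Files.newDirectoryStream(root)) {
                for (Path sub : top) {
                    long files;
                    try (Stream<Path> walk = Files.walk(sub)) {
                        files = walk.filter(Files::isRegularFile).count();
                    }
                    System.out.printf("%-10s %d files%n", root.relativize(sub), files);
                }
            }
        }
    }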
>> Regarding file I/O issues on SS, the two issues I know about are:
>> 1) If you use streaming aggregation, it unnecessarily creates a temporary
>> file for both read and write on the state store, while the file is only
>> needed for writing. That makes the number of file creations 2x. A patch is
>> proposed under SPARK-30294 [1].
>>
>> 2) Spark leverages the HDFS API, which is configured to create a crc file
>> per file by default. (So you'll have 2x the files you would expect.) There
>> is a bug in the HDFS API (HADOOP-16255 [2]) which fails to handle crc
>> files during rename (in short, this is how checkpointing works in SS: a
>> temp file is atomically renamed to become the final file), and as a
>> workaround (SPARK-28025 [3]) Spark tries to delete the crc file, so two
>> additional operations (exists -> delete) may occur per crc file.
>>
>> Hope this helps.
>>
>> Thanks,
>> Jungtaek Lim (HeartSaVioR)
>>
>> 1. https://issues.apache.org/jira/browse/SPARK-30294
>> 2. https://issues.apache.org/jira/browse/HADOOP-16255
>> 3. https://issues.apache.org/jira/browse/SPARK-28025
>>
>>
>> On Sun, Oct 4, 2020 at 10:08 PM Sergey Oboguev <obog...@gmail.com> wrote:
>>
>>> I am trying to run a Spark structured streaming program simulating a
>>> basic scenario of ingesting events and calculating aggregates on a window
>>> with a watermark, and I am observing an inordinate amount of disk IO
>>> performed by Spark.
>>>
>>> The basic structure of the program is like this:
>>>
>>> sparkSession = SparkSession.builder()
>>>         .appName(....)
>>>         .master("local[*]")
>>>         .config("spark.executor.memory", "8g")
>>>         .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>         .config("spark.kryoserializer.buffer", "8m")
>>>         .config("spark.local.dir", ...local directory...)
>>>         .getOrCreate();
>>>
>>> sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app ...);
>>>
>>> dataset = sparkSession.readStream()
>>>         .option("checkpointLocation", ... checkpoint dir for source ...)
>>>         .format(MockStreamingSource.class.getName())
>>>         .load();
>>>
>>> Dataset<Row> ds = dataset
>>>         .withWatermark("timestamp", "10 minutes")
>>>         .groupBy(
>>>                 functions.window(functions.col("timestamp"), "2 minutes"),
>>>                 functions.col("source"))
>>>         .agg(
>>>                 functions.avg("D0").as("AVG_D0"),
>>>                 functions.avg("I0").as("AVG_I0"));
>>>
>>> DataStreamWriter<Row> dsw = ds.writeStream()
>>>         // .trigger(Trigger.ProcessingTime("1 minute"))
>>>         .option("checkpointLocation", .. checkpoint dir for writer ... );
>>>
>>> dsw.outputMode(OutputMode.Append())
>>>         .format("console")
>>>         .option("truncate", "false")
>>>         .option("numRows", Integer.MAX_VALUE)
>>>         .start()
>>>         .awaitTermination();
>>>
>>> MockStreamingSource is just that -- a source intended to provide
>>> simulated input. It generates microbatches of mock events and sends them
>>> to the app. In the testing scenario, the source simulates 20,000 devices,
>>> each sending an event every 15 seconds for 11.5 minutes of logical time
>>> (just under the 12 minutes of window size + watermark), for a total of
>>> 920,000 events.
>>>
>>> I initially started with a microbatch size of 500 events, and processing
>>> performance was totally dismal because of disk IO. I then increased the
>>> microbatch size and performance got better, but it is still very poor.
>>> The microbatch size is now 13,334 events per batch, which corresponds to
>>> an ingestion interval of 10 seconds. Smaller batches resulted in worse
>>> performance.
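For reference, the batch and heap figures quoted in this thread follow
directly from the simulated workload described above; the snippet below just
restates that arithmetic, nothing in it is measured:

    // All inputs are taken from the workload description above.
    long devices          = 20_000;     // simulated devices
    double eventPeriodSec = 15.0;       // one event per device every 15 seconds
    double triggerSec     = 10.0;       // ingestion interval per micro-batch
    long totalEvents      = 920_000;    // total events in the run
    int rowHeapBytes      = 620;        // Java-heap size of one event Row

    double eventsPerBatch = devices * triggerSec / eventPeriodSec;   // ~13,333
    double microBatches   = totalEvents / eventsPerBatch;            // ~69
    double heapPerBatchMB = eventsPerBatch * rowHeapBytes / 1e6;     // ~8.3 MB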
>>> But even with a microbatch size of 13,334 events, performance is poor
>>> because of the excessive disk IO generated by Spark.
>>> Just ingesting the data generated inside the app takes physical time
>>> equal to 40% of (window size + watermark).
>>>
>>> Using strace, I measured that the checkpoint directory for the stream
>>> writer receives the following numbers of Linux system calls:
>>>
>>> create/open file = 60,500 calls
>>> mkdir = 57,000
>>> readlink = 59,000
>>> unlink = 41,900
>>> rename = 14,700
>>> execve readlink = 353,000 (incl. repetitive searches for the readlink
>>> executable in 6 different locations)
>>> execve chmod = 340,620 (incl. repetitive searches for the chmod
>>> executable in 6 different locations)
>>>
>>> In addition, the Spark local directory received:
>>>
>>> create/open file = 55,000 calls
>>> unlink = 13,800
>>> stat = 42,000
>>>
>>> That's for a mere 920,000 small events (each event Row is 600 bytes in
>>> the Java heap).
>>>
>>> I also tried trigger(...) to see whether it could improve anything, but
>>> it just made things worse.
>>>
>>> Spark version 2.4.6.
>>>
>>> Is this an expected amount of disk IO for Spark, or am I doing something
>>> wrong, and is there a way to avoid Spark generating such an amount of
>>> disk IO?
>>
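One more knob, beyond the shuffle-partition count and trigger interval shown
earlier in the thread, that is sometimes used when the checkpoint lives on a
plain local path: pointing the file: scheme at Hadoop's RawLocalFileSystem so
that no .crc sidecar files are written. This is an untested sketch, not
something proposed in this thread, and it gives up the checksum-based
corruption detection that the default LocalFileSystem provides:

    sparkSession = SparkSession.builder()
            .appName(....)
            .master("local[*]")
            // Hadoop's default LocalFileSystem is a ChecksumFileSystem and
            // writes a .crc sidecar next to every checkpoint file;
            // RawLocalFileSystem does not. Only meaningful when
            // checkpointLocation is a local (file:) path.
            .config("spark.hadoop.fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
            .getOrCreate();

Whether that trade-off is acceptable depends on how much you rely on the
checkpoint surviving silent corruption of the underlying storage.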