First of all, you'd want to divide these numbers by the number of micro-batches, as roughly the same set of files is created in the checkpoint directory on each micro-batch. (For example, 920,000 events at 13,334 events per batch is about 69 micro-batches, so 60,500 file creations works out to roughly 880 per micro-batch.) Second, you'd want to dig inside the checkpoint directory and get separate numbers per top-level subdirectory, for example with something like the sketch below.
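For the per-subdirectory breakdown, a minimal sketch along these lines could tally the strace file calls per syscall and per top-level subdirectory of the checkpoint location. The log path and checkpoint root are placeholders, the micro-batch count is taken from the numbers in this thread, and it assumes strace output was captured to a log file with file syscalls in the usual syscall("path", ...) form:

    // Minimal sketch: tally strace file-related syscalls per top-level subdirectory
    // of the checkpoint location, and normalize per micro-batch.
    // The log path and checkpoint root are placeholders; a call is counted when its
    // first quoted path argument falls under the checkpoint root.
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class CheckpointSyscallBreakdown {
        public static void main(String[] args) throws IOException {
            String straceLog = "strace.log";                 // placeholder
            String checkpointRoot = "/path/to/checkpoint/";  // placeholder (keep the trailing slash)
            double microBatches = 920_000.0 / 13_334;        // ~69 batches, from the numbers above

            // Matches e.g.: openat(AT_FDCWD, "/path/to/checkpoint/state/0/0/1.delta", O_WRONLY|O_CREAT, 0666) = 87
            Pattern fileCall = Pattern.compile(
                    "^(?:\\[pid\\s+\\d+\\]\\s+)?(\\w+)\\([^\"]*\"("
                            + Pattern.quote(checkpointRoot) + "[^\"]*)\"");

            Map<String, Long> counts = new TreeMap<>();
            for (String line : Files.readAllLines(Paths.get(straceLog))) {
                Matcher m = fileCall.matcher(line);
                if (!m.find()) continue;
                String relative = m.group(2).substring(checkpointRoot.length());
                int slash = relative.indexOf('/');
                String topSubdir = slash < 0 ? relative : relative.substring(0, slash);
                counts.merge(m.group(1) + " " + topSubdir, 1L, Long::sum);
            }

            counts.forEach((call, total) -> System.out.printf(
                    "%-35s total=%7d  per-micro-batch=%8.1f%n", call, total, total / microBatches));
        }
    }

That gives you, per syscall and per top-level subdirectory, both the total and the per-micro-batch count, which is the number to sanity-check.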
After that we can see whether the numbers make sense or not.

Regarding file I/O issues in SS, the two issues I know about are:

1) If you use streaming aggregation, it unnecessarily creates a temporary file on the state store for reads as well as writes, even though the file is only needed when writing. That doubles the number of file creations. A patch is proposed under SPARK-30294 [1].

2) Spark leverages the HDFS API, which by default creates a crc file for every file it writes. (So you'll have 2x the files you'd expect.) There's also a bug in the HDFS API (HADOOP-16255 [2]) where crc files are not handled during rename (in short, that's how checkpointing works in SS: a temp file is atomically renamed to become the final file), and as a workaround (SPARK-28025 [3]) Spark tries to delete the crc file itself, so two additional operations (exists -> delete) may occur per crc file. A rough way to check how many of the files are crc files is sketched below, after the links.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-30294
2. https://issues.apache.org/jira/browse/HADOOP-16255
3. https://issues.apache.org/jira/browse/SPARK-28025
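For a rough sense of how much of the file count under the checkpoint directory is crc overhead, a minimal sketch like the following just walks the checkpoint directory and counts crc files versus the rest. The path is a placeholder, and it assumes the checkpoint lives on the local filesystem:

    // Minimal sketch: count .crc files vs. other files under the checkpoint directory.
    // The path is a placeholder; assumes the checkpoint is on the local filesystem.
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class CrcOverhead {
        public static void main(String[] args) throws IOException {
            Path checkpointRoot = Paths.get("/path/to/checkpoint");   // placeholder

            try (Stream<Path> files = Files.walk(checkpointRoot)) {
                // Partition regular files into ".crc" files and everything else, then count each side.
                Map<Boolean, Long> counts = files
                        .filter(Files::isRegularFile)
                        .collect(Collectors.partitioningBy(
                                p -> p.getFileName().toString().endsWith(".crc"),
                                Collectors.counting()));
                System.out.printf("data files = %d, crc files = %d%n",
                        counts.get(false), counts.get(true));
            }
        }
    }

If the two counts are close to each other, roughly half of the files (and a corresponding share of the create/rename/delete calls) are checksum overhead as described above.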
On Sun, Oct 4, 2020 at 10:08 PM Sergey Oboguev <obog...@gmail.com> wrote:

> I am trying to run a Spark structured streaming program simulating a basic
> scenario of ingesting events and calculating aggregates on a window with
> watermark, and I am observing an inordinate amount of disk IO Spark
> performs.
>
> The basic structure of the program is like this:
>
>     sparkSession = SparkSession.builder()
>             .appName(....)
>             .master("local[*]")
>             .config("spark.executor.memory", "8g")
>             .config("spark.serializer",
>                     "org.apache.spark.serializer.KryoSerializer")
>             .config("spark.kryoserializer.buffer", "8m")
>             .config("spark.local.dir", ...local directory...)
>             .getOrCreate();
>
>     sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app ...);
>
>     dataset = sparkSession.readStream()
>             .option("checkpointLocation", ... checkpoint dir for source ...)
>             .format(MockStreamingSource.class.getName())
>             .load();
>
>     Dataset<Row> ds = dataset
>             .withWatermark("timestamp", "10 minutes")
>             .groupBy(
>                     functions.window(functions.col("timestamp"), "2 minutes"),
>                     functions.col("source"))
>             .agg(
>                     functions.avg("D0").as("AVG_D0"),
>                     functions.avg("I0").as("AVG_I0"));
>
>     DataStreamWriter<Row> dsw = ds.writeStream()
>             // .trigger(Trigger.ProcessingTime("1 minute"))
>             .option("checkpointLocation", .. checkpoint dir for writer ... );
>
>     dsw.outputMode(OutputMode.Append())
>             .format("console")
>             .option("truncate", "false")
>             .option("numRows", Integer.MAX_VALUE)
>             .start()
>             .awaitTermination();
>
> MockStreamingSource is just that -- a source intended to provide simulated
> input. It generates micro-batches of mock events and sends them to the app.
> In the testing scenario, the source simulates 20,000 devices, each sending
> an event every 15 seconds for 11.5 minutes of logical time (just under the
> 12 minutes of window size + watermark), for a total of 920,000 events.
>
> I initially started with a micro-batch size of 500 events, and processing
> performance was totally dismal because of disk IO. I then increased the
> micro-batch size and performance got better, but it is still very poor. The
> micro-batch size is now 13,334 events per batch, which corresponds to an
> ingestion interval of 10 seconds. Smaller batches resulted in worse
> performance.
>
> But even with a micro-batch size of 13,334 events, performance is poor
> because of the excessive disk IO generated by Spark.
>
> Just ingesting the data generated intra-app takes the program physical time
> equal to 40% of window size + watermark.
>
> Using strace, I measured that the checkpoint directory for the stream
> writer receives the following numbers of Linux system calls:
>
>     create/open file = 60,500 calls
>     mkdir            = 57,000
>     readlink         = 59,000
>     unlink           = 41,900
>     rename           = 14,700
>     execve readlink  = 353,000 (incl. repeated searches for the readlink
>                        executable in 6 different locations)
>     execve chmod     = 340,620 (incl. repeated searches for the chmod
>                        executable in 6 different locations)
>
> In addition, the Spark local directory received:
>
>     create/open file = 55,000 calls
>     unlink           = 13,800
>     stat             = 42,000
>
> That's for a mere 920,000 small events (each event Row is 600 bytes when in
> the Java heap).
>
> I also tried trigger(...) to see whether it could improve anything, but it
> just made things worse.
>
> Spark version is 2.4.6.
>
> Is this an expected amount of disk IO for Spark, or am I doing something
> wrong, and is there a way to avoid Spark generating such an amount of disk
> IO?