First of all, you'd want to divide these numbers by the number of
micro-batches, as file creations in the checkpoint directory occur
similarly for every micro-batch.
Second, you'd want to dive inside the checkpoint directory and get
separate numbers per top-level subdirectory.

After that, we can see whether the numbers make sense or not.
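
If it helps, something like the sketch below should give that per-subdirectory
breakdown. This is just my own rough, untested sketch (the class name
CheckpointBreakdown and the use of plain java.nio are my choices, not anything
from your program); it walks the writer's checkpoint directory and counts the
files under each top-level subdirectory, which you can then divide by the
number of micro-batches:

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class CheckpointBreakdown {
    public static void main(String[] args) throws IOException {
        // args[0] = the stream writer's checkpoint directory
        Path checkpointDir = Paths.get(args[0]);
        try (DirectoryStream<Path> top = Files.newDirectoryStream(checkpointDir)) {
            for (Path sub : top) {
                if (!Files.isDirectory(sub)) continue;
                long count;
                try (Stream<Path> files = Files.walk(sub)) {
                    // count every regular file under this top-level subdirectory
                    count = files.filter(Files::isRegularFile).count();
                }
                System.out.println(sub.getFileName() + ": " + count + " files");
            }
        }
    }
}

(Keep in mind this only counts what is still on disk; the strace counters also
include files that were created and deleted along the way.)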

Regarding file I/O issues in SS, the two issues I know about are:
1) If you use streaming aggregation, it unnecessarily creates a temporary
file for both reads and writes on the state store, even though the file is
only needed for writes. That doubles the number of file creations. A patch
is proposed under SPARK-30294 [1].
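
To put a rough number on that 2x (this is my own back-of-envelope, and it
assumes the default spark.sql.shuffle.partitions = 200 and one state delta
file written per partition per micro-batch, neither of which your mail
confirms): 920,000 events / 13,334 events per batch is roughly 69
micro-batches, so about 69 * 200 = 13,800 state file creations for the run,
which the temp-file issue above roughly doubles to ~27,600.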

2) Spark leverages the HDFS API, which by default is configured to create a
.crc file alongside each file. (So you'll have 2x the files you'd expect.)
There's a bug in the HDFS API (HADOOP-16255 [2]) where .crc files are not
handled during rename (in short, that's how a checkpoint works in SS: a temp
file is atomically renamed to become the final file), so as a workaround
(SPARK-28025 [3]) Spark tries to delete the .crc file itself, which may
incur two additional operations (exists -> delete) per .crc file.
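
If you want to see 2) in isolation, here's a small standalone sketch of mine
(it assumes only hadoop-common on the classpath and uses /tmp/crc-demo as a
scratch directory; it is not something your program needs). It shows the
checksummed local Hadoop FileSystem writing a hidden .crc sidecar next to
every file it creates:

import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CrcSidecarDemo {
    public static void main(String[] args) throws Exception {
        // getLocal() returns the checksummed LocalFileSystem, the same kind of
        // Hadoop FileSystem a local checkpoint directory resolves to
        FileSystem fs = FileSystem.getLocal(new Configuration());

        Path dir = new Path("/tmp/crc-demo");
        fs.mkdirs(dir);
        // creating one data file also creates the hidden .1.delta.crc sidecar
        fs.create(new Path(dir, "1.delta"), true).close();

        // list via plain java.io so the checksum file isn't filtered out
        for (String name : new File("/tmp/crc-demo").list()) {
            System.out.println(name);   // expect: 1.delta and .1.delta.crc
        }
    }
}

Combined with the back-of-envelope above, ~27,600 state file creations doubled
again by .crc sidecars is already in the ~55,000 range, which is at least the
same order of magnitude as the ~60,500 create/open calls you measured on the
writer checkpoint directory (the rest presumably being offsets/commits/
metadata). That's exactly why the per-subdirectory breakdown would help before
concluding anything.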

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

1. https://issues.apache.org/jira/browse/SPARK-30294
2. https://issues.apache.org/jira/browse/HADOOP-16255
3. https://issues.apache.org/jira/browse/SPARK-28025


On Sun, Oct 4, 2020 at 10:08 PM Sergey Oboguev <obog...@gmail.com> wrote:

> I am trying to run a Spark structured streaming program simulating a basic
> scenario of ingesting events and calculating aggregates on a window with a
> watermark, and I am observing an inordinate amount of disk IO performed by
> Spark.
>
> The basic structure of the program is like this:
>
> sparkSession = SparkSession.builder()
>                            .appName(....)
>                            .master("local[*]")
>                            .config("spark.executor.memory", "8g")
>                            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>                            .config("spark.kryoserializer.buffer", "8m")
>                            .config("spark.local.dir", ...local directory...)
>                            .getOrCreate();
>
> sparkSession.sparkContext().setCheckpointDir(... checkpoint dir for the app ...);
>
> dataset = sparkSession.readStream()
>                       .option("checkpointLocation", ... checkpoint dir for source ...)
>                       .format(MockStreamingSource.class.getName())
>                       .load();
>
> Dataset<Row> ds = dataset
>                       .withWatermark("timestamp", "10 minutes")
>                       .groupBy(
>                               functions.window(functions.col("timestamp"), "2 minutes"),
>                               functions.col("source"))
>                       .agg(
>                               functions.avg("D0").as("AVG_D0"),
>                               functions.avg("I0").as("AVG_I0"));
>
> DataStreamWriter<Row> dsw = ds.writeStream()
>                               // .trigger(Trigger.ProcessingTime("1 minute"))
>                               .option("checkpointLocation", ... checkpoint dir for writer ...);
>
> dsw.outputMode(OutputMode.Append())
>    .format("console")
>    .option("truncate", "false")
>    .option("numRows", Integer.MAX_VALUE)
>    .start()
>    .awaitTermination();
>
>
> MockStreamingSource is just that -- a source intended to provide simulated
> input. It generates microbatches of mock events and sends them to the app.
> In the testing scenario, the source simulates 20,000 devices, each sending
> an event every 15 seconds for 11.5 minutes of logical time (just under the
> 12 minutes of window size + watermark), for a total of 920,000 events.
>
> I initially started with a microbatch size of 500 events, and processing
> performance was totally dismal because of disk IO. I then increased the
> microbatch size and performance got better, but it is still very poor. The
> microbatch size is now 13,334 events per batch, which corresponds to an
> ingestion interval of 10 seconds. Smaller batches resulted in worse
> performance.
>
> But even with a microbatch size of 13,334 events, performance is poor
> because of the excessive disk IO generated by Spark.
> Just ingesting the data generated intra-app takes the program physical time
> equal to 40% of window size + watermark.
>
> Using strace, I measured that the checkpoint directory for the stream
> writer receives the following numbers of Linux system calls:
>
> create/open file = 60,500 calls
> mkdir = 57,000
> readlink = 59,000
> unlink = 41,900
> rename = 14,700
> execve readlink = 353,000 (incl. repetitive searches for the readlink executable in 6 different locations)
> execve chmod = 340,620 (incl. repetitive searches for the chmod executable in 6 different locations)
>
> In addition, the Spark local directory received:
>
> create/open file = 55,000 calls
> unlink = 13,800
> stat = 42,000
>
> That's for a mere 920,000 small events (each event Row is 600 bytes when
> in the Java heap).
>
> I also tried trigger(...) to see whether it can improve anything, but it
> just made things worse.
>
> Spark version 2.4.6.
>
> Is this an expected amount of disk IO for Spark, or am I doing something
> wrong? Is there a way to avoid Spark generating such an amount of disk IO?
>
