Hi, I'm currently examining the I/O patterns of Flink, and I'd like to know when and how Flink goes to disk. Let me give an overview of what I have done so far.

I am running TeraGen (from the Hadoop examples package) + TeraSort (https://github.com/robert-schmidtke/terasort) on a 16-node cluster, each node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte of disk. I'm using YARN and HDFS, with XFS as the underlying file system.

Before running TeraGen and TeraSort I reset the XFS counters to zero, and after both jobs have finished I dump the counters again. Accumulated over the entire cluster I get 3 TiB of writes and 3.2 TiB of reads. What I would have expected is 2 TiB of writes (1 TiB for TeraGen, 1 TiB for TeraSort) and 1 TiB of reads (during TeraSort). (I've appended a sketch of how I snapshot the counters at the end of this mail.)

Unsatisfied by the coarseness of these numbers, I developed an HDFS wrapper that logs file system statistics for each call to hdfs://..., such as start time, end time, number of bytes read or written, etc. (sketch appended as well). Plotting these numbers shows exactly what I expect: during TeraGen there is 1 TiB of writes to hdfs://..., and during TeraSort there is 1 TiB of reads from and 1 TiB of writes to hdfs://... So far, so good.

Since this still did not explain the disk I/O, I added bytecode instrumentation to a range of Java classes (FileInputStream/FileOutputStream, RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for memory-mapped files, etc.) that collects the same statistics: start and end of each read from or write to disk, the number of bytes involved, and so on (skeleton appended too). Plotting these numbers shows that the HDFS JVMs write 1 TiB of data to disk during TeraGen (expected) and read and write 1 TiB each during TeraSort (expected).

Sorry for the enormous introduction, but now comes the interesting part: Flink's JVMs read from and write to disk 1 TiB of data each during TeraSort. I suspect some sort of spilling is involved, potentially because I have not done the setup properly. But that is not the crucial point. Tallying everything up:

    writes: 2 TiB (HDFS) + 1 TiB (Flink) = 3 TiB, which agrees with the XFS counters above
    reads:  1 TiB (HDFS) + 1 TiB (Flink) = 2 TiB, while XFS reports 3.2 TiB

So I'm missing an entire TiB of reads from disk somewhere. I have done the same with Hadoop TeraSort, and there I'm not missing any data: my statistics agree with XFS. This is why I suspect there are cases where Flink goes to disk without me noticing it.

So here, finally, is the question: in which cases does Flink go to disk, and how does it do so, i.e. precisely which Java classes are involved, so I can check my bytecode instrumentation? I guess this would also include any kind of resource distribution via HDFS/YARN (JAR files and the like). Seeing that the amount of data I'm missing equals the size of my input set, I suspect there is some sort of shuffling/spilling at play, but I'm not sure. Maybe there is also some remote I/O via sockets or the like that I'm missing.

Any hints as to where Flink might incur disk I/O are greatly appreciated! I'm also happy to do the digging myself once pointed to the proper packages in the Apache Flink source tree (I have done my fair share of inspection already, but could not be sure whether or not I had missed something).

Thanks a lot in advance!

Robert

--
My GPG Key ID: 336E2680
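
P.S. In case the measurement details matter, below are simplified sketches of my tooling. First, how I snapshot the XFS counters around a run, per node. I've sketched it in Java for uniformity with the rest of this mail; the class is made up for illustration, but the /proc paths are the standard XFS ones, with the "xpc" line carrying the extended-precision byte counters:

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    // Illustration only: reset the XFS counters before a run, then read the
    // accumulated byte counters afterwards from /proc/fs/xfs/stat.
    public final class XfsCounters {

        // Writing 1 to stats_clear zeroes all XFS statistics on this node.
        public static void clear() throws IOException {
            Files.write(Paths.get("/proc/sys/fs/xfs/stats_clear"), "1\n".getBytes());
        }

        // The "xpc" line holds xs_xstrat_bytes, xs_write_bytes, xs_read_bytes.
        public static long[] writeAndReadBytes() throws IOException {
            for (String line : Files.readAllLines(Paths.get("/proc/fs/xfs/stat"))) {
                if (line.startsWith("xpc ")) {
                    String[] f = line.trim().split("\\s+");
                    long writeBytes = Long.parseLong(f[2]);
                    long readBytes = Long.parseLong(f[3]);
                    return new long[] { writeBytes, readBytes };
                }
            }
            throw new IOException("no xpc line in /proc/fs/xfs/stat");
        }
    }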
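
Second, the core of the HDFS wrapper. The real code wraps the streams handed out by Hadoop's FileSystem (FSDataInputStream/FSDataOutputStream); the class names and the log format below are simplified stand-ins, but they show which statistics I collect per stream:

    import java.io.FilterInputStream;
    import java.io.FilterOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Counting wrappers: record start/end time and byte count per stream.
    public final class CountingStreams {

        public static final class CountingInputStream extends FilterInputStream {
            private final long startNanos = System.nanoTime();
            private long bytesRead;

            public CountingInputStream(InputStream in) {
                super(in);
            }

            @Override
            public int read() throws IOException {
                int b = super.read();
                if (b >= 0) {
                    bytesRead++;
                }
                return b;
            }

            @Override
            public int read(byte[] buf, int off, int len) throws IOException {
                int n = super.read(buf, off, len);
                if (n > 0) {
                    bytesRead += n;
                }
                return n;
            }

            @Override
            public void close() throws IOException {
                super.close();
                // One log record per stream.
                System.err.printf("read: start=%d end=%d bytes=%d%n",
                        startNanos, System.nanoTime(), bytesRead);
            }
        }

        public static final class CountingOutputStream extends FilterOutputStream {
            private final long startNanos = System.nanoTime();
            private long bytesWritten;

            public CountingOutputStream(OutputStream out) {
                super(out);
            }

            @Override
            public void write(int b) throws IOException {
                out.write(b);
                bytesWritten++;
            }

            @Override
            public void write(byte[] buf, int off, int len) throws IOException {
                // Delegate directly to avoid FilterOutputStream's per-byte
                // loop, which would double-count through write(int).
                out.write(buf, off, len);
                bytesWritten += len;
            }

            @Override
            public void close() throws IOException {
                super.close();
                System.err.printf("write: start=%d end=%d bytes=%d%n",
                        startNanos, System.nanoTime(), bytesWritten);
            }
        }
    }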
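
Third, a skeleton of the instrumentation agent. The actual rewriting of the target classes (injecting calls to a statistics recorder into their read/write methods) is elided here, and the class name is again a stand-in:

    import java.lang.instrument.ClassFileTransformer;
    import java.lang.instrument.Instrumentation;
    import java.security.ProtectionDomain;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    // Skeleton of the I/O instrumentation agent; rewriting logic elided.
    public final class IoStatsAgent {

        // Internal names of (some of) the classes whose read/write paths
        // get instrumented.
        private static final Set<String> TARGETS = new HashSet<>(Arrays.asList(
                "java/io/FileInputStream",
                "java/io/FileOutputStream",
                "java/io/RandomAccessFile",
                "sun/nio/ch/FileChannelImpl",
                "java/util/zip/ZipFile"));

        public static void premain(String agentArgs, Instrumentation inst) {
            inst.addTransformer(new ClassFileTransformer() {
                @Override
                public byte[] transform(ClassLoader loader, String className,
                                        Class<?> classBeingRedefined,
                                        ProtectionDomain protectionDomain,
                                        byte[] classfileBuffer) {
                    if (className == null || !TARGETS.contains(className)) {
                        return null; // null means "leave the class unchanged"
                    }
                    // Here the real agent rewrites the class (e.g. with ASM)
                    // so that every read/write method records start time,
                    // end time and byte count before delegating to the
                    // original code.
                    return null;
                }
            }, /* canRetransform = */ true);
            // The java.io/java.nio classes above are loaded long before
            // premain runs, so the real agent calls
            // inst.retransformClasses(...) on them (which requires
            // Can-Retransform-Classes: true in the agent manifest).
        }
    }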