Hi Ufuk, thanks for coming back to me on this.

The records are 100 bytes in size, the benchmark being TeraSort, so that
should not be an issue. I have played around with the input size, and here
are my observations:

128 GiB input: 0 Spilling in Flink.
256 GiB input: 88 GiB Spilling in Flink (so 88 GiB of reads, 88 GiB of
writes), and my instrumentation covers all of it.
384 GiB input: 391 GiB Spilling in Flink, and I cover all of it.
512 GiB input: 522 GiB Spilling in Flink, but I miss 140 GiB of it.
640 GiB input: 653 GiB Spilling in Flink, but I miss 281 GiB of it.
768 GiB input: 784 GiB Spilling in Flink, but I miss 490 GiB of it.
896 GiB input: 914 GiB Spilling in Flink, but I miss 662 GiB of it.
1024 GiB input: 1045 GiB Spilling in Flink, but I miss 968 GiB of it.

So regardless of how well configured my system is and spilling is even
necessary, it seems that with larger spilling amounts, the way the data is
spilled changes (and I start missing larger and larger portions of I/O
until almost 100%).
Now since I have written the instrumentation myself, I cannot guarantee
that it is flawless and I might have missed something.
I'm currently looking into how the file channels are being accessed in
parallel by multiple threads, which I cover as well and my tests verify it,
but maybe there are special access patterns here.

Robert

On Mon, Apr 24, 2017 at 2:25 PM, Ufuk Celebi <u...@apache.org> wrote:

> Hey Robert,
>
> for batch that should cover the relevant spilling code. If the records
> are >= 5 MB, the SpillingAdaptiveSpanningRecordDeserializer will spill
> incoming records as well. But that should be covered by the
> FileChannel instrumentation as well?
>
> – Ufuk
>
>
> On Tue, Apr 18, 2017 at 3:57 PM, Robert Schmidtke
> <ro.schmid...@gmail.com> wrote:
> > Hi,
> >
> > I have already looked at the UnilateralSortMerger, concluding that all
> I/O
> > eventually goes via SegmentReadRequest and SegmentWriteRequest (which in
> > turn use java.nio.channels.FileChannel) in AsynchronousFileIOChannel. Are
> > there more interaction points between Flink and the underlying file
> system
> > that I might want to consider?
> >
> > Thanks!
> > Robert
> >
> > On Fri, Apr 7, 2017 at 5:02 PM, Kurt Young <ykt...@gmail.com> wrote:
> >>
> >> Hi,
> >>
> >> You probably want check out UnilateralSortMerger.java, this is the class
> >> which is responsible for external sort for flink. Here is a short
> >> description for how it works: there are totally 3 threads working
> together,
> >> one for reading, one for sorting partial data in memory, and the last
> one is
> >> responsible for spilling. Flink will first figure out how many memory
> it can
> >> use during the in-memory sort, and manage them as MemorySegments. Once
> these
> >> memory runs out, the sorting thread will take over these memory and do
> the
> >> in-memory sorting (For more details about in-memory sorting, you can see
> >> NormalizedKeySorter). After this, the spilling thread will write this
> sorted
> >> data to disk and make these memory available again for reading. This
> will
> >> repeated until all data has been processed.
> >> Normally, the data will be read twice (one from source, and one from
> disk)
> >> and write once, but if you spilled too much files, flink will first
> merge
> >> some all the files and make sure the last merge step will not exceed
> some
> >> limit (default 128). Hope this can help you.
> >>
> >> Best,
> >> Kurt
> >>
> >> On Fri, Apr 7, 2017 at 4:20 PM, Robert Schmidtke <
> ro.schmid...@gmail.com>
> >> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I'm currently examining the I/O patterns of Flink, and I'd like to know
> >>> when/how Flink goes to disk. Let me give an introduction of what I
> have done
> >>> so far.
> >>>
> >>> I am running TeraGen (from the Hadoop examples package) + TeraSort
> >>> (https://github.com/robert-schmidtke/terasort) on a 16 node cluster,
> each
> >>> node with 64 GiB of memory, 2x32 cores, and roughly half a terabyte of
> disk.
> >>> I'm using YARN and HDFS. The underlying file system is XFS.
> >>>
> >>> Now before running TeraGen and TeraSort, I reset the XFS counters to
> >>> zero, and after TeraGen + TeraSort are finished, I dump the XFS
> counters
> >>> again. Accumulated over the entire cluster I get 3 TiB of writes and
> 3.2 TiB
> >>> of reads. What I'd have expected would be 2 TiB of writes (1 for
> TeraGen, 1
> >>> for TeraSort) and 1 TiB of reads (during TeraSort).
> >>>
> >>> Unsatisfied by the coarseness of these numbers I developed an HDFS
> >>> wrapper that logs file system statistics for each call to hdfs://...,
> such
> >>> as start time/end time, no. of bytes read/written etc. I can plot these
> >>> numbers and see what I expect: during TeraGen I have 1 TiB of writes to
> >>> hdfs://..., during TeraSort I have 1 TiB of reads from and 1 TiB of
> writes
> >>> to hdfs://... So far, so good.
> >>>
> >>> Now this still did not explain the disk I/O, so I added bytecode
> >>> instrumentation to a range of Java classes, like FileIn/OutputStream,
> >>> RandomAccessFile, FileChannel, ZipFile, multiple *Buffer classes for
> memory
> >>> mapped files etc., and have the same statistics: start/end of a read
> >>> from/write to disk, no. of bytes involved and such. I can plot these
> numbers
> >>> too and see that the HDFS JVMs write 1 TiB of data to disk during
> TeraGen
> >>> (expected) and read and write 1 TiB from and to disk during TeraSort
> >>> (expected).
> >>>
> >>> Sorry for the enormous introduction, but now there's finally the
> >>> interesting part: Flink's JVMs read from and write to disk 1 TiB of
> data
> >>> each during TeraSort. I'm suspecting there is some sort of spilling
> >>> involved, potentially because I have not done the setup properly. But
> that
> >>> is not the crucial point: my statistics give a total of 3 TiB of
> writes to
> >>> disk (2 TiB for HDFS, 1 TiB for Flink), which agrees with the XFS
> counters
> >>> from above. However, my statistics only give 2 TiB of reads from disk
> (1 TiB
> >>> for HDFS, 1 TiB for Flink), so I'm missing an entire TiB of reads from
> disk
> >>> somewhere. I have done the same with Hadoop TeraSort, and there I'm not
> >>> missing any data, meaning my statistics agree with XFS for TeraSort on
> >>> Hadoop, which is why I suspect there are some cases where Flink goes
> to disk
> >>> without me noticing it.
> >>>
> >>> Therefore here finally the question: in which cases does Flink go to
> >>> disk, and how does it do so (meaning precisely which Java classes are
> >>> involved, so I can check my bytecode instrumentation)? This would also
> >>> include any kind of resource distribution via HDFS/YARN I guess (like
> JAR
> >>> files and I don't know what). Seeing that I'm missing an amount of data
> >>> equal to the size of my input set I'd suspect there must be some sort
> of
> >>> shuffling/spilling at play here, but I'm not sure. Maybe there is also
> some
> >>> sort of remote I/O involved via sockets or so that I'm missing.
> >>>
> >>> Any hints as to where Flink might incur disk I/O are greatly
> appreciated!
> >>> I'm also happy with doing the digging myself, once pointed to the
> proper
> >>> packages in the Apache Flink source tree (I have done my fair share of
> >>> inspection already, but could not be sure whether or not I have missed
> >>> something). Thanks a lot in advance!
> >>>
> >>> Robert
> >>>
> >>> --
> >>> My GPG Key ID: 336E2680
> >>
> >>
> >
> >
> >
> > --
> > My GPG Key ID: 336E2680
>



-- 
My GPG Key ID: 336E2680

Reply via email to