Hello Fabian,

I have created a directory in my host machine's user directory
(/home/myuser/mydir) and I am mapping it as a volume with Docker for the
TaskManager and JobManager containers.
Each container will thus have the following directory: /home/flink/htmp

host ---> container
/home/myuser/mydir ---> /home/flink/htmp
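
For context, the containers are started roughly like this (the image and
container names below are placeholders, not my exact setup):

    # TaskManager container, with the host directory bind-mounted as a volume
    docker run -d --name taskmanager \
        -v /home/myuser/mydir:/home/flink/htmp \
        my-flink-image

The JobManager container is started the same way, with the same -v mapping.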

I had previously done this successfully with a host directory which holds
several SNAP datasets.
In the Flink configuration file, I specified /home/flink/htmp to be used as
the tmp dir for the TaskManager.
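Concretely, the entry in conf/flink-conf.yaml looks like this (using the
taskmanager.tmp.dirs key you mentioned):

    # Directory used by the TaskManager for spilling intermediate results
    taskmanager.tmp.dirs: /home/flink/htmp
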
This seems to be working, as I was able to start the cluster and run the
Flink job on the Friendster dataset.

However, during execution, there were 2 intermediate files which kept
growing until they reached about 30 GB.
At that point, the Flink TaskManager threw the exception again:

java.lang.RuntimeException: Error obtaining the sorted input: Thread
'SortMerger spilling thread' terminated due to an exception: No space left
on device

Here is an ls excerpt of the directory on the host (to which the
TaskManager container was also writing successfully) shortly before the
exception:

31G  9d177a1971322263f1597c3378885ccf.channel
31G  a693811249bc5f785a79d1b1b537fe93.channel

Now I believe the host system is capable of storing hundreds of GB more, so I
am confused as to what the problem might be.
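
For a rough sense of scale (assuming each edge is spilled as a Tuple2 of two
8-byte longs, i.e. about 16 bytes per record before any serialization
overhead), one sorted copy of the ~1800 M edges would already be on the order
of 1800 M * 16 B ≈ 29 GB, which is in the same ballpark as the ~31 GB
.channel files above. To double-check how much free space the container
actually sees on that mount, something like the following can be run (the
container name 'taskmanager' is a placeholder):

    docker exec taskmanager df -h /home/flink/htmp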

Best regards,

Miguel E. Coimbra
Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
Skype: miguel.e.coimbra

>
> Hi Miguel,
>
> the exception does indeed indicate that the process ran out of available
> disk space.
> The quoted paragraph of the blog post describes the situation when you
> receive the IOE.
>
> By default, the system's default tmp dir is used. I don't know which folder
> that would be in a Docker setup.
> You can configure the temp dir using the taskmanager.tmp.dirs config key.
> Please see the configuration documentation for details [1].
>
> Hope this helps,
> Fabian
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/setup/config.html#jobmanager-amp-taskmanager
>
> 2016-12-02 0:18 GMT+01:00 Miguel Coimbra <miguel.e.coim...@gmail.com>:
>
>> Hello,
>>
>> I have a problem for which I hope someone will be able to give a hint.
>> I am running a Flink *standalone* cluster with 2 Docker containers (1
>> TaskManager and 1 JobManager), with the TaskManager having 30 GB of RAM.
>>
>> The dataset is a large one: SNAP Friendster, which has around 1800 M
>> edges.
>> https://snap.stanford.edu/data/com-Friendster.html
>>
>> I am trying to run the Gelly built-in label propagation algorithm on top
>> of it.
>> As this is a very big dataset, I believe I am exceeding the available RAM
>> and that the system is using secondary storage, which then fails:
>>
>>
>> Connected to JobManager at Actor[akka.tcp://flink@172.19.0.2:6123/user/jobmanager#894624508]
>> 12/01/2016 17:58:24    Job execution switched to status RUNNING.
>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to SCHEDULED
>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to DEPLOYING
>> 12/01/2016 17:58:24    DataSource (at main(App.java:33) (org.apache.flink.api.java.io.TupleCsvInputFormat))(1/1) switched to RUNNING
>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to SCHEDULED
>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to DEPLOYING
>> 12/01/2016 17:58:24    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to RUNNING
>> 12/01/2016 17:59:51    Map (Map at fromTuple2DataSet(Graph.java:343))(1/1) switched to FAILED
>> java.lang.RuntimeException: Error obtaining the sorted input: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger.getIterator(UnilateralSortMerger.java:619)
>>     at org.apache.flink.runtime.operators.BatchTask.getInput(BatchTask.java:1098)
>>     at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:86)
>>     at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
>>     at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>>     at java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.IOException: Thread 'SortMerger spilling thread' terminated due to an exception: No space left on device
>>     at org.apache.flink.runtime.operators.sort.UnilateralSortMerger$ThreadBase.run(UnilateralSortMerger.java:800)
>> Caused by: java.io.IOException: No space left on device
>>     at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
>>     at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
>>     at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
>>     at sun.nio.ch.IOUtil.write(IOUtil.java:65)
>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
>>     at org.apache.flink.runtime.io.disk.iomanager.SegmentWriteRequest.write(AsynchronousFileIOChannel.java:344)
>>     at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync$WriterThread.run(IOManagerAsync.java:502)
>>
>>
>> I do not have secondary storage limitations on the host system, so I
>> believe it would be able to handle whatever is spilled to disk...
>> Perhaps this is a Docker limitation regarding the usage of the host's
>> secondary storage?
>>
>> Or is there perhaps some configuration or setting for the TaskManager
>> which I am missing?
>> When running Gelly's label propagation on this dataset with this cluster
>> configuration, what would be the expected behavior if the system consumes
>> all the memory?
>>
>>
>> I believe the SortMerger thread is associated with the following mechanism
>> described in this blog post:
>>
>> https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>> *The Sort-Merge-Join works by first sorting both input data sets on their
>> join key attributes (Sort Phase) and merging the sorted data sets as a
>> second step (Merge Phase). The sort is done in-memory if the local
>> partition of a data set is small enough. Otherwise, an external merge-sort
>> is done by collecting data until the working memory is filled, sorting it,
>> writing the sorted data to the local filesystem, and starting over by
>> filling the working memory again with more incoming data. After all input
>> data has been received, sorted, and written as sorted runs to the local
>> file system, a fully sorted stream can be obtained. This is done by reading
>> the partially sorted runs from the local filesystem and sort-merging the
>> records on the fly. Once the sorted streams of both inputs are available,
>> both streams are sequentially read and merge-joined in a zig-zag fashion by
>> comparing the sorted join key attributes, building join element pairs for
>> matching keys, and advancing the sorted stream with the lower join key.*
>>
>> I am still investigating the possibility that Docker is at fault
>> regarding secondary storage limitations, but how would I go about
>> estimating the amount of disk space needed for this spilling on this
>> dataset?
>>
>> Thanks for your time,
>>
>> My best regards,
>>
>> Miguel E. Coimbra
>> Email: miguel.e.coim...@gmail.com <miguel.e.coim...@ist.utl.pt>
>> Skype: miguel.e.coimbra
>
>
