[
https://issues.apache.org/jira/browse/IGNITE-4037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15566271#comment-15566271
]
Ivan Veselovsky commented on IGNITE-4037:
-----------------------------------------
Key classes that implement the spilling functionality in Hadoop (version 2.7.2)
on the Map and Reduce sides; a sketch of the spill pattern follows the list:
{code}
org.apache.hadoop.mapred.MapTask
org.apache.hadoop.mapreduce.Partitioner
org.apache.hadoop.mapred.MapTask#runNewMapper
org.apache.hadoop.mapred.MapTask.NewOutputCollector
org.apache.hadoop.mapreduce.MRJobConfig#MAP_OUTPUT_COLLECTOR_CLASS_ATTR
org.apache.hadoop.mapred.MapTask.MapOutputBuffer -- default collector implementation.
org.apache.hadoop.util.IndexedSorter -- sorting interface.
org.apache.hadoop.mapred.MapTask.MapOutputBuffer#spillThread,
org.apache.hadoop.mapred.MapTask.MapOutputBuffer.SpillThread
org.apache.hadoop.mapred.MapTask.MapOutputBuffer#sortAndSpill
org.apache.hadoop.mapred.SpillRecord
org.apache.hadoop.mapreduce.task.reduce.Shuffle
org.apache.hadoop.mapreduce.task.reduce.InMemoryMapOutput
org.apache.hadoop.mapreduce.task.reduce.Fetcher
pull: org.apache.hadoop.mapreduce.task.reduce.Fetcher#copyFromHost
org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler,
org.apache.hadoop.mapreduce.task.reduce.ShuffleSchedulerImpl
org.apache.hadoop.mapreduce.task.reduce.Fetcher#copyMapOutput
{code}
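For orientation, here is a minimal sketch of the spill pattern these classes implement. All names below are hypothetical stand-ins, not Hadoop code: the real MapOutputBuffer keeps records in a circular byte array with per-record metadata, sorts with an IndexedSorter, runs the spill in a dedicated SpillThread, and merges the spill files at the end of the task.
{code}
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: records accumulate in a bounded in-memory buffer;
// once the buffer passes the configured fill threshold, the buffered
// records are sorted and written out as a numbered spill file.
class SpillBufferSketch {
    private final long maxBytes;         // cf. mapreduce.task.io.sort.mb
    private final double spillPercent;   // cf. mapreduce.map.sort.spill.percent
    private final List<byte[]> records = new ArrayList<>();
    private long usedBytes;
    private int spillCount;

    SpillBufferSketch(long maxBytes, double spillPercent) {
        this.maxBytes = maxBytes;
        this.spillPercent = spillPercent;
    }

    // Called for every map output record (cf. MapOutputBuffer#collect).
    synchronized void collect(byte[] record) throws IOException {
        records.add(record);
        usedBytes += record.length;
        // Hadoop triggers the spill on a background SpillThread while the
        // mapper keeps filling the remaining space; we spill inline for brevity.
        if (usedBytes >= maxBytes * spillPercent)
            sortAndSpill();
    }

    // cf. MapOutputBuffer#sortAndSpill.
    private void sortAndSpill() throws IOException {
        records.sort(SpillBufferSketch::compareBytes); // stand-in for IndexedSorter
        Path spillFile = Files.createTempFile("spill-" + spillCount++, ".out");
        try (OutputStream out = Files.newOutputStream(spillFile)) {
            for (byte[] r : records)
                out.write(r);
        }
        records.clear();
        usedBytes = 0;
    }

    private static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Byte.compare(a[i], b[i]);
            if (c != 0)
                return c;
        }
        return Integer.compare(a.length, b.length);
    }
}
{code}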
In Ignite we have
{code}org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage{code}
to transfer data between nodes.
The data in these messages is read from and stored into
{code}org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimapBase{code}-based
structures on the sending and receiving sides, respectively.
The base classes involved are listed below (a sketch of the send/receive flow
follows the list):
{code}
org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopSkipList
org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopConcurrentHashMultimap
org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleMessage
org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob#msgs
send:
org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob#collectUpdatesAndSend
receive:
org.apache.ignite.internal.processors.hadoop.shuffle.HadoopShuffleJob#onShuffleMessage
{code}
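For orientation, a minimal sketch of the send/receive flow behind the two entry points above. Everything here is a hypothetical stand-in, not the actual Ignite API: the point is only that a message is a batch of raw key/value bytes drained from the sender's structures and put into the receiver's in-memory multimap, which is exactly where spilling would have to hook in.
{code}
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical stand-ins, not the real Ignite classes.
class ShuffleFlowSketch {
    // A shuffle message is essentially a batch of raw key/value byte pairs
    // (cf. HadoopShuffleMessage).
    static class Message {
        final List<byte[]> keys = new ArrayList<>();
        final List<byte[]> vals = new ArrayList<>();

        void add(byte[] key, byte[] val) { keys.add(key); vals.add(val); }

        void visit(BiConsumer<byte[], byte[]> visitor) {
            for (int i = 0; i < keys.size(); i++)
                visitor.accept(keys.get(i), vals.get(i));
        }
    }

    // Stand-in for the HadoopMultimapBase-based target structure.
    interface Multimap {
        void put(byte[] key, byte[] val);
    }

    // Send side (cf. collectUpdatesAndSend): drain locally accumulated
    // updates into a message; the real code ships it over the network.
    static Message collectUpdates(Iterable<byte[][]> newEntries) {
        Message msg = new Message();
        for (byte[][] kv : newEntries)
            msg.add(kv[0], kv[1]);
        return msg;
    }

    // Receive side (cf. onShuffleMessage): unpack the message into the
    // in-memory multimap -- today everything stays in memory.
    static void onShuffleMessage(Message msg, Multimap target) {
        msg.visit(target::put);
    }
}
{code}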
As the simplest solution (the way of minimal changes), we can try to implement
disk spilling transparently: preserve the interface of
{code}org.apache.ignite.internal.processors.hadoop.shuffle.collections.HadoopMultimapBase{code},
but spill to and read from disk in the background, behind the scenes.
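A minimal sketch of what "transparently" could mean here, under the assumption that the contract can be reduced to a put-style interface (the interface and all names below are hypothetical, not the actual HadoopMultimapBase API):
{code}
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical contract: callers only see put(); whether entries live in
// memory or on disk is an implementation detail.
interface MultimapSketch {
    void put(byte[] key, byte[] val) throws IOException;
}

// Decorator idea: keep the existing in-memory behavior until the memory
// budget is reached, then move the accumulated batch to a spill file and
// carry on. The reading side would merge spill files with the in-memory
// remainder, so callers see no difference.
class SpillingMultimapSketch implements MultimapSketch {
    private final long maxBytes;
    private final double spillPercent; // e.g. 0.8, as in Hadoop
    private final Deque<byte[][]> inMemory = new ArrayDeque<>();
    private final List<Path> spillFiles = new ArrayList<>();
    private long usedBytes;

    SpillingMultimapSketch(long maxBytes, double spillPercent) {
        this.maxBytes = maxBytes;
        this.spillPercent = spillPercent;
    }

    @Override public synchronized void put(byte[] key, byte[] val) throws IOException {
        inMemory.add(new byte[][] {key, val});
        usedBytes += key.length + val.length;
        // The real version would hand this off to a background spill thread.
        if (usedBytes >= maxBytes * spillPercent)
            spill();
    }

    private void spill() throws IOException {
        Path file = Files.createTempFile("multimap-spill", ".bin");
        try (OutputStream out = Files.newOutputStream(file)) {
            while (!inMemory.isEmpty())
                for (byte[] part : inMemory.poll())
                    out.write(part);
        }
        spillFiles.add(file);
        usedBytes = 0;
    }
}
{code}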
The configuration should be similar to Hadoop's: (1) a maximum in-memory buffer
size, and (2) a fill percentage at which the buffer is spilled to disk
(typically 80% by default).
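For reference, this is roughly how the two knobs are read on the Hadoop side; the {code}MRJobConfig{code} keys and defaults below match Hadoop 2.7.x as far as I can tell:
{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;

class HadoopSpillConfig {
    static void print(Configuration conf) {
        // mapreduce.task.io.sort.mb -- size of the in-memory sort buffer,
        // 100 MB by default.
        int sortMb = conf.getInt(MRJobConfig.IO_SORT_MB, 100);
        // mapreduce.map.sort.spill.percent -- fill level that triggers a
        // spill, 0.80 by default.
        float spillPer = conf.getFloat(MRJobConfig.MAP_SORT_SPILL_PERCENT, 0.80f);
        System.out.println("sort buffer: " + sortMb + " MB, spill at " + spillPer);
    }
}
{code}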
This should be implemented similarly on both the map and reduce sides.
> High memory consumption when executing TeraSort Hadoop example
> --------------------------------------------------------------
>
> Key: IGNITE-4037
> URL: https://issues.apache.org/jira/browse/IGNITE-4037
> Project: Ignite
> Issue Type: Bug
> Affects Versions: 1.6
> Reporter: Ivan Veselovsky
> Assignee: Ivan Veselovsky
> Fix For: 1.7
>
>
> When executing the TeraSort Hadoop example, we observe high memory consumption
> that frequently leads to cluster malfunction.
> The problem can be reproduced in a unit test, even with 1 node and an input
> data set as small as 100 MB.
> Dump analysis shows that memory is consumed by various queues:
> org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopExecutorService#queue
> and the task queue of
> org.apache.ignite.internal.processors.hadoop.jobtracker.HadoopJobTracker#evtProcSvc.
> Since objects stored in these queues hold byte arrays of significant size,
> memory is consumed very quickly.
> It looks like the real cause of the problem is that some tasks are blocked.