Hi,

the Reducer-side sorting is done with an external merge sort. The sorter collects records in an in-memory buffer until the buffer is full, sorts it with quicksort, and spills the sorted result to disk (if a combiner is available, it is applied before spilling to reduce I/O). After all data has been partially sorted and spilled to disk, the sorted sub-sequences are read back from disk concurrently and merged. The resulting stream is fully sorted and is fed directly into the Reducer. Hence, Flink can process groups that are larger than the available memory, because the data is read from disk. If all data fits into memory, nothing is spilled to disk. If there are too many sorted sub-sequences to merge at once, the merge is done hierarchically (i.e., data is written to disk more than once).
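To make the scheme above concrete, here is a minimal Python sketch of an external merge sort. It is not Flink's implementation, and it rests on several assumptions: records are hypothetical `(key, value)` integer pairs, the buffer is sized in records rather than memory pages, `pickle` files in a temporary directory play the role of spill files, Python's built-in `sorted` (Timsort) stands in for quicksort, runs are read lazily in one thread rather than concurrently, and `max_fan_in` is a hypothetical limit on how many runs are merged in one pass. `external_sort` and its helpers are illustrative names, not Flink APIs.

```python
import functools
import heapq
import itertools
import os
import pickle
import tempfile
from typing import Callable, Iterable, Iterator, List, Optional, Tuple

# Hypothetical record layout for this sketch: (key, value) pairs of ints.
Record = Tuple[int, int]
ReduceFn = Callable[[int, int], int]


def _key(rec: Record) -> int:
    return rec[0]


def _combine(run: List[Record], combiner: ReduceFn) -> List[Record]:
    """Pre-aggregate records with equal keys; works because `run` is sorted."""
    return [
        (key, functools.reduce(combiner, (value for _, value in group)))
        for key, group in itertools.groupby(run, key=_key)
    ]


def _spill(records: Iterable[Record], spill_dir: str) -> str:
    """Write one sorted run to a temporary file and return its path."""
    fd, path = tempfile.mkstemp(dir=spill_dir, suffix=".run")
    with os.fdopen(fd, "wb") as f:
        for rec in records:
            pickle.dump(rec, f)
    return path


def _read_run(path: str) -> Iterator[Record]:
    """Stream a spilled run back from disk one record at a time."""
    with open(path, "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return


def external_sort(
    records: Iterable[Record],
    buffer_size: int,
    max_fan_in: int = 4,
    combiner: Optional[ReduceFn] = None,
) -> Iterator[Record]:
    """Yield all records ordered by key, spilling sorted runs when the buffer fills."""
    if buffer_size < 1 or max_fan_in < 2:
        raise ValueError("need buffer_size >= 1 and max_fan_in >= 2")
    with tempfile.TemporaryDirectory() as spill_dir:
        runs: List[str] = []
        buffer: List[Record] = []

        def spill_buffer(buf: List[Record]) -> None:
            # Sort in memory (Timsort here), optionally combine, then write one run.
            run = sorted(buf, key=_key)
            if combiner is not None:
                run = _combine(run, combiner)
            runs.append(_spill(run, spill_dir))

        for rec in records:
            buffer.append(rec)
            if len(buffer) == buffer_size:  # buffer is full: sort, combine, spill
                spill_buffer(buffer)
                buffer = []

        if not runs:
            # Everything fit into memory: sort it there and never touch the disk.
            yield from sorted(buffer, key=_key)
            return
        if buffer:
            spill_buffer(buffer)  # the last, partially filled buffer becomes one more run

        # Hierarchical merge: while there are more runs than max_fan_in, merge
        # groups of runs into longer runs first; these records hit the disk again.
        while len(runs) > max_fan_in:
            runs = [
                _spill(
                    heapq.merge(*(_read_run(p) for p in runs[i:i + max_fan_in]), key=_key),
                    spill_dir,
                )
                for i in range(0, len(runs), max_fan_in)
            ]

        # Final pass: merge the remaining runs and stream the result to the consumer.
        yield from heapq.merge(*(_read_run(p) for p in runs), key=_key)
```

For example, `list(external_sort(pairs, buffer_size=37, max_fan_in=3))` on 1000 pairs produces 28 runs and needs several merge levels, while a `buffer_size` larger than the input takes the in-memory branch and writes nothing. As in the description above, the combiner only shrinks each spilled run: records with the same key from different runs still reach the consumer separately, so the final reduce step is still required.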
At the record shipping layer, Flink does not care about sorted record sequences and may interleave records from all senders arbitrarily. Hence, sorted sub-sequences cannot be exploited.

Best,
Fabian

2016-08-03 15:43 GMT+02:00 Hilmi Yildirim <hilmi.yildi...@dfki.de>:

> Hi,
>
> I have another question. The reducer sorts its inputs before it starts
> with the computation. Which sorting algorithm does it use? In Flink I found
> QuickSort, HeapSort, etc.
>
> Does the sorting algorithm benefit from pre-sorted partitions? For
> example, a MergeSort algorithm can merge the partitions of multiple maps
> into a single sorted partition for the reducer. If the map
> partitions are already sorted, the MergeSort algorithm can run faster.
>
> Are there any benefits if the map partitions are sorted?
>
> Thank you
>
> BR,
>
> Hilmi
>
> On 02.08.2016 at 10:01, Hilmi Yildirim wrote:
>
>> Hi Fabian,
>>
>> thank you very much! This answers my question.
>>
>> BR,
>>
>> Hilmi
>>
>> On 01.08.2016 at 22:29, Fabian Hueske wrote:
>>
>>> Hi Hilmi,
>>>
>>> the results of the combiner are usually not completely sorted, and if they
>>> are, this property is not leveraged.
>>> This is due to the following reasons:
>>> 1) a sort-combiner only sorts as much data as fits into memory. If there is
>>> more data, the result consists of multiple sorted sequences.
>>> 2) since recently, Flink features a hash-based combiner, which is usually
>>> more efficient and does not produce sorted output.
>>> 3) Flink's pipelined shipping strategy would require the
>>> receiver to merge the result records from all senders on the fly while
>>> receiving data via the network. In case of a straggling sender task, all
>>> other senders would be blocked due to backpressure. In addition, this would
>>> only work if the combiner does a full sort and not several in-memory
>>> sorts.
>>>
>>> So, a Reducer will always do a full sort of all received data before
>>> applying the Reduce function (if available, a combiner is applied before
>>> data is written to disk in case of an external sort).
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> 2016-08-01 18:25 GMT+02:00 Hilmi Yildirim <hilmi.yildi...@dfki.de>:
>>>
>>>> Hi,
>>>>
>>>> I have a question about when data points are sorted when running a
>>>> simple MapReduce job.
>>>>
>>>> I have the following code:
>>>>
>>>> data = readFromSource()
>>>>
>>>> data.map(....).groupBy(0).reduce(...)
>>>>
>>>> This code will be translated into the following execution plan:
>>>>
>>>> map -> combiner -> hash partitioning and sorting on 0 -> reduce.
>>>>
>>>> If I am right, the combiner first sorts the data, then it applies
>>>> the combine function, and then it partitions the result.
>>>>
>>>> Now the partitions are consumed by the reducers. For each mapper/combiner
>>>> machine, the reducer has an input gateway. For example, if the mappers and
>>>> combiners run on 10 machines, each reducer has 10 input gateways. Now,
>>>> the reducer consumes the data via a MutableObjectIterator. This iterator
>>>> first consumes data from one input gateway, then from the next, and so
>>>> on. Is the data of a single input gateway already sorted, since the
>>>> combiner function has already sorted it? Is the order of the data
>>>> points maintained after they are sent through the network?
>>>>
>>>> In my code, the MutableObjectIterator instances are subclasses of
>>>> NormalizedKeySorter. Does this mean that the data from an input gateway is
>>>> first sorted before it is handed over to the reduce function? Is this
>>>> because the order of the data points is not maintained after being sent
>>>> through the network?
>>>>
>>>> It would be nice if someone could answer my question.
>>>> If my assumptions are wrong, please correct me :)
>>>>
>>>> BR,
>>>>
>>>> Hilmi
>>>>
>>>> --
>>>> ==================================================================
>>>> Hilmi Yildirim, M.Sc.
>>>> Researcher
>>>>
>>>> DFKI GmbH
>>>> Intelligente Analytik für Massendaten
>>>> DFKI Projektbüro Berlin
>>>> Alt-Moabit 91c
>>>> D-10559 Berlin
>>>> Phone: +49 30 23895 1814
>>>>
>>>> E-Mail: hilmi.yildi...@dfki.de
>>>>
>>>> -------------------------------------------------------------
>>>> Deutsches Forschungszentrum fuer Kuenstliche Intelligenz GmbH
>>>> Registered office: Trippstadter Strasse 122, D-67663 Kaiserslautern
>>>>
>>>> Management:
>>>> Prof. Dr. Dr. h.c. mult. Wolfgang Wahlster (Chairman)
>>>> Dr. Walter Olthoff
>>>>
>>>> Chairman of the Supervisory Board:
>>>> Prof. Dr. h.c. Hans A. Aukes
>>>>
>>>> Register court: Amtsgericht Kaiserslautern, HRB 2313
>>>> -------------------------------------------------------------