Hi Chesnay,

Thanks for responding.


I managed to resolve the problem last Friday; I had a single datasource for 
each file, instead of one big datasource for all the files. The reading of the 
one or two HDFS blocks within each datasource was then distributed to a small 
percentage of slots (let's say ~10%). Some Beam runner-specific knowledge for 
Flink I did not yet have.


> the function after the groupBy() should still make full use of the 
> parallelism of the cluster
> Do note that data skew can affect how much data is distributed to each node


I do not remember seeing this behaviour, instead I remember data was 
redistributed only among slots that did the reading, but I cannot verify this 
at this point. Also, I do not know exactly how Beam operators map to Flink's. 
Key distribution is in the millions and quite uniform.


Reinier

________________________________
From: Chesnay Schepler <ches...@apache.org>
Sent: 13 March 2018 12:40:02
To: user@flink.apache.org
Subject: Re: HDFS data locality and distribution

Hello,

You said that "data is distributed very badly across slots"; do you mean that 
only a small number of subtasks is reading from HDFS, or that the keyed data is 
only processed by a few subtasks?

Flink does prioritize date locality over date distribution when reading the 
files, but the function after the groupBy() should still make full use of the 
parallelism of the cluster. Do note that data skew can affect how much data is 
distributed to each node, i.e. if 80% of your data has the same key (or rather 
hash), they will all end up on the same node.

On 12.03.2018 13:49, Reinier Kip wrote:

Relevant versions: Beam 2.1, Flink 1.3.

________________________________
From: Reinier Kip <r...@bol.com><mailto:r...@bol.com>
Sent: 12 March 2018 13:45:47
To: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: HDFS data locality and distribution


Hey all,


I'm trying to batch-process 30-ish files from HDFS, but I see that data is 
distributed very badly across slots. 4 out of 32 slots get 4/5ths of the data, 
another 3 slots get about 1/5th and a last slot just a few records. This 
probably triggers disk spillover on these slots and slows down the job 
immensely. The data has many many unique keys and processing could be done in a 
highly parallel manner. From what I understand, HDFS data locality governs 
which splits are assigned to which subtask.


  *   I'm running a Beam on Flink on YARN pipeline.
  *   I'm reading 30-ish files, whose records are later grouped by their 
millions of unique keys.
  *   For now, I have 8 task managers by 4 slots. Beam sets all subtasks to 
have 32 parallelism.
  *   Data seems to be localised to 9 out of the 32 slots, 3 out of the 8 task 
managers.


Does the statement of input split assignment ring true? Is the fact that data 
isn't redistributed an effort from Flink to have high data locality, even if 
this means disk spillover for a few slots/tms and idleness for others? Is there 
any use for parallelism if work isn't distributed anyway?


Thanks for your time, Reinier

Reply via email to