Hi Tim,

depending on how you create the DataSource<String> fileList, Flink will
schedule the downstream operators differently. If you used the
ExecutionEnvironment.fromCollection method, then it will create a DataSource
with a CollectionInputFormat. This kind of DataSource will only be executed
with a degree of parallelism of 1. The source will send it’s collection
elements in a round robin fashion to the downstream operators which are
executed with a higher parallelism. So when Flink schedules the downstream
operators, it will try to place them close to their inputs. Since all flat
map operators have the single data source task as an input, they will be
deployed on the same machine if possible.

In contrast, if you had a parallel data source which would consist of
multiple source task, then these tasks would be independent and spread out
across your cluster. In this case, every flat map task would have a single
distinct source task as input. When the flat map tasks are deployed they
would be deployed on the machine where their corresponding source is
running. Since the source tasks are spread out across the cluster, the flat
map tasks would be spread out as well.

What you could do to mitigate your problem is to start the cluster with as
many slots as your maximum degree of parallelism is. That way, you’ll
utilize all cluster resources.

I hope this clarifies a bit why you observe that tasks tend to cluster on a
single machine.

Cheers,
Till
​

On Tue, Feb 23, 2016 at 1:49 PM, Tim Conrad <con...@math.fu-berlin.de>
wrote:

>
>
> Dear FLINK community.
>
> I was wondering what would be the recommended (best?) way to achieve some
> kind of file conversion. That runs in parallel on all available Flink
> Nodes, since it it "embarrassingly parallel" (no dependency between files).
>
> Say, I have a HDFS folder that contains multiple structured text-files
> containing (x,y) pairs (think of CVS).
>
> For each of these files I want to do (individual for each file) the
> following:
>
> * Read file from HDFS
> * Extract dataset(s) from file (e.g. list of (x,y) pairs)
> * Apply some filter (e.g. smoothing)
> * Do some pattern recognition on smoothed data
> * Write results back to HDFS (different format)
>
> Would the following be a good idea?
>
> DataSource<String> fileList = ... // contains list of file names in HDFS
>
> // For each "filename" in list do...
> DataSet<FeatureList> featureList = fileList
>                 .flatMap(new ReadDataSetFromFile()) // flatMap because
> there might multiple DataSets in a file
>                 .map(new Smoothing())
>                 .map(new FindPatterns());
>
> featureList.writeAsFormattedText( ... )
>
> I have the feeling that Flink does not distribute the independent tasks on
> the available nodes but executes everything on only one node.
>
>
> Cheers
> Tim
>
>
>
>

Reply via email to