Hi all,

I'm having some trouble grasping the meaning of, and the differences
between, the following concepts:

- Split
- Group
- Partition

Let me elaborate a bit on the problem I'm trying to solve here. In my tests
I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in standalone
mode. Each node has 64G of memory and 32 cores. I'm starting the JobManager
on one node, and a TaskManager on each node. I'm assigning 16 slots to each
TaskManager, so the overall parallelism is 80 (= 5 TMs x 16 Slots).

The data I want to process resides in a local folder on each worker with
the same path (say /tmp/input). There can be arbitrarily many input files
in each worker's folder. I have written a custom input format that
assigns the files round-robin to the 16 local input splits on each node (
https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java)
for a total of 80 input splits that need processing. Each split reads
zero or more files, parsing the contents into records that are emitted
correctly. This works as expected.
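
To make the round-robin assignment concrete, here is a minimal sketch in plain
Java. It is not the code from SfsInputFormat; the class name, method name and
file names are made up for illustration, and it assumes the files of one
worker are already available as a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RoundRobinSplits {

    // Distributes the given files over numSplits buckets in round-robin
    // order. Buckets stay empty if there are fewer files than splits.
    static List<List<String>> assign(List<String> files, int numSplits) {
        List<List<String>> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add(new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            splits.get(i % numSplits).add(files.get(i));
        }
        return splits;
    }

    public static void main(String[] args) {
        // Hypothetical file names in /tmp/input on one worker.
        List<String> files = Arrays.asList("a.log", "b.log", "c.log");
        List<List<String>> splits = assign(files, 16);
        for (int i = 0; i < splits.size(); i++) {
            System.out.println("split " + i + ": " + splits.get(i));
        }
    }
}
```

With 3 files and 16 local splits, 13 of the splits read zero files, which is
the "zero or more files" case mentioned above.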

Now we're getting to the questions. How do these 80 input splits relate to
groups and partitions? My understanding is that a partition is a subset of my
DataSet<X> that is local to one node. I.e. if I were to repartition the
data according to some scheme, a shuffling over workers would occur. After
reading all the data, I have 80 partitions, correct?

What is less clear to me is the concept of a group, i.e. the result of a
groupBy operation. The input files I have are produced on each worker by
some other process. I first want to do pre-aggregation (I hope that's the
term) on each node before sending data over the network. The records I'm
processing contain a 'hostname' attribute, which is set to the worker's
hostname that processes the data, because the DataSources are local. That
means the records produced by the worker on host1 always contain the
attribute hostname=host1. The same holds for the other 4 workers.

Now what happens if I do a groupBy("hostname")? How do the workers realize
that no network transfer is necessary? Is a group a logical abstraction, or
a physical one? (In my understanding a partition is physical because it's
local to exactly one worker.)

What I'd like to do next is a reduceGroup to merge multiple records into
one (some custom, yet straightforward, aggregation) and emit another record
for every couple of input records. Am I correct in assuming that the
Iterable<X> values passed to the reduce function all have the same hostname
value? That is, will the operation have a parallelism of 80, with each set
of 16 operations on a host seeing the same hostname value? Because I have 16 splits per
host, the 16 reduces on host1 should all receive values with
hostname=host1, correct? And after the operation has finished, will the
reduced groups (now actual DataSets again) still be local to the workers?
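
For concreteness, the kind of aggregation I have in mind looks roughly like
the sketch below. It is plain Java rather than Flink API code, so it only
shows the logical result and says nothing about where Flink would run it. The
Rec type, its value field, the summing and the chunk size are all
placeholders for my actual records and aggregation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupMergeSketch {

    // Placeholder record: a hostname plus one numeric value to aggregate.
    static final class Rec {
        final String hostname;
        final long value;

        Rec(String hostname, long value) {
            this.hostname = hostname;
            this.value = value;
        }
    }

    // Groups the records by hostname, then merges every chunkSize
    // consecutive records of a group into one output record by summing.
    static List<Rec> groupAndMerge(List<Rec> input, int chunkSize) {
        Map<String, List<Rec>> groups = new LinkedHashMap<>();
        for (Rec r : input) {
            groups.computeIfAbsent(r.hostname, k -> new ArrayList<>()).add(r);
        }
        List<Rec> out = new ArrayList<>();
        for (Map.Entry<String, List<Rec>> e : groups.entrySet()) {
            List<Rec> group = e.getValue();
            for (int i = 0; i < group.size(); i += chunkSize) {
                long sum = 0;
                int end = Math.min(i + chunkSize, group.size());
                for (int j = i; j < end; j++) {
                    sum += group.get(j).value;
                }
                out.add(new Rec(e.getKey(), sum));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> input = Arrays.asList(
                new Rec("host1", 1), new Rec("host2", 10),
                new Rec("host1", 2), new Rec("host1", 3));
        for (Rec r : groupAndMerge(input, 2)) {
            System.out.println(r.hostname + " " + r.value);
        }
    }
}
```

Every list handed to the inner loop contains records with a single hostname,
which is the property I'm hoping the Iterable<X> passed to my reduce function
has as well.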

This is quite a lot to work through, I have to admit. I'd be grateful for
any hints, advice and feedback on this. If anything needs clarification, I'd
be happy to provide more information.

Thanks a lot in advance!

Robert

-- 
My GPG Key ID: 336E2680
