Hi Fabian,

thanks for the quick and comprehensive reply. I'll have a look at the
ExecutionPlan using your suggestion to check what actually gets computed,
and I'll use the properties as well. If I stumble across something else
I'll let you know.

Many thanks again!
Robert

On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Robert,
>
> let me first describe what splits, groups, and partitions are.
>
> * Partition: This is basically all data that goes through the same task
> instance. If you have an operator with a parallelism of 80, you have 80
> partitions. When you call sortPartition() you'll have 80 sorted streams;
> when you call mapPartition() you iterate over all records in one partition.
> * Split: Splits are a concept of InputFormats. An InputFormat can process
> several splits. All splits that are processed by the same data source task
> make up the partition of that task. So a split is a subset of a partition.
> In your case where each task reads exactly one split, the split is
> equivalent to the partition.
> * Group: A group is based on the groupBy attribute and hence data-driven
> and does not depend on the parallelism. A groupReduce requires a
> partitioning such that all records with the same grouping attribute are
> sent to the same operator, i.e., all are part of the same partition.
> Depending on the number of distinct grouping keys (and the hash-function) a
> partition can have zero, one, or more groups.
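
To make the last point concrete, here is a minimal plain-Java sketch (no Flink involved) of how groups end up in partitions. It assumes a simple hashCode-modulo partitioner, which is only a stand-in, since Flink's actual hash function differs. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class GroupsPerPartition {

    // Simplified stand-in for a hash partitioner (assumption: Flink hashes differently).
    public static int partitionFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Maps every partition index to the distinct grouping keys (groups) it receives.
    public static Map<Integer, List<String>> assignGroups(List<String> keys, int parallelism) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (int p = 0; p < parallelism; p++) {
            partitions.put(p, new ArrayList<>());
        }
        for (String key : keys) {
            // All records with the same key go to the same partition.
            partitions.get(partitionFor(key, parallelism)).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> hostnames = Arrays.asList("host1", "host2", "host3", "host4", "host5");
        Map<Integer, List<String>> partitions = assignGroups(hostnames, 80);
        long empty = partitions.values().stream().filter(List::isEmpty).count();
        System.out.println("partitions: " + partitions.size() + ", without any group: " + empty);
    }
}
```

With 5 distinct keys and a parallelism of 80, at least 75 partitions necessarily receive no group at all, which is why a plain groupBy on hostname alone leaves most tasks idle.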
>
> Now coming to your use case. You have 80 sources running on 5 machines.
> All sources on the same machine produce records with the same grouping key
> (hostname). You can actually give Flink a hint that the data returned
> by a split is partitioned, grouped, or sorted in a specific way. This works
> as follows:
>
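> // String is hostname, Integer is parallel id of the source task
> // (YourFormat is a placeholder for your custom InputFormat)
> DataSet<Tuple3<String, Integer, Long>> data = env.createInput(new YourFormat());
> SplitDataProperties<Tuple3<String, Integer, Long>> splitProps =
>     ((DataSource<Tuple3<String, Integer, Long>>) data).getSplitDataProperties();
> // declare that each split is already grouped and partitioned on fields 0 and 1
> splitProps.splitsGroupedBy(0, 1);
> splitProps.splitsPartitionedBy(0, 1);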
>
> With this info, Flink knows that the data returned by our source is
> partitioned and grouped. Now you can do groupBy(0,1).reduceGroup(XXX) to
> run a local group reduce on each of the 80 tasks (hostname and parallel
> index result in 80 keys) and locally reduce the data.
> The next step would be another .groupBy(0).reduceGroup(), which gives 5
> groups (one per hostname) that are distributed across your tasks.
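
The two-step reduction described above can be simulated in plain Java to check the key counts. This is only a sketch under assumptions: it does not use Flink, the aggregation is assumed to be a simple sum, and the Rec class, the "#" key separator, and the sample data (5 hosts, 16 source tasks each, 3 records per task) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TwoStageAggregation {

    // Hypothetical record: hostname, parallel id of the source task, value to aggregate.
    public static final class Rec {
        final String host;
        final int taskId;
        final long value;

        Rec(String host, int taskId, long value) {
            this.host = host;
            this.taskId = taskId;
            this.value = value;
        }
    }

    // Stage 1: local pre-aggregation, one result per (hostname, taskId) key.
    public static Map<String, Long> localReduce(List<Rec> records) {
        Map<String, Long> out = new TreeMap<>();
        for (Rec r : records) {
            out.merge(r.host + "#" + r.taskId, r.value, Long::sum);
        }
        return out;
    }

    // Stage 2: final aggregation, one result per hostname.
    public static Map<String, Long> globalReduce(Map<String, Long> local) {
        Map<String, Long> out = new TreeMap<>();
        for (Map.Entry<String, Long> e : local.entrySet()) {
            String host = e.getKey().substring(0, e.getKey().indexOf('#'));
            out.merge(host, e.getValue(), Long::sum);
        }
        return out;
    }

    // Sample input: 5 hosts x 16 source tasks x 3 records, each with value 1.
    public static List<Rec> sampleRecords() {
        List<Rec> records = new ArrayList<>();
        for (int h = 1; h <= 5; h++) {
            for (int t = 0; t < 16; t++) {
                for (int i = 0; i < 3; i++) {
                    records.add(new Rec("host" + h, t, 1L));
                }
            }
        }
        return records;
    }

    public static void main(String[] args) {
        Map<String, Long> local = localReduce(sampleRecords());
        Map<String, Long> global = globalReduce(local);
        System.out.println("local keys: " + local.size() + ", global keys: " + global.size());
    }
}
```

Only the second stage combines results that come from different source tasks, so in a real job that is the step where data would have to move between tasks.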
>
> However, you have to be careful with the SplitDataProperties. If you get
> them wrong, the optimizer makes false assumptions and the resulting plan
> might not compute what you are looking for.
> I'd recommend reading the JavaDocs and playing a bit with this feature to
> see how it behaves. ExecutionEnvironment.getExecutionPlan() can help to
> figure out what is happening.
>
> Best,
> Fabian
>
>
> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>
>> Hi all,
>>
>> I'm having some trouble grasping what the meaning of/difference between
>> the following concepts is:
>>
>> - Split
>> - Group
>> - Partition
>>
>> Let me elaborate a bit on the problem I'm trying to solve here. In my
>> tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in
>> standalone mode. Each node has 64G of memory and 32 cores. I'm starting the
>> JobManager on one node, and a TaskManager on each node. I'm assigning 16
>> slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16
>> Slots).
>>
>> The data I want to process resides in a local folder on each worker with
>> the same path (say /tmp/input). There can be arbitrarily many input files
>> in each worker's folder. I have written a custom input format that
>> round-robin assigns the files to each of the 16 local input splits
>> (https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java)
>> to obtain a total of 80 input splits that need
>> processing. Each split reads zero or more files, parsing the contents into
>> records that are emitted correctly. This works as expected.
>>
>> Now we're getting to the questions. How do these 80 input splits relate
>> to groups and partitions? My understanding of a partition is a subset of my
>> DataSet<X> that is local to each node. I.e. if I were to repartition the
>> data according to some scheme, a shuffling over workers would occur. After
>> reading all the data, I have 80 partitions, correct?
>>
>> What is less clear to me is the concept of a group, i.e. the result of a
>> groupBy operation. The input files I have are produced on each worker by
>> some other process. I first want to do pre-aggregation (I hope that's the
>> term) on each node before sending data over the network. The records I'm
>> processing contain a 'hostname' attribute, which is set to the worker's
>> hostname that processes the data, because the DataSources are local. That
>> means the records produced by the worker on host1 always contain the
>> attribute hostname=host1. Similar for the other 4 workers.
>>
>> Now what happens if I do a groupBy("hostname")? How do the workers
>> realize that no network transfer is necessary? Is a group a logical
>> abstraction or a physical one? (In my understanding a partition is
>> physical because it's local to exactly one worker.)
>>
>> What I'd like to do next is a reduceGroup to merge multiple records into
>> one (some custom, yet straightforward, aggregation) and emit another record
>> for every couple of input records. Am I correct in assuming that the
>> Iterable<X> values passed to the reduce function all have the same hostname
>> value? That is, will the operation have a parallelism of 80, where 5x16
>> operations will have the same hostname value? Because I have 16 splits per
>> host, the 16 reduces on host1 should all receive values with
>> hostname=host1, correct? And after the operation has finished, will the
>> reduced groups (now actual DataSets again) still be local to the workers?
>>
>> This is quite a lot to work on, I have to admit. I'm happy for any hints,
>> advice and feedback on this. If there's need for clarification I'd be happy
>> to provide more information.
>>
>> Thanks a lot in advance!
>>
>> Robert
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>


-- 
My GPG Key ID: 336E2680
