Hi Fabian, thanks for the quick and comprehensive reply. I'll have a look at the ExecutionPlan using your suggestion to check what actually gets computed, and I'll use the properties as well. If I stumble across something else I'll let you know.
Many thanks again!
Robert

On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Robert,
>
> let me first describe what splits, groups, and partitions are.
>
> * Partition: This is basically all data that goes through the same task
> instance. If you have an operator with a parallelism of 80, you have 80
> partitions. When you call sortPartition() you'll have 80 sorted streams; if
> you call mapPartition() you iterate over all records in one partition.
> * Split: Splits are a concept of InputFormats. An InputFormat can process
> several splits. All splits that are processed by the same data source task
> make up the partition of that task, so a split is a subset of a partition.
> In your case, where each task reads exactly one split, the split is
> equivalent to the partition.
> * Group: A group is based on the groupBy attribute and is hence data-driven
> and does not depend on the parallelism. A groupReduce requires a
> partitioning such that all records with the same grouping attribute are
> sent to the same operator instance, i.e., all are part of the same
> partition. Depending on the number of distinct grouping keys (and the hash
> function), a partition can have zero, one, or more groups.
>
> Now coming to your use case. You have 80 sources running on 5 machines.
> All sources on the same machine produce records with the same grouping key
> (hostname). You can actually give Flink a hint that the data returned by a
> split is partitioned, grouped, or sorted in a specific way. This works as
> follows:
>
> import org.apache.flink.api.java.io.SplitDataProperties;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple3;
>
> // String is hostname, Integer is parallel id of the source task
> DataSource<Tuple3<String, Integer, Long>> input = env.createInput(new YourFormat());
> SplitDataProperties<Tuple3<String, Integer, Long>> splitProps =
>     input.getSplitDataProperties();
> splitProps.splitsGroupedBy(0, 1);
> splitProps.splitsPartitionedBy(0, 1);
>
> With this info, Flink knows that the data returned by your source is
> partitioned and grouped.
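To make concrete what the two declarations above promise, here is a plain-Java sketch with no Flink dependency. It assumes the semantics described in the SplitDataProperties JavaDoc: splitsPartitionedBy means every key combination occurs in only one split, and splitsGroupedBy means records sharing a key are emitted consecutively within a split. The Rec class, the checker methods, and the sample data are hypothetical; the idea is only to sanity-check a small sample of what a custom InputFormat emits before declaring the properties.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SplitPropertiesCheck {

    // Hypothetical record mirroring Tuple3<String, Integer, Long>:
    // (hostname, parallel id of the source task, some value).
    static final class Rec {
        final String host;
        final int parallelId;
        final long value;

        Rec(String host, int parallelId, long value) {
            this.host = host;
            this.parallelId = parallelId;
            this.value = value;
        }

        // The key combination declared via fields (0, 1).
        String key() {
            return host + "#" + parallelId;
        }
    }

    // Partitioned: every key occurs in at most one split.
    static boolean isPartitioned(List<List<Rec>> splits) {
        Map<String, Integer> owner = new HashMap<>();
        for (int s = 0; s < splits.size(); s++) {
            for (Rec r : splits.get(s)) {
                Integer prev = owner.putIfAbsent(r.key(), s);
                if (prev != null && prev != s) {
                    return false; // same key seen in two different splits
                }
            }
        }
        return true;
    }

    // Grouped: within each split, records with equal keys are emitted consecutively.
    static boolean isGrouped(List<List<Rec>> splits) {
        for (List<Rec> split : splits) {
            Set<String> closed = new HashSet<>(); // keys whose run has already ended
            String current = null;
            for (Rec r : split) {
                String k = r.key();
                if (!k.equals(current)) {
                    if (closed.contains(k)) {
                        return false; // key reappears after another key
                    }
                    if (current != null) {
                        closed.add(current);
                    }
                    current = k;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Two splits on host1, read by source tasks 0 and 1.
        List<List<Rec>> ok = Arrays.asList(
                Arrays.asList(new Rec("host1", 0, 1), new Rec("host1", 0, 2)),
                Arrays.asList(new Rec("host1", 1, 3)));
        // Key ("host1", 0) shows up in two splits, so a partitioning claim would be false.
        List<List<Rec>> bad = Arrays.asList(
                Arrays.asList(new Rec("host1", 0, 1)),
                Arrays.asList(new Rec("host1", 0, 2)));
        System.out.println(isPartitioned(ok) + " " + isGrouped(ok));   // true true
        System.out.println(isPartitioned(bad) + " " + isGrouped(bad)); // false true
    }
}
```

The second layout is grouped within each split, but declaring it partitioned would be exactly the kind of false assumption that leads the optimizer to a plan that computes something else.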
> Now you can do groupBy(0, 1).reduceGroup(XXX) to run a local GroupReduce
> operation on each of the 80 tasks (hostname and parallel index result in
> 80 keys) and locally reduce the data.
> The next step would be another groupBy(0).reduceGroup(XXX), which gives 5
> groups (one per hostname) that are distributed across your tasks.
>
> However, you have to be careful with the SplitDataProperties. If you get
> them wrong, the optimizer makes false assumptions and the resulting plan
> might not compute what you are looking for.
> I'd recommend reading the JavaDocs and playing a bit with this feature to
> see how it behaves. ExecutionEnvironment.getExecutionPlan() can help you
> figure out what is happening.
>
> Best,
> Fabian
>
>
> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>
>> Hi all,
>>
>> I'm having some trouble grasping the meaning of, and the difference
>> between, the following concepts:
>>
>> - Split
>> - Group
>> - Partition
>>
>> Let me elaborate a bit on the problem I'm trying to solve here. In my
>> tests I'm using a 5-node cluster on which I'm running Flink 1.1.3 in
>> standalone mode. Each node has 64 GB of memory and 32 cores. I'm starting
>> the JobManager on one node and a TaskManager on each node. I'm assigning 16
>> slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16
>> slots).
>>
>> The data I want to process resides in a local folder on each worker with
>> the same path (say /tmp/input). There can be arbitrarily many input files
>> in each worker's folder. I have written a custom input format that
>> assigns the files round-robin to each of the 16 local input splits
>> (https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java)
>> to obtain a total of 80 input splits that need processing. Each split
>> reads zero or more files, parsing the contents into records that are
>> emitted correctly. This works as expected.
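The round-robin scheme described above can be sketched in a few lines of plain Java. This is not the actual SfsInputFormat code (see the linked repository for that); the class name, method names, and file names are hypothetical. The sketch only shows how 16 local splits per host add up to 80 splits, some of which may be empty.

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinSplits {

    // File i goes to local split i % splitsPerHost; splits that receive no file stay empty.
    static List<List<String>> assignRoundRobin(List<String> files, int splitsPerHost) {
        List<List<String>> splits = new ArrayList<>();
        for (int s = 0; s < splitsPerHost; s++) {
            splits.add(new ArrayList<>());
        }
        for (int i = 0; i < files.size(); i++) {
            splits.get(i % splitsPerHost).add(files.get(i));
        }
        return splits;
    }

    // Global split id, assuming every host contributes the same number of local splits.
    static int globalSplitId(int hostIndex, int localSplit, int splitsPerHost) {
        return hostIndex * splitsPerHost + localSplit;
    }

    public static void main(String[] args) {
        int hosts = 5;
        int splitsPerHost = 16;
        // Hypothetical listing of /tmp/input on one worker: 18 files.
        List<String> files = new ArrayList<>();
        for (int i = 0; i < 18; i++) {
            files.add("/tmp/input/part-" + i);
        }
        List<List<String>> local = assignRoundRobin(files, splitsPerHost);
        System.out.println(local.get(0));                         // [/tmp/input/part-0, /tmp/input/part-16]
        System.out.println(local.get(15).size());                 // 1
        System.out.println(hosts * splitsPerHost);                // 80
        System.out.println(globalSplitId(4, 15, splitsPerHost));  // 79
    }
}
```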
>>
>> Now we're getting to the questions. How do these 80 input splits relate
>> to groups and partitions? My understanding of a partition is a subset of
>> my DataSet<X> that is local to each node, i.e., if I were to repartition
>> the data according to some scheme, a shuffle across workers would occur.
>> After reading all the data, I have 80 partitions, correct?
>>
>> What is less clear to me is the concept of a group, i.e., the result of a
>> groupBy operation. The input files I have are produced on each worker by
>> some other process. I first want to do pre-aggregation (I hope that's the
>> term) on each node before sending data over the network. The records I'm
>> processing contain a 'hostname' attribute, which is set to the hostname of
>> the worker that processes the data, because the DataSources are local.
>> That means the records produced by the worker on host1 always contain the
>> attribute hostname=host1. The same holds for the other 4 workers.
>>
>> Now what happens if I do a groupBy("hostname")? How do the workers
>> realize that no network transfer is necessary? Is a group a logical
>> abstraction or a physical one (in my understanding a partition is
>> physical because it's local to exactly one worker)?
>>
>> What I'd like to do next is a reduceGroup to merge multiple records into
>> one (some custom, yet straightforward, aggregation) and emit another
>> record for every couple of input records. Am I correct in assuming that
>> the Iterable<X> values passed to the reduce function all have the same
>> hostname value? That is, will the operation have a parallelism of 80,
>> where the 16 operations on each of the 5 hosts all see the same hostname
>> value? Because I have 16 splits per host, the 16 reduces on host1 should
>> all receive values with hostname=host1, correct? And after the operation
>> has finished, will the reduced groups (now actual DataSets again) still
>> be local to the workers?
>>
>> This is quite a lot to work on, I have to admit.
>> I'm grateful for any hints, advice, and feedback on this. If anything
>> needs clarification, I'd be happy to provide more information.
>>
>> Thanks a lot in advance!
>>
>> Robert
>>
>> --
>> My GPG Key ID: 336E2680
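The two-step plan from Fabian's reply (a local reduceGroup on the (hostname, parallel id) key, then a second one on hostname alone) can be simulated in plain Java to see how many groups each step produces. This is a single-process sketch, not Flink code: each inner list stands in for one of the 80 source partitions, and summing a long value is a hypothetical stand-in for Robert's custom aggregation. The class and method names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TwoPhaseAggregation {

    // Hypothetical record: (hostname, parallel id of the source task, value).
    static final class Rec {
        final String host;
        final int parallelId;
        final long value;

        Rec(String host, int parallelId, long value) {
            this.host = host;
            this.parallelId = parallelId;
            this.value = value;
        }
    }

    // Step 1: inside one partition, combine records by (host, parallelId).
    // If the source is really partitioned on that key, this step needs no network transfer.
    static List<Rec> localPreAggregate(List<Rec> partition) {
        Map<String, Rec> acc = new TreeMap<>();
        for (Rec r : partition) {
            String key = r.host + "#" + r.parallelId;
            Rec prev = acc.get(key);
            acc.put(key, prev == null ? r : new Rec(r.host, r.parallelId, prev.value + r.value));
        }
        return new ArrayList<>(acc.values());
    }

    // Step 2: group the pre-aggregated records by hostname only.
    // In a cluster this is the step that may move data between workers.
    static Map<String, Long> globalAggregate(List<List<Rec>> preAggregated) {
        Map<String, Long> perHost = new TreeMap<>();
        for (List<Rec> part : preAggregated) {
            for (Rec r : part) {
                perHost.merge(r.host, r.value, Long::sum);
            }
        }
        return perHost;
    }

    public static void main(String[] args) {
        List<List<Rec>> partitions = new ArrayList<>();
        for (int h = 1; h <= 5; h++) {
            for (int p = 0; p < 16; p++) {
                // Each source task emits three records of value 1 carrying its own hostname.
                List<Rec> partition = new ArrayList<>();
                for (int i = 0; i < 3; i++) {
                    partition.add(new Rec("host" + h, p, 1));
                }
                partitions.add(partition);
            }
        }
        List<List<Rec>> pre = new ArrayList<>();
        int localGroups = 0;
        for (List<Rec> partition : partitions) {
            List<Rec> reduced = localPreAggregate(partition);
            localGroups += reduced.size();
            pre.add(reduced);
        }
        System.out.println(localGroups);          // 80
        System.out.println(globalAggregate(pre)); // {host1=48, host2=48, host3=48, host4=48, host5=48}
    }
}
```

The first step yields 80 groups, one per source task, and none of them mixes hostnames, which matches the expectation that the 16 reduces on host1 only see hostname=host1, provided the split properties are declared correctly. The second step yields one group per host, i.e., 5 groups.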