Hi Fabian, I have opened a ticket for that, thanks.
I have another question: now that I have obtained the proper local grouping, I performed an aggregation of type [T] -> U, where each aggregated object of type U contains information from zero or more Ts. The Us are still tied to the hostname: just like before, each U has the property hostname=hostX for the worker it was produced on. Is it possible to specify the grouping/partitioning for DataSets that are not DataSources, as you suggested before for the source? My guess is that the grouping information is lost when going from T to U.

Best, and thanks for the great help!
Robert

On Fri, Jan 13, 2017 at 8:54 PM, Fabian Hueske <fhue...@gmail.com> wrote:
> I think so far getExecutionPlan() has only been used for debugging purposes and not in programs that are also executed.
> You can open a JIRA issue if you think that this would be a valuable feature.
>
> Thanks, Fabian
>
> 2017-01-13 16:34 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>
>> Just a side note, I'm guessing there's a bug here:
>> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java#L68
>>
>> It should say createProgramPlan("unnamed job", false);
>>
>> Otherwise I get an exception complaining that no new sinks have been added after the last execution. So currently it is not possible for me to first get the execution plan and then execute the program.
>>
>> Robert
>>
>> On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke <ro.schmid...@gmail.com> wrote:
>>
>>> Hi Fabian,
>>>
>>> thanks for the quick and comprehensive reply. I'll have a look at the ExecutionPlan, as you suggested, to check what actually gets computed, and I'll use the properties as well. If I stumble across something else I'll let you know.
>>>
>>> Many thanks again!
>>> Robert
>>>
>>> On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> let me first describe what splits, groups, and partitions are.
>>>>
>>>> * Partition: This is basically all data that goes through the same task instance. If you have an operator with a parallelism of 80, you have 80 partitions. When you call sortPartition() you'll have 80 sorted streams; if you call mapPartition() you iterate over all records in one partition.
>>>> * Split: Splits are a concept of InputFormats. An InputFormat can process several splits. All splits that are processed by the same data source task make up the partition of that task, so a split is a subset of a partition. In your case, where each task reads exactly one split, the split is equivalent to the partition.
>>>> * Group: A group is based on the groupBy attribute and is hence data-driven; it does not depend on the parallelism. A groupReduce requires a partitioning such that all records with the same grouping attribute are sent to the same operator instance, i.e., all are part of the same partition. Depending on the number of distinct grouping keys (and the hash function), a partition can have zero, one, or more groups.
>>>>
>>>> Now coming to your use case. You have 80 sources running on 5 machines. All sources on the same machine produce records with the same grouping key (hostname). You can actually give Flink a hint that the data returned by a split is partitioned, grouped, or sorted in a specific way.
>>>> This works as follows:
>>>>
>>>> import org.apache.flink.api.java.io.SplitDataProperties;
>>>> import org.apache.flink.api.java.operators.DataSource;
>>>> import org.apache.flink.api.java.tuple.Tuple3;
>>>>
>>>> // env is the ExecutionEnvironment; String = hostname, Integer = parallel index of the source task
>>>> DataSource<Tuple3<String, Integer, Long>> text = env.createInput(new YourFormat());
>>>> SplitDataProperties<Tuple3<String, Integer, Long>> splitProps = text.getSplitDataProperties();
>>>> splitProps.splitsGroupedBy(0, 1);     // within a split, records with equal keys come consecutively
>>>> splitProps.splitsPartitionedBy(0, 1); // all records with equal keys are in a single split
>>>>
>>>> With this info, Flink knows that the data returned by your source is partitioned and grouped. Now you can do groupBy(0, 1).reduceGroup(XXX) to run a local groupReduce operation on each of the 80 tasks (hostname and parallel index result in 80 keys) and reduce the data locally.
>>>> The next step would be another .groupBy(0).reduceGroup(), which gives 5 groups (one per hostname) that are distributed across your tasks.
>>>>
>>>> However, you have to be careful with the SplitDataProperties. If you get them wrong, the optimizer makes false assumptions and the resulting plan might not compute what you are looking for.
>>>> I'd recommend reading the JavaDocs and playing a bit with this feature to see how it behaves. ExecutionEnvironment.getExecutionPlan() can help you figure out what is happening.
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>>
>>>> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm having some trouble grasping the meaning of, and the difference between, the following concepts:
>>>>>
>>>>> - Split
>>>>> - Group
>>>>> - Partition
>>>>>
>>>>> Let me elaborate a bit on the problem I'm trying to solve here. In my tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in standalone mode. Each node has 64G of memory and 32 cores. I'm starting the JobManager on one node, and a TaskManager on each node. I'm assigning 16 slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16 slots).
>>>>>
>>>>> The data I want to process resides in a local folder on each worker, with the same path everywhere (say /tmp/input). There can be arbitrarily many input files in each worker's folder. I have written a custom input format that assigns the files round-robin to each of the 16 local input splits (https://github.com/robert-schmidtke/hdfs-statistics-adapter/blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/io/SfsInputFormat.java) to obtain a total of 80 input splits that need processing. Each split reads zero or more files, parsing the contents into records that are emitted correctly. This works as expected.
>>>>>
>>>>> Now we're getting to the questions. How do these 80 input splits relate to groups and partitions? My understanding of a partition is a subset of my DataSet<X> that is local to each node, i.e., if I were to repartition the data according to some scheme, a shuffle across workers would occur. After reading all the data, I have 80 partitions, correct?
>>>>>
>>>>> What is less clear to me is the concept of a group, i.e., the result of a groupBy operation. The input files I have are produced on each worker by some other process. I first want to do pre-aggregation (I hope that's the term) on each node before sending data over the network. The records I'm processing contain a 'hostname' attribute, which is set to the hostname of the worker that processes the data, because the DataSources are local. That means the records produced by the worker on host1 always contain the attribute hostname=host1. The same holds for the other 4 workers.
>>>>>
>>>>> Now what happens if I do a groupBy("hostname")? How do the workers realize that no network transfer is necessary? Is a group a logical abstraction or a physical one (in my understanding a partition is physical because it's local to exactly one worker)?
>>>>>
>>>>> What I'd like to do next is a reduceGroup to merge multiple records into one (some custom, yet straightforward, aggregation) and emit another record for every couple of input records. Am I correct in assuming that the Iterable<X> values passed to the reduce function all have the same hostname value? That is, will the operation have a parallelism of 80 (5 hosts x 16), where the 16 operations on each host see the same hostname value? Because I have 16 splits per host, the 16 reduces on host1 should all receive values with hostname=host1, correct? And after the operation has finished, will the reduced groups (now actual DataSets again) still be local to the workers?
>>>>>
>>>>> This is quite a lot to work on, I have to admit. I'd appreciate any hints, advice and feedback on this. If anything needs clarification, I'd be happy to provide more information.
>>>>>
>>>>> Thanks a lot in advance!
>>>>>
>>>>> Robert
>>>>>
>>>>> --
>>>>> My GPG Key ID: 336E2680
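To make Fabian's distinction between partitions and groups concrete, here is a minimal plain-Java sketch; it does not use the Flink API. It assumes default hash partitioning, with String.hashCode() plus Math.floorMod standing in for Flink's own hash function, and reuses the 5 hosts x 16 slots = 80 tasks from Robert's setup. The class and method names (PartitionGroupSketch, partitionOf, groupsPerPartition, keys) are hypothetical. It counts how the groups land in the 80 partitions when grouping on (hostname, parallel index) versus on hostname alone. With SplitDataProperties declared on the source, Flink would not hash-repartition the data for the local groupBy(0, 1) at all; the sketch only illustrates the general relationship between keys, groups, and partitions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java model (not Flink code): records are routed to one of `parallelism`
// partitions by hashing their grouping key, so a partition holds zero or more groups.
public class PartitionGroupSketch {

    // Assumed stand-in for hash partitioning; Flink's real partitioner hashes differently.
    public static int partitionOf(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Maps every partition index to the set of distinct group keys routed to it.
    public static Map<Integer, TreeSet<String>> groupsPerPartition(List<String> keys, int parallelism) {
        Map<Integer, TreeSet<String>> result = new TreeMap<>();
        for (int p = 0; p < parallelism; p++) {
            result.put(p, new TreeSet<>());
        }
        for (String key : keys) {
            result.get(partitionOf(key, parallelism)).add(key);
        }
        return result;
    }

    // One key per source task: "hostN#index" when includeIndex is true, else just "hostN".
    public static List<String> keys(int hosts, int slotsPerHost, boolean includeIndex) {
        List<String> keys = new ArrayList<>();
        for (int h = 1; h <= hosts; h++) {
            for (int s = 0; s < slotsPerHost; s++) {
                keys.add(includeIndex ? "host" + h + "#" + s : "host" + h);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        int parallelism = 80; // 5 TaskManagers x 16 slots
        for (boolean includeIndex : new boolean[] {true, false}) {
            Map<Integer, TreeSet<String>> groups =
                groupsPerPartition(keys(5, 16, includeIndex), parallelism);
            int totalGroups = groups.values().stream().mapToInt(TreeSet::size).sum();
            long emptyPartitions = groups.values().stream().filter(TreeSet::isEmpty).count();
            int maxInOnePartition = groups.values().stream().mapToInt(TreeSet::size).max().orElse(0);
            System.out.printf("%s: %d groups, %d empty partitions, at most %d groups in one partition%n",
                includeIndex ? "groupBy(hostname, index)" : "groupBy(hostname)",
                totalGroups, emptyPartitions, maxInOnePartition);
        }
    }
}
```

The exact number of empty partitions depends on the hash function, but grouping on hostname alone yields 5 groups and can never fill more than 5 of the 80 partitions, while the 80 (hostname, index) keys typically collide so that some partitions hold two or more groups and others none.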