Hi Fabian,

I have opened a ticket for that, thanks.

I have another question: now that I have obtained the proper local
grouping, I did some aggregation of type [T] -> U, where one aggregated
object is of type U, containing information of zero or more Ts. The Us are
still tied to the hostname, and have the property hostname=hostX for the
workers they're executed on, just like before. Is it possible to specify
the grouping/partitioning for DataSets that are not DataSources, just like
you suggested before? Because my guess is that the grouping information is
lost when going from T to U.

Best and thanks for the great help!
Robert

On Fri, Jan 13, 2017 at 8:54 PM, Fabian Hueske <fhue...@gmail.com> wrote:

> I think so far getExecutionPlan() was only used for debugging purpose and
> not in programs that would also be executed.
> You can open a JIRA issue if you think that this would a valuable feature.
>
> Thanks, Fabian
>
> 2017-01-13 16:34 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>
>> Just a side note, I'm guessing there's a bug here:
>> https://github.com/apache/flink/blob/master/flink-
>> clients/src/main/java/org/apache/flink/client/program/
>> ContextEnvironment.java#L68
>>
>> It should say createProgramPlan("unnamed job", false);
>>
>> Otherwise I'm getting an exception complaining that no new sinks have
>> been added after the last execution. So currently it is not possible for me
>> to first get the execution plan and then run execute the program.
>>
>> Robert
>>
>> On Fri, Jan 13, 2017 at 3:14 PM, Robert Schmidtke <ro.schmid...@gmail.com
>> > wrote:
>>
>>> Hi Fabian,
>>>
>>> thanks for the quick and comprehensive reply. I'll have a look at the
>>> ExecutionPlan using your suggestion to check what actually gets computed,
>>> and I'll use the properties as well. If I stumble across something else
>>> I'll let you know.
>>>
>>> Many thanks again!
>>> Robert
>>>
>>> On Fri, Jan 13, 2017 at 2:40 PM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Robert,
>>>>
>>>> let me first describe what splits, groups, and partitions are.
>>>>
>>>> * Partition: This is basically all data that goes through the same task
>>>> instance. If you have an operator with a parallelism of 80, you have 80
>>>> partitions. When you call sortPartition() you'll have 80 sorted streams, if
>>>> you call mapPartition you iterate over all records in one partition.
>>>> * Split: Splits are a concept of InputFormats. An InputFormat can
>>>> process several splits. All splits that are processed by the same data
>>>> source task make up the partition of that task. So a split is a subset of a
>>>> partition. In your case where each task reads exactly one split, the split
>>>> is equivalent to the partition.
>>>> * Group: A group is based on the groupBy attribute and hence
>>>> data-driven and does not depend on the parallelism. A groupReduce requires
>>>> a partitioning such that all records with the same grouping attribute are
>>>> sent to the same operator, i.e., all are part of the same partition.
>>>> Depending on the number of distinct grouping keys (and the hash-function) a
>>>> partition can have zero, one, or more groups.
>>>>
>>>> Now coming to your use case. You have 80 sources running on 5 machines.
>>>> All source on the same machine produce records with the same grouping key
>>>> (hostname). You can actually give a hint to Flink, that the data returned
>>>> by a split is partitioned, grouped, or sorted in a specific way. This works
>>>> as follows:
>>>>
>>>> // String is hostname, Integer is parallel id of the source task
>>>> DataSet<Tuple3<String, Integer, Long>> = env.createInput(YourFormat);
>>>> SplitDataProperties<Tuple3<String, Integer, Long>> splitProps =
>>>> ((DataSource)text).getSplitDataProperties();
>>>> splitProps.splitsGroupedBy(0,1)
>>>> splitProps.splitsPartitionedBy(0,1)
>>>>
>>>> With this info, Flink knows that the data returned by our source is
>>>> partitioned and grouped. Now you can do groupBy(0,1).groupReduce(XXX) to
>>>> run a local groupReduce operation on each of the 80 tasks (hostname and
>>>> parallel index result in 80 keys) and locally reduce the data.
>>>> Next step would be another .groupBy(0).groupReduce() which gives 16
>>>> groups which are distributed across your tasks.
>>>>
>>>> However, you have to be careful with the SplitDataProperties. If you
>>>> get them wrong, the optimizer makes false assumption and the resulting plan
>>>> might not compute what you are looking for.
>>>> I'd recommend to read the JavaDocs and play a bit with this feature to
>>>> see how it behaves. ExecutionEnvironment.getExecutionPlan() can help
>>>> to figure out what is happening.
>>>>
>>>> Best,
>>>> Fabian
>>>>
>>>>
>>>> 2017-01-13 12:14 GMT+01:00 Robert Schmidtke <ro.schmid...@gmail.com>:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I'm having some trouble grasping what the meaning of/difference
>>>>> between the following concepts is:
>>>>>
>>>>> - Split
>>>>> - Group
>>>>> - Partition
>>>>>
>>>>> Let me elaborate a bit on the problem I'm trying to solve here. In my
>>>>> tests I'm using a 5-node cluster, on which I'm running Flink 1.1.3 in
>>>>> standalone mode. Each node has 64G of memory and 32 cores. I'm starting 
>>>>> the
>>>>> JobManager on one node, and a TaskManager on each node. I'm assigning 16
>>>>> slots to each TaskManager, so the overall parallelism is 80 (= 5 TMs x 16
>>>>> Slots).
>>>>>
>>>>> The data I want to process resides in a local folder on each worker
>>>>> with the same path (say /tmp/input). There can be arbitrarily many input
>>>>> files in each worker's folder. I have written a custom input format that
>>>>> round-robin assigns the files to each of the 16 local input splits (
>>>>> https://github.com/robert-schmidtke/hdfs-statistics-adapter
>>>>> /blob/master/sfs-analysis/src/main/java/de/zib/sfs/analysis/
>>>>> io/SfsInputFormat.java) to obtain a total of 80 input splits that
>>>>> need processing. Each split reads zero or more files, parsing the contents
>>>>> into records that are emitted correctly. This works as expected.
>>>>>
>>>>> Now we're getting to the questions. How do these 80 input splits
>>>>> relate to groups and partitions? My understanding of a partition is a
>>>>> subset of my DataSet<X> that is local to each node. I.e. if I were to
>>>>> repartition the data according to some scheme, a shuffling over workers
>>>>> would occur. After reading all the data, I have 80 partitions, correct?
>>>>>
>>>>> What is less clear to me is the concept of a group, i.e. the result of
>>>>> a groupBy operation. The input files I have are produced on each worker by
>>>>> some other process. I first want to do pre-aggregation (I hope that's the
>>>>> term) on each node before sending data over the network. The records I'm
>>>>> processing contain a 'hostname' attribute, which is set to the worker's
>>>>> hostname that processes the data, because the DataSources are local. That
>>>>> means the records produced by the worker on host1 always contain the
>>>>> attribute hostname=host1. Similar for the other 4 workers.
>>>>>
>>>>> Now what happens if I do a groupBy("hostname")? How do the workers
>>>>> realize that no network transfer is necessary? Is a group a logical
>>>>> abstraction, or a physical one (in my understanding a partition is 
>>>>> physical
>>>>> because it's local to exactly one worker).
>>>>>
>>>>> What I'd like to do next is a reduceGroup to merge multiple records
>>>>> into one (some custom, yet straightforward, aggregation) and emit another
>>>>> record for every couple of input records. Am I correct in assuming that 
>>>>> the
>>>>> Iterable<X> values passed to the reduce function all have the same 
>>>>> hostname
>>>>> value? That is, will the operation have a parallelism of 80, where 5x16
>>>>> operations will have the same hostname value? Because I have 16 splits per
>>>>> host, the 16 reduces on host1 should all receive values with
>>>>> hostname=host1, correct? And after the operation has finished, will the
>>>>> reduced groups (now actual DataSets again) still be local to the workers?
>>>>>
>>>>> This is quite a lot to work on I have to admit. I'm happy for any
>>>>> hints, advice and feedback on this. If there's need for clarification I'd
>>>>> be happy to provide more information.
>>>>>
>>>>> Thanks a lot in advance!
>>>>>
>>>>> Robert
>>>>>
>>>>> --
>>>>> My GPG Key ID: 336E2680
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> My GPG Key ID: 336E2680
>>>
>>
>>
>>
>> --
>> My GPG Key ID: 336E2680
>>
>
>


-- 
My GPG Key ID: 336E2680

Reply via email to