Hi,

Any thoughts on this issue, in particular on what Till proposed ('to figure out 
a key whose hashes are uniformly distributed over the key groups') and on a way 
of exposing the key group assignment through the API?

I wonder how other users are dealing with this issue.

A small set of keys (for input.keyBy) could easily be handled with some sort of 
local mapping, but I am considering a use case with millions of keys; a rough 
sketch of the kind of mapping I mean is below.
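
For concreteness, this is the kind of local mapping I have in mind (a sketch 
only; the helper class and method names are mine, not a Flink API). The idea is 
to pre-compute one "salt" integer per target subtask by probing candidate keys 
until Flink's key group assignment lands on that subtask, and then to key the 
stream by the salt instead of the original key. It assumes maxParallelism == 
parallelism and that KeyGroupRangeAssignment (flink-runtime) is on the classpath:

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

/** Hypothetical helper: one salt key per subtask, found by probing integers. */
public class KeyGroupSalts {

    private final int[] salts;

    public KeyGroupSalts(int maxParallelism, int parallelism) {
        salts = new int[parallelism];
        boolean[] found = new boolean[parallelism];
        int remaining = parallelism;
        // Probe candidate keys until every subtask index has exactly one salt.
        // This terminates quickly as long as parallelism <= maxParallelism.
        for (int candidate = 0; remaining > 0; candidate++) {
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
                    candidate, maxParallelism, parallelism);
            if (!found[subtask]) {
                found[subtask] = true;
                salts[subtask] = candidate;
                remaining--;
            }
        }
    }

    /** Map an original key to the salt of the subtask it should be routed to. */
    public int saltFor(Object originalKey, int parallelism) {
        return salts[Math.floorMod(originalKey.hashCode(), parallelism)];
    }
}

Keying the stream by the salt (e.g. keyBy(r -> salts.saltFor(r.f0, parallelism))) 
balances the subtasks, but it also merges all original keys routed to the same 
salt into a single keyed state/window, which is exactly why I would prefer an 
API hook for the key group assignment instead.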

Best,
Ovidiu


> On 20 Feb 2017, at 15:45, Ovidiu-Cristian MARCU 
> <ovidiu-cristian.ma...@inria.fr> wrote:
> 
> Hi,
> 
> Thank you for the clarifications (I am working with a KeyedStream, so a custom 
> partitioner does not help).
> 
> So I should set maxParallelism >= parallelism and change my keys (from 
> input.keyBy(0)) such that the key group assignment works as expected, 
> but I cannot modify these keys to make this work.
> 
> The other option is to change Flink’s internals in order to distribute the 
> keys evenly (changing computeKeyGroupForKeyHash: is this enough?).
> What I was looking for was an API to change the way the key group assignment 
> is done, but without changing Flink’s runtime.
> 
> I think that the maxParallelism setting alone is not enough (it leads to this 
> inefficient way of distributing data for processing when using a KeyedStream).
> Is it possible to somehow expose the key group assignment?
> 
> This is how the keys are distributed (1024 keys, key = 1..1024; groups from 2 
> to 16, equivalent to the parallelism, i.e. the number of slots):
> 
> {0=517, 1=507} 2
> {0=881, 1=809, 2=358} 3
> {0=1139, 1=1048, 2=617, 3=268} 4
> {0=1319, 1=1268, 2=829, 3=473, 4=207} 5
> {0=1512, 1=1425, 2=1008, 3=644, 4=352, 5=179} 6
> {0=1656, 1=1586, 2=1160, 3=781, 4=512, 5=310, 6=139} 7
> {0=1781, 1=1718, 2=1280, 3=908, 4=645, 5=417, 6=278, 7=141} 8
> {0=1901, 1=1828, 2=1395, 3=1031, 4=738, 5=529, 6=399, 7=240, 8=131} 9
> {0=1996, 1=1934, 2=1493, 3=1134, 4=846, 5=614, 6=513, 7=354, 8=233, 9=99} 10
> {0=2094, 1=2017, 2=1577, 3=1226, 4=935, 5=713, 6=610, 7=434, 8=359, 9=174, 10=101} 11
> {0=2192, 1=2091, 2=1669, 3=1316, 4=1008, 5=797, 6=705, 7=517, 8=446, 9=255, 10=173, 11=95} 12
> {0=2257, 1=2175, 2=1741, 3=1396, 4=1079, 5=882, 6=785, 7=596, 8=524, 9=340, 10=254, 11=186, 12=73} 13
> {0=2330, 1=2258, 2=1816, 3=1458, 4=1160, 5=951, 6=858, 7=667, 8=602, 9=417, 10=329, 11=265, 12=135, 13=66} 14
> {0=2397, 1=2323, 2=1897, 3=1542, 4=1233, 5=1008, 6=934, 7=723, 8=671, 9=479, 10=385, 11=344, 12=210, 13=118, 14=72} 15
> {0=2454, 1=2395, 2=1949, 3=1603, 4=1296, 5=1055, 6=998, 7=803, 8=739, 9=539, 10=453, 11=410, 12=280, 13=178, 14=147, 15=61} 16
> 
> Best,
> Ovidiu
> 
>> On 20 Feb 2017, at 12:04, Till Rohrmann <trohrm...@apache.org> wrote:
>> 
>> Hi Ovidiu,
>> 
>> the way Flink works is to assign key group ranges to operators. For each 
>> element a hash value is calculated and, based on that, the element is 
>> assigned to a key group. Thus, in your example, you either have a key group 
>> with more than 1 key, or multiple key groups with 1 or more keys assigned to 
>> an operator.
>> 
>> So what you could try to do is to reduce the number of key groups to your 
>> parallelism via env.setMaxParallelism() and then try to figure out a key 
>> whose hashes are uniformly distributed over the key groups. The key group 
>> assignment is calculated via murmurHash(key.hashCode()) % maxParallelism.
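>> 
>> In code this is roughly (a sketch of the current behavior; the variable 
>> names are just illustrative):
>> 
>>   int keyGroup = murmurHash(key.hashCode()) % maxParallelism;
>>   int operatorIndex = keyGroup * parallelism / maxParallelism;
>> 
>> so with maxParallelism == parallelism each operator receives exactly one key 
>> group.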
>> 
>> Alternatively if you don’t need a keyed stream, you could try to use a 
>> custom partitioner via DataStream.partitionCustom.
>> 
>> Cheers,
>> Till
>> 
>> 
>> On Mon, Feb 20, 2017 at 11:46 AM, Ovidiu-Cristian MARCU 
>> <ovidiu-cristian.ma...@inria.fr> wrote:
>> Hi,
>> 
>> Can you please comment on how I can ensure that stream input records are 
>> distributed evenly onto task slots?
>> See the attached screenshot ("Records received" issue).
>> 
>> I have a simple application which applies a window function over a stream 
>> partitioned as follows (the parallelism is equal to the number of keys; 
>> records with the same key are streamed evenly):
>> 
>> // get the execution environment
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> // get input data by connecting to the socket
>> DataStream<String> text = env.socketTextStream("localhost", port, "\n");
>> DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input = text.flatMap(...);
>> DataStream<Double> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
>>         .apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
>>             ...
>>         });
>> counts1.writeAsText(params.get("output1"));
>> env.execute("Socket Window WordCount");
>> 
>> Best,
>> Ovidiu
>> 
>> 
> 
