Thx, now I use element.hashCode() % nPartitions and it works as expected.

But I'm afraid it's not a best practice for just turning a plain (already
paralellized) DataStream into a KeyedStream? Because it introduces some
overhead due to physical repartitioning by key, which is unnecessary since
I don't really care about keys.

On 9 June 2016 at 22:00, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Yukun,
>
> the problem is that the KeySelector is internally invoked multiple times.
> Hence it must be deterministic, i.e., it must extract the same key for the
> same object if invoked multiple times.
> The documentation is not discussing this aspect and should be extended.
>
> Thanks for pointing out this issue.
>
> Cheers,
> Fabian
>
>
> 2016-06-09 13:19 GMT+02:00 Yukun Guo <gyk....@gmail.com>:
>
>> I’m playing with the (Window)WordCount example from Flink QuickStart. I
>> generate a DataStream consisting of 1000 Strings of random digits, which
>> is windowed with a tumbling count window of 50 elements:
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;import 
>> org.apache.flink.api.java.functions.KeySelector;import 
>> org.apache.flink.api.java.tuple.Tuple2;import 
>> org.apache.flink.streaming.api.datastream.DataStream;import 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import 
>> org.apache.flink.util.Collector;
>> import java.util.Random;
>> public class DigitCount {
>>
>>
>>     public static void main(String[] args) throws Exception {
>>         final StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         DataStream<String> text = env.fromElements(
>>                 "14159265358979323846264338327950288419716939937510",
>>                 "58209749445923078164062862089986280348253421170679",
>>                 "82148086513282306647093844609550582231725359408128",
>>                 "48111745028410270193852110555964462294895493038196",
>>                 "44288109756659334461284756482337867831652712019091",
>>                 "45648566923460348610454326648213393607260249141273",
>>                 "72458700660631558817488152092096282925409171536436",
>>                 "78925903600113305305488204665213841469519415116094",
>>                 "33057270365759591953092186117381932611793105118548",
>>                 "07446237996274956735188575272489122793818301194912",
>>                 "98336733624406566430860213949463952247371907021798",
>>                 "60943702770539217176293176752384674818467669405132",
>>                 "00056812714526356082778577134275778960917363717872",
>>                 "14684409012249534301465495853710507922796892589235",
>>                 "42019956112129021960864034418159813629774771309960",
>>                 "51870721134999999837297804995105973173281609631859",
>>                 "50244594553469083026425223082533446850352619311881",
>>                 "71010003137838752886587533208381420617177669147303",
>>                 "59825349042875546873115956286388235378759375195778",
>>                 "18577805321712268066130019278766111959092164201989"
>>         );
>>
>>         DataStream<Tuple2<Integer, Integer>> digitCount = text
>>                 .flatMap(new Splitter())
>>                 .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>>                     @Override
>>                     public Integer getKey(Tuple2<Integer, Integer> x) throws 
>> Exception {
>>                         return x.f0 % 2;
>>                     }
>>                 })
>>                 .countWindow(50)
>>                 .sum(1);
>>
>>         digitCount.print();
>>         env.execute();
>>
>>     }
>>
>>     public static final class Splitter implements FlatMapFunction<String, 
>> Tuple2<Integer, Integer>> {
>>         @Override
>>         public void flatMap(String value, Collector<Tuple2<Integer, 
>> Integer>> out) {
>>             for (String token : value.split("")) {
>>                 if (token.length() == 0) {
>>                     continue;
>>                 }
>>                 out.collect(Tuple2.of(Integer.parseInt(token), 1));
>>             }
>>         }
>>     }
>> }
>>
>> The code above will produce 19 lines of output which is reasonable as the
>> 1000 digits will be keyed into 2 partitions where one partition contains
>> 500+ elements and the other contains slightly fewer than 500 elements,
>> therefore as a result one 50-digit window is ignored.
>>
>> So far so good, but if I replace the mod KeySelector with a random one:
>>
>> private static class RandomKeySelector<T> implements KeySelector<T, Integer> 
>> {
>>     private int nPartitions;
>>     private Random random;
>>
>>     RandomKeySelector(int nPartitions) {
>>         this.nPartitions = nPartitions;
>>         random = new Random();
>>     }
>>
>>     @Override
>>     public Integer getKey(T dummy) throws Exception {
>>         return random.nextInt(this.nPartitions);
>>     }
>> }
>>
>> and then
>>
>> .keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))
>>
>> it may generate 17 or 18 lines of output. How could that happen?
>> Moreover, if I set the number of partitions to 10, in theory the lines of
>> output should be no fewer than 11, but actually it can be only 9.
>>
>> Please help me understand why countWindow behaves like this.
>>
>
>

Reply via email to