Thanks, I now use element.hashCode() % nPartitions as the key and it works as expected. But I'm afraid it's not best practice for simply turning a plain (already parallelized) DataStream into a KeyedStream, because it introduces some overhead from physically repartitioning by key, which is unnecessary since I don't actually care about the keys.
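For reference, here is roughly the selector I ended up with (just a minimal sketch of what I described above; the class name HashKeySelector is mine, and I use Math.floorMod instead of a plain % so that negative hashCode() values cannot produce negative keys):

import org.apache.flink.api.java.functions.KeySelector;

// Deterministic: the same element always maps to the same key, so it is
// safe even though Flink may call getKey() more than once per record.
public static class HashKeySelector<T> implements KeySelector<T, Integer> {
    private final int nPartitions;

    HashKeySelector(int nPartitions) {
        this.nPartitions = nPartitions;
    }

    @Override
    public Integer getKey(T element) throws Exception {
        // keeps the key in [0, nPartitions), even for negative hash codes
        return Math.floorMod(element.hashCode(), nPartitions);
    }
}

used as .keyBy(new HashKeySelector<Tuple2<Integer, Integer>>(2)) in place of the random selector from my earlier mail.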
On 9 June 2016 at 22:00, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Yukun,
>
> the problem is that the KeySelector is internally invoked multiple times.
> Hence it must be deterministic, i.e., it must extract the same key for the
> same object if invoked multiple times.
> The documentation does not discuss this aspect and should be extended.
>
> Thanks for pointing out this issue.
>
> Cheers,
> Fabian
>
>
> 2016-06-09 13:19 GMT+02:00 Yukun Guo <gyk....@gmail.com>:
>
>> I'm playing with the (Window)WordCount example from Flink QuickStart. I
>> generate a DataStream consisting of 1000 Strings of random digits, which
>> is windowed with a tumbling count window of 50 elements:
>>
>> import org.apache.flink.api.common.functions.FlatMapFunction;
>> import org.apache.flink.api.java.functions.KeySelector;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.util.Collector;
>>
>> import java.util.Random;
>>
>> public class DigitCount {
>>
>>     public static void main(String[] args) throws Exception {
>>         final StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>
>>         DataStream<String> text = env.fromElements(
>>                 "14159265358979323846264338327950288419716939937510",
>>                 "58209749445923078164062862089986280348253421170679",
>>                 "82148086513282306647093844609550582231725359408128",
>>                 "48111745028410270193852110555964462294895493038196",
>>                 "44288109756659334461284756482337867831652712019091",
>>                 "45648566923460348610454326648213393607260249141273",
>>                 "72458700660631558817488152092096282925409171536436",
>>                 "78925903600113305305488204665213841469519415116094",
>>                 "33057270365759591953092186117381932611793105118548",
>>                 "07446237996274956735188575272489122793818301194912",
>>                 "98336733624406566430860213949463952247371907021798",
>>                 "60943702770539217176293176752384674818467669405132",
>>                 "00056812714526356082778577134275778960917363717872",
>>                 "14684409012249534301465495853710507922796892589235",
>>                 "42019956112129021960864034418159813629774771309960",
>>                 "51870721134999999837297804995105973173281609631859",
>>                 "50244594553469083026425223082533446850352619311881",
>>                 "71010003137838752886587533208381420617177669147303",
>>                 "59825349042875546873115956286388235378759375195778",
>>                 "18577805321712268066130019278766111959092164201989"
>>         );
>>
>>         DataStream<Tuple2<Integer, Integer>> digitCount = text
>>                 .flatMap(new Splitter())
>>                 .keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
>>                     @Override
>>                     public Integer getKey(Tuple2<Integer, Integer> x) throws Exception {
>>                         return x.f0 % 2;
>>                     }
>>                 })
>>                 .countWindow(50)
>>                 .sum(1);
>>
>>         digitCount.print();
>>         env.execute();
>>     }
>>
>>     public static final class Splitter implements FlatMapFunction<String, Tuple2<Integer, Integer>> {
>>         @Override
>>         public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) {
>>             for (String token : value.split("")) {
>>                 if (token.length() == 0) {
>>                     continue;
>>                 }
>>                 out.collect(Tuple2.of(Integer.parseInt(token), 1));
>>             }
>>         }
>>     }
>> }
>>
>> The code above produces 19 lines of output, which is reasonable: the
>> 1000 digits are keyed into 2 partitions, one containing 500+ elements
>> and the other slightly fewer than 500, so one 50-digit window never
>> fills and is ignored.
>>
>> So far so good, but if I replace the mod KeySelector with a random one:
>>
>> private static class RandomKeySelector<T> implements KeySelector<T, Integer> {
>>     private int nPartitions;
>>     private Random random;
>>
>>     RandomKeySelector(int nPartitions) {
>>         this.nPartitions = nPartitions;
>>         random = new Random();
>>     }
>>
>>     @Override
>>     public Integer getKey(T dummy) throws Exception {
>>         return random.nextInt(this.nPartitions);
>>     }
>> }
>>
>> and then
>>
>> .keyBy(new RandomKeySelector<Tuple2<Integer, Integer>>(2))
>>
>> it may generate 17 or 18 lines of output. How could that happen?
>> Moreover, if I set the number of partitions to 10, in theory the lines of
>> output should be no fewer than 11, but actually it can be only 9.
>>
>> Please help me understand why countWindow behaves like this.