Hi, Konstantin is right. reinterpreteAsKeyedStream only works if you call it on a DataStream that was keyBy'ed before (with the same parallelism). Flink cannot reuse the partioning of another system like Kafka.
Best, Fabian Adrienne Kole <adrienneko...@gmail.com> schrieb am Do., 4. Apr. 2019, 14:33: > Thanks a lot for the replies. > > Below I paste my code: > > > DataStreamSource<Tuple> source = env.addSource(new MySource()); > KeyedStream<Tuple, Integer> keyedStream = > DataStreamUtils.reinterpretAsKeyedStream(source, new DummyKeySelector(), > TypeInformation.of(Integer.class) ); > keyedStream.timeWindow(Time.seconds(1)).apply(new > WindowFunction<Tuple, Object, Integer, TimeWindow>() { > @Override > public void apply(Integer integer, TimeWindow timeWindow, > Iterable<Tuple> iterable, Collector<Object> collector) throws Exception { > collector.collect(1); > } > }); > env.execute("Test"); > > static class DummyKeySelector implements KeySelector<Tuple, Integer> { > > @Override > public Integer getKey(Tuple value) throws Exception { > return value.getSourceID(); > } > } > > static class MySource extends RichParallelSourceFunction<Tuple> { > public MySource() { > this.sourceID = sourceID; > } > @Override > public void open(Configuration parameters) throws Exception { > sourceID = sourceID + > getRuntimeContext().getIndexOfThisSubtask(); > } > > @Override > public void run(SourceContext<Tuple> ctx) throws Exception { > while (true) { > Tuple tuple = new Tuple(sourceID); > ctx.collect(tuple); > } > } > > @Override > public void cancel() { > > } > } > > > Whatever I do, I get > Caused by: java.lang.IllegalArgumentException > at > org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123) > > When I check the details from the source code, it seems that some keys are > not within allowed key range, that is why Flink throws an exception. > In this case, as Konstantin said, it is not possible to interpret source > as keyed. > Please correct me if I am wrong. > > > Thanks, > Adrienne > > > > > > > > On Wed, Apr 3, 2019 at 8:08 PM Konstantin Knauf <konstan...@ververica.com> > wrote: > >> Hi Adrienne, >> >> you can only use DataStream#reinterpretAsKeyedStream on a stream, which >> has previously been keyed/partitioned by Flink with exactly the same >> KeySelector as given to reinterpretAsKeyedStream. It does not work with a >> key-partitioned stream, which has been partitioned by any other process. >> >> Best, >> >> Konstantin >> >> On Fri, Mar 29, 2019 at 11:47 PM Rong Rong <walter...@gmail.com> wrote: >> >>> Hi Adrienne, >>> >>> I think you should be able to reinterpretAsKeyedStream by passing in a >>> DataStreamSource based on the ITCase example [1]. >>> Can you share the full code/error logs or the IAE? >>> >>> -- >>> Rong >>> >>> [1] >>> https://github.com/apache/flink/blob/release-1.7.2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/ReinterpretDataStreamAsKeyedStreamITCase.java#L98 >>> >>> On Fri, Mar 29, 2019 at 6:09 AM Adrienne Kole <adrienneko...@gmail.com> >>> wrote: >>> >>>> Dear community, >>>> >>>> I have a use-case where sources are keyed. >>>> For example, there is a source function with parallelism 10, and each >>>> instance has its own key. >>>> I used reinterpretAsKeyedStream to convert source DataStream to >>>> KeyedStream, however, I get an IllegalArgument exception. >>>> Is reinterpretAsKeyedStream can be used with source operators as well, >>>> or should the operator to be used be already partitioned (by keyby(..)) ? >>>> >>>> Thanks, >>>> Adrienne >>>> >>> >> >> -- >> >> Konstantin Knauf | Solutions Architect >> >> +49 160 91394525 >> >> <https://www.ververica.com/> >> >> Follow us @VervericaData >> >> -- >> >> Join Flink Forward <https://flink-forward.org/> - The Apache Flink >> Conference >> >> Stream Processing | Event Driven | Real Time >> >> -- >> >> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany >> >> -- >> Data Artisans GmbH >> Registered at Amtsgericht Charlottenburg: HRB 158244 B >> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen >> >