What’s the KeySelector you’re using? To me, this indicates that the timestamp field is somehow changing after the original keying or in transit.
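Some background on why the key must not change. Roughly, Flink routes each record to a parallel subtask by hashing its key into a key group. The downstream operator then extracts the key again from the record when it accesses window state. If that second extraction yields a different key, the computed key group can fall outside the range the subtask owns. The snippet quoted further down keys with keyBy((eventTime) -> TopicPojo.getEventTime()), a lambda that ignores its argument and calls a static accessor. If the current job still does something similar, the key comes from shared state rather than from the element. Below is a plain-Java simulation, not Flink code. KeyGroupDemo, Event, and sharedEventTime are hypothetical. The hash is simplified, because Flink additionally mixes hashCode() with a murmur hash. The parallelism values are illustrative only.

```java
import java.util.function.Function;

// Plain-Java simulation (not Flink code) of why a key must be derived from the element itself.
public class KeyGroupDemo {
    // Hypothetical stand-in for TopicPojo, reduced to the field used as the key.
    static final class Event {
        final long eventTime;
        Event(long eventTime) { this.eventTime = eventTime; }
    }

    // Shared mutable value, standing in for a static getter that ignores the element.
    static long sharedEventTime = 0L;

    // Simplified key-group assignment; Flink also applies a murmur hash to hashCode().
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Assumed model: key groups are split into contiguous ranges, one range per subtask.
    static int subtask(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Extracts the key once to route the element, then again on the receiving subtask
    // (as when window state is accessed). Returns true if both extractions map to the
    // same subtask, i.e. the receiving subtask owns the key group it computes.
    static boolean routeAndAccess(Event e, Function<Event, Long> keySelector,
                                  Runnable inBetween, int maxParallelism, int parallelism) {
        int target = subtask(keyGroup(keySelector.apply(e), maxParallelism), maxParallelism, parallelism);
        inBetween.run(); // e.g. other records updating shared state while e is in transit
        int owner = subtask(keyGroup(keySelector.apply(e), maxParallelism), maxParallelism, parallelism);
        return owner == target;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int parallelism = 4;
        Event e = new Event(10L);

        // Deterministic selector: the key comes only from the element.
        sharedEventTime = 10L;
        boolean fromElement = routeAndAccess(e, ev -> ev.eventTime,
                () -> sharedEventTime = 100L, maxParallelism, parallelism);

        // Selector that ignores its argument and reads shared state instead.
        sharedEventTime = 10L;
        boolean fromShared = routeAndAccess(e, ev -> sharedEventTime,
                () -> sharedEventTime = 100L, maxParallelism, parallelism);

        System.out.println("key from element, same subtask: " + fromElement);
        System.out.println("key from shared state, same subtask: " + fromShared);
    }
}
```

The element-derived selector routes and accesses state under the same key group no matter what else happens in between. The shared-state selector does not. In Flink that mismatch surfaces as an error like "Unexpected key group index" rather than as a boolean.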
Best,
Aljoscha

> On 4. May 2017, at 22:01, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>
> I tried to reorder, and the window function works fine. But after
> processing a few streams of data from Topic A and Topic B, the window function
> seems to throw the error below. The keyBy is on the eventTime field.
>
> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>     at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
>
> Regards,
> Vijay Raajaa GS
>
> On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
> Thanks for your input, will try to incorporate it in my implementation.
>
> Regards,
> Vijay Raajaa G S
>
> On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> The approach could work, but if it can happen that an event from stream A is
> not matched by an event in stream B, you will have lingering state that never
> goes away. For such cases it might be better to write a custom
> CoProcessFunction as sketched here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>
> The idea is to keep events from each side in state and emit a result when you
> get the event from the other side. You also set a cleanup timer in case no
> other event arrives, to make sure that state eventually goes away.
>
> Best,
> Aljoscha
>
>> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>>
>> Sure. Thanks for the pointer, let me reorder it. Any comments about
>> the approach followed for merging topics and creating a single JSON?
>>
>> Regards,
>> Vijay Raajaa G S
>>
>> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi,
>> An AllWindow operator requires an AllWindowFunction instead of a
>> WindowFunction. In your case, the keyBy() seems to be in the wrong place. To
>> get a keyed window, you have to write something akin to:
>>
>>     inputStream
>>         .keyBy(…)
>>         .window(…)
>>         .apply(…) // or reduce()
>>
>> In your case, you key the stream and then the keying is “lost” again because
>> you apply a flatMap(). That’s why you have an all-window and not a keyed
>> window.
>>
>> Best,
>> Aljoscha
>>
>>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to combine two Kafka topics using a single Kafka consumer
>>> on a list of topics, and then convert the JSON strings in the stream to POJOs.
>>> To join them via keyBy (on the event time field) and merge them into a
>>> single fat JSON, I was planning to use a windowed stream and apply a window
>>> function on it. The assumption is that Topic-A and Topic-B can be joined
>>> on event time and that only one pair (Topic A JSON, Topic B JSON) will be
>>> present with the same eventTime. Hence I was planning to use a
>>> countWindow(2) after keyBy on eventTime.
>>>
>>> I have a couple of questions about this:
>>>
>>> 1. Is the approach fine for merging topics and creating a single JSON?
>>> 2. The window function on the all-window stream doesn't seem to work.
>>>    Any pointers will be greatly appreciated.
>>>
>>> Code snippet:
>>>
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> logger.info("Flink Stream Window Charger has started");
>>>
>>> Properties properties = new Properties();
>>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>>> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>>> properties.setProperty("group.id", "group-0011");
>>> properties.setProperty("auto.offset.reset", "smallest");
>>>
>>> List<String> names = new ArrayList<>();
>>> names.add("Topic-A");
>>> names.add("Topic-B");
>>>
>>> DataStream<String> stream = env.addSource(
>>>     new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>>
>>> DataStream<TopicPojo> pojo = stream.map(new Deserializer())
>>>     .keyBy((eventTime) -> TopicPojo.getEventTime());
>>>
>>> List<String> where = new ArrayList<String>();
>>>
>>> AllWindowedStream<String, GlobalWindow> data_window =
>>>     pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>>
>>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>>
>>> data_charging.addSink(new SinkFunction<String>() {
>>>     public void invoke(String value) throws Exception {
>>>         // Yet to be implemented - Merge two POJO into one
>>>     }
>>> });
>>>
>>> try {
>>>     env.execute();
>>> } catch (Exception e) {
>>>     return;
>>> }
>>> }
>>> }
>>>
>>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     @Override
>>>     public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>>         ObjectMapper mapper = new ObjectMapper();
>>>         out.collect(mapper.writeValueAsString(value));
>>>     }
>>> }
>>>
>>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>>     @Override
>>>     public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2,
>>>             Collector<String> out) throws Exception {
>>>         int count = 0;
>>>         for (TopicPojo in : arg2) {
>>>             count++;
>>>         }
>>>         // Test Result - TO be modified
>>>         out.collect("Window: " + window + "count: " + count);
>>>     }
>>> }
>>>
>>> class Deserializer implements MapFunction<String, TopicPojo> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     @Override
>>>     public TopicPojo map(String value) throws IOException {
>>>         // TODO Auto-generated method stub
>>>         ObjectMapper mapper = new ObjectMapper();
>>>         TopicPojo obj = null;
>>>         try {
>>>             System.out.println(value);
>>>             obj = mapper.readValue(value, TopicPojo.class);
>>>         } catch (JsonParseException e) {
>>>             // TODO Auto-generated catch block
>>>             throw new IOException("Failed to deserialize JSON object.");
>>>         } catch (JsonMappingException e) {
>>>             // TODO Auto-generated catch block
>>>             throw new IOException("Failed to deserialize JSON object.");
>>>         } catch (IOException e) {
>>>             // TODO Auto-generated catch block
>>>             throw new IOException("Failed to deserialize JSON object.");
>>>         }
>>>         return obj;
>>>     }
>>> }
>>>
>>> I am getting the following error:
>>>
>>> The method apply(AllWindowFunction<String,R,GlobalWindow>) in the type
>>> AllWindowedStream<String,GlobalWindow> is not applicable for the arguments
>>> (MyWindowFunction)
>>>
>>> Kindly give your input.
>>>
>>> Regards,
>>> Vijay Raajaa GS
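Aljoscha's suggestion in the thread can be modelled without Flink as a small plain-Java class. The pattern is: buffer each side in state, emit a result when the partner arrives, and register a cleanup timer so unmatched state eventually goes away. This is a sketch under stated assumptions, not the Flink API. PairJoinSketch, processA/processB, advanceTo, and merge are hypothetical names. The join key is the eventTime value, as in the thread. advanceTo stands in for a watermark advancing. In a real job, the two pending maps would roughly correspond to keyed state inside a CoProcessFunction (processElement1/processElement2). The timer map would correspond to event-time timers registered through the context's TimerService and handled in onTimer().

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a two-input join with cleanup timers (hypothetical, not the Flink API).
public class PairJoinSketch {
    private final long cleanupDelay;
    private final Map<Long, String> pendingA = new HashMap<>();       // eventTime -> unmatched Topic-A JSON
    private final Map<Long, String> pendingB = new HashMap<>();       // eventTime -> unmatched Topic-B JSON
    private final Map<Long, Long> deadlines = new HashMap<>();        // eventTime -> current cleanup time
    private final TreeMap<Long, List<Long>> timers = new TreeMap<>(); // cleanup time -> keys
    final List<String> output = new ArrayList<>();

    PairJoinSketch(long cleanupDelay) {
        this.cleanupDelay = cleanupDelay;
    }

    void processA(long eventTime, String json) {
        onElement(eventTime, json, pendingA, pendingB, true);
    }

    void processB(long eventTime, String json) {
        onElement(eventTime, json, pendingB, pendingA, false);
    }

    private void onElement(long key, String json, Map<Long, String> own,
                           Map<Long, String> other, boolean fromA) {
        String partner = other.remove(key);
        if (partner != null) {
            // The other side is already buffered: emit one merged record and clear the state.
            deadlines.remove(key);
            output.add(fromA ? merge(json, partner) : merge(partner, json));
        } else {
            // Buffer this side and schedule cleanup in case no partner ever arrives.
            long fireAt = key + cleanupDelay;
            own.put(key, json);
            deadlines.put(key, fireAt);
            timers.computeIfAbsent(fireAt, t -> new ArrayList<>()).add(key);
        }
    }

    // Advances event time (standing in for a watermark) and fires every timer due by then.
    void advanceTo(long time) {
        while (!timers.isEmpty() && timers.firstKey() <= time) {
            Map.Entry<Long, List<Long>> due = timers.pollFirstEntry();
            long fireAt = due.getKey();
            for (long key : due.getValue()) {
                Long deadline = deadlines.get(key);
                // Skip stale timers whose element was already matched.
                if (deadline != null && deadline == fireAt) {
                    pendingA.remove(key);
                    pendingB.remove(key);
                    deadlines.remove(key);
                }
            }
        }
    }

    int pendingCount() {
        return pendingA.size() + pendingB.size();
    }

    // Hypothetical merge: nests both JSON documents in one object.
    static String merge(String a, String b) {
        return "{\"a\":" + a + ",\"b\":" + b + "}";
    }

    public static void main(String[] args) {
        PairJoinSketch join = new PairJoinSketch(60_000L);
        join.processA(1_000L, "{\"src\":\"A\"}");
        join.processB(1_000L, "{\"src\":\"B\"}"); // partner found: one merged record
        join.processA(2_000L, "{\"src\":\"A\"}"); // never matched
        join.advanceTo(62_000L);                  // cleanup timer for 2_000 fires
        System.out.println(join.output);
        System.out.println("pending after cleanup: " + join.pendingCount());
    }
}
```

Unlike a countWindow(2) on a GlobalWindow, an unmatched event here does not stay in state forever. The cleanup timer removes it. Checking the recorded deadline makes timers that outlive a successful match harmless no-ops.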