I reordered the operators as suggested and the window function now works. However, after processing some data from Topic A and Topic B, the window function throws the error below. The keyBy is on the eventTime field.
java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
    at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
    at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
    at java.lang.Thread.run(Thread.java:745)

Regards,
Vijay Raajaa GS

On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:

> Thanks for your input, will try to incorporate them in my implementation.
>
> Regards,
> Vijay Raajaa G S
>
> On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> The approach could work, but if it can happen that an event from stream A
>> is not matched by an event in stream B you will have lingering state that
>> never goes away. For such cases it might be better to write a custom
>> CoProcessFunction as sketched here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html.
>>
>> The idea is to keep events from each side in state and emit a result when
>> you get the event from the other side. You also set a cleanup timer in case
>> no other event arrives to make sure that state eventually goes away.
>>
>> Best,
>> Aljoscha
>>
>> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>>
>> Sure. Thanks for the pointer, let me reorder the same. Any comments about
>> the approach followed for merging topics and creating a single JSON?
>>
>> Regards,
>> Vijay Raajaa G S
>>
>> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Hi,
>>> An AllWindow operator requires an AllWindowFunction, instead of a
>>> WindowFunction. In your case, the keyBy() seems to be in the wrong place,
>>> to get a keyed window you have to write something akin to:
>>>
>>> inputStream
>>>   .keyBy(…)
>>>   .window(…)
>>>   .apply(…) // or reduce()
>>>
>>> In your case, you key the stream and then the keying is “lost” again
>>> because you apply a flatMap(). That’s why you have an all-window and not a
>>> keyed window.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to combine two Kafka topics using a single Kafka consumer
>>> on a list of topics, and then convert the JSON strings in the stream to
>>> POJOs. To join them via keyBy (on the event time field) and merge them
>>> into a single fat JSON, I was planning to use a window stream and apply
>>> a window function on it. The assumption is that Topic-A and Topic-B can
>>> be joined on event time and that only one pair (Topic A (JSON),
>>> Topic B (JSON)) will be present with the same eventTime. Hence I was
>>> planning to use a countWindow(2) after keyBy on eventTime.
>>>
>>> I have a couple of questions about this:
>>>
>>> 1. Is the approach fine for merging topics and creating a single JSON?
>>> 2. The window function on the AllWindowedStream doesn't seem to work;
>>>    any pointers will be greatly appreciated.
>>>
>>> Code Snippet :
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>
>>> logger.info("Flink Stream Window Charger has started");
>>>
>>> Properties properties = new Properties();
>>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>>> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>>> properties.setProperty("group.id", "group-0011");
>>> properties.setProperty("auto.offset.reset", "smallest");
>>>
>>> List<String> names = new ArrayList<>();
>>> names.add("Topic-A");
>>> names.add("Topic-B");
>>>
>>> DataStream<String> stream = env.addSource(new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>>
>>> DataStream<TopicPojo> pojo = stream.map(new Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
>>>
>>> List<String> where = new ArrayList<String>();
>>>
>>> AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>>
>>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>>
>>> data_charging.addSink(new SinkFunction<String>() {
>>>     public void invoke(String value) throws Exception {
>>>         // Yet to be implemented - Merge two POJO into one
>>>     }
>>> });
>>>
>>> try {
>>>     env.execute();
>>> } catch (Exception e) {
>>>     return;
>>> }
>>> }
>>> }
>>>
>>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     @Override
>>>     public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>>         ObjectMapper mapper = new ObjectMapper();
>>>         out.collect(mapper.writeValueAsString(value));
>>>     }
>>> }
>>>
>>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>>     @Override
>>>     public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
>>>             throws Exception {
>>>         int count = 0;
>>>         for (TopicPojo in : arg2) {
>>>             count++;
>>>         }
>>>         // Test Result - TO be modified
>>>         out.collect("Window: " + window + "count: " + count);
>>>     }
>>> }
>>>
>>> class Deserializer implements MapFunction<String, TopicPojo> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     @Override
>>>     public TopicPojo map(String value) throws IOException {
>>>         // TODO Auto-generated method stub
>>>         ObjectMapper mapper = new ObjectMapper();
>>>         TopicPojo obj = null;
>>>         try {
>>>             System.out.println(value);
>>>             obj = mapper.readValue(value, TopicPojo.class);
>>>         } catch (JsonParseException e) {
>>>             // TODO Auto-generated catch block
>>>             throw new IOException("Failed to deserialize JSON object.");
>>>         } catch (JsonMappingException e) {
>>>             // TODO Auto-generated catch block
>>>             throw new IOException("Failed to deserialize JSON object.");
>>>         } catch (IOException e) {
>>>             // TODO Auto-generated catch block
>>>             throw new IOException("Failed to deserialize JSON object.");
>>>         }
>>>         return obj;
>>>     }
>>> }
>>>
>>> I am getting - The method apply(AllWindowFunction<String,R,GlobalWindow>)
>>> in the type AllWindowedStream<String,GlobalWindow> is not applicable
>>> for the arguments (MyWindowFunction) error.
>>>
>>> Kindly give your input.
>>>
>>> Regards,
>>> Vijay Raajaa GS
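
For reference, the buffering idea Aljoscha describes earlier in the thread (keep the event from each side, emit a result when the other side arrives, clean up if no partner ever shows up) can be modelled in plain Java. This is only a rough sketch and deliberately uses no Flink API: PairJoiner, onEvent, onTimer and the shape of the merged JSON are hypothetical names made up for illustration. In a real job the map would presumably be keyed state inside a CoProcessFunction, and the sweep in onTimer would be replaced by per-key timers.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java model (no Flink dependency) of the join logic from the thread:
 * buffer the first event per eventTime, emit a merged record when the other
 * side arrives, and drop buffered events that waited longer than a timeout.
 * All names here are hypothetical and for illustration only.
 */
class PairJoiner {

    /** A buffered event: the side it came from, its JSON payload, its arrival time. */
    private static final class Pending {
        final char side;
        final String json;
        final long arrivedAt;

        Pending(char side, String json, long arrivedAt) {
            this.side = side;
            this.json = json;
            this.arrivedAt = arrivedAt;
        }
    }

    private final Map<Long, Pending> pendingByEventTime = new HashMap<>();
    private final List<String> output = new ArrayList<>();
    private final long timeoutMillis;

    PairJoiner(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Handles an event from side 'A' or 'B'; emits a merged record once both sides are present. */
    void onEvent(char side, long eventTime, String json, long now) {
        Pending other = pendingByEventTime.get(eventTime);
        if (other != null && other.side != side) {
            // Partner found: emit the pair and release the buffered state.
            pendingByEventTime.remove(eventTime);
            String a = (side == 'A') ? json : other.json;
            String b = (side == 'B') ? json : other.json;
            output.add("{\"a\":" + a + ",\"b\":" + b + "}");
        } else {
            // First event for this key (or a repeat from the same side): keep the latest.
            pendingByEventTime.put(eventTime, new Pending(side, json, now));
        }
    }

    /** Stands in for the cleanup timer: evicts unmatched events; returns how many were dropped. */
    int onTimer(long now) {
        int evicted = 0;
        Iterator<Pending> it = pendingByEventTime.values().iterator();
        while (it.hasNext()) {
            if (now - it.next().arrivedAt >= timeoutMillis) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    List<String> output() {
        return output;
    }

    int pendingCount() {
        return pendingByEventTime.size();
    }
}
```

Feeding it an 'A' event and then a 'B' event with the same eventTime produces one merged record and leaves nothing buffered. An event that never finds a partner stays buffered until onTimer is called with a time at least timeoutMillis after its arrival, which is the "lingering state" case the cleanup timer is meant to cover.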