I tried declaring the timestamp field as a String as well as a Date object.
I get the same error in both cases.

Please find the POJO file:

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.apache.commons.lang.builder.ToStringBuilder;

public class TopicPojo {

    private List<List<Double>> data = null;
    private List<String> label = null;
    private static Date eventTime;

    /**
     * No args constructor for use in serialization
     */
    public TopicPojo() {
    }

    /**
     * @param data
     * @param label
     * @param eventTime
     */
    public TopicPojo(List<List<Double>> data, List<String> label, Date eventTime) {
        this.data = data;
        this.label = label;
        this.eventTime = eventTime;
    }

    public List<List<Double>> getData() {
        return data;
    }

    public void setData(List<List<Double>> data) {
        this.data = data;
    }

    public List<String> getLabel() {
        return label;
    }

    public void setLabel(List<String> label) {
        this.label = label;
    }

    public static Date getEventTime() {
        return eventTime;
    }

    public void setEventTime(Date eventTime) {
        this.eventTime = eventTime;
    }

    public String toString() {
        return ToStringBuilder.reflectionToString(this);
    }
}


The code above declares eventTime as a Date object; I also tried it as a
String.
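
One thing that may be relevant to the question about the key changing: in the POJO, eventTime is declared static, so all TopicPojo instances share a single value, and the keyBy lambda in the quoted snippet calls the static getter instead of reading the record it is given. Below is a minimal plain-Java sketch (no Flink involved) of how a key read through a static field changes once a later record is created. StaticKeyDemo, StaticEvent and InstanceEvent are hypothetical names used only for illustration, and I have not confirmed that this is the actual cause of the exception.

```java
import java.util.Date;

public class StaticKeyDemo {

    // Hypothetical stand-in for a POJO whose timestamp field is static:
    // every instance writes to, and reads from, the same shared slot.
    static class StaticEvent {
        private static Date eventTime;
        StaticEvent(Date t) { eventTime = t; }
        static Date getEventTime() { return eventTime; }
    }

    // Same shape, but the timestamp belongs to the individual record.
    static class InstanceEvent {
        private final Date eventTime;
        InstanceEvent(Date t) { this.eventTime = t; }
        Date getEventTime() { return eventTime; }
    }

    public static void main(String[] args) {
        Date t1 = new Date(1000L);
        Date t2 = new Date(2000L);

        // Read the "key" of the first record, create a second record,
        // then read the key of the first record again.
        new StaticEvent(t1);
        Date staticKeyBefore = StaticEvent.getEventTime();
        new StaticEvent(t2);
        Date staticKeyAfter = StaticEvent.getEventTime();
        System.out.println("static field, key stable: "
                + staticKeyBefore.equals(staticKeyAfter));

        InstanceEvent first = new InstanceEvent(t1);
        Date instanceKeyBefore = first.getEventTime();
        new InstanceEvent(t2);
        Date instanceKeyAfter = first.getEventTime();
        System.out.println("instance field, key stable: "
                + instanceKeyBefore.equals(instanceKeyAfter));
    }
}
```

If this applies here, a key selector that reads a non-static getter on its argument would return the same key every time it is evaluated for a given record.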


> What’s the KeySelector you’re using? To me, this indicates that the
> timestamp field is somehow changing after the original keying or in transit.
> Best.
> Aljoscha
> On 4. May 2017, at 22:01, G.S.Vijay Raajaa wrote:
> I reordered the calls and the window function now works. However, after
> processing a few records from Topic A and Topic B, the window function
> throws the error below. The keyBy is on the eventTime field.
> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
> at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
> at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
> at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
> at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
> at java.lang.Thread.run(Thread.java:745)
>> Thanks for your input, will try to incorporate them in my implementation.
>>> The approach could work, but if it can happen that an event from stream
>>> A is not matched by an event in stream B you will have lingering state that
>>> never goes away. For such cases it might be better to write a custom
>>> ProcessFunction: ojects/flink/flink-docs-release-1.2/dev/stream/process_function.html.
>>> The idea is to keep events from each side in state and emit a result
>>> when you get the event from the other side. You also set a cleanup timer in
>>> case no other event arrives to make sure that state eventually goes away.
>>> Best,
>>> Aljoscha
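
To make the suggestion above concrete for myself, here is a minimal plain-Java sketch of the buffer-and-timer idea. It deliberately uses a HashMap and an explicit `now` argument in place of Flink keyed state and timers, so it illustrates only the logic and not the ProcessFunction API. PairingSketch, onEvent and onTimer are hypothetical names, and the merged output format is a placeholder.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;

public class PairingSketch {

    // One buffered event waiting for its partner from the other topic.
    private static final class Pending {
        final String topic;
        final String payload;
        final long deadline; // time after which the entry is dropped

        Pending(String topic, String payload, long deadline) {
            this.topic = topic;
            this.payload = payload;
            this.deadline = deadline;
        }
    }

    // Stands in for keyed state: one pending event per event time.
    private final Map<Long, Pending> pendingByEventTime = new HashMap<>();
    private final long timeoutMillis;

    public PairingSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Emits a merged result if the partner from the other topic is already
    // buffered; otherwise buffers this event (replacing any earlier event
    // from the same topic with the same event time) and emits nothing.
    public Optional<String> onEvent(String topic, long eventTime, String payload, long now) {
        Pending other = pendingByEventTime.get(eventTime);
        if (other != null && !other.topic.equals(topic)) {
            pendingByEventTime.remove(eventTime);
            return Optional.of("[" + other.payload + ", " + payload + "]");
        }
        pendingByEventTime.put(eventTime, new Pending(topic, payload, now + timeoutMillis));
        return Optional.empty();
    }

    // Plays the role of the cleanup timer: drops every buffered event whose
    // deadline has passed and returns how many were removed.
    public int onTimer(long now) {
        int removed = 0;
        Iterator<Map.Entry<Long, Pending>> it = pendingByEventTime.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue().deadline <= now) {
                it.remove();
                removed++;
            }
        }
        return removed;
    }

    public int pendingCount() {
        return pendingByEventTime.size();
    }

    public static void main(String[] args) {
        PairingSketch pairing = new PairingSketch(100L);
        System.out.println(pairing.onEvent("Topic-A", 1L, "a1", 0L));
        System.out.println(pairing.onEvent("Topic-B", 1L, "b1", 10L));
        System.out.println(pairing.onEvent("Topic-A", 2L, "a2", 20L));
        System.out.println("removed by timer: " + pairing.onTimer(120L));
    }
}
```

In the sketch, an unmatched event stays buffered only until its deadline, which is the property the cleanup timer is meant to guarantee.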
>>> On 3. May 2017, at 11:47, G.S.Vijay Raajaa wrote:
>>> Sure. Thanks for the pointer, let me reorder the same. Any comments
>>> about the approach followed for merging topics and creating a single
>>> JSON?
>>>> Hi,
>>>> An AllWindow operator requires an AllWindowFunction, instead of a
>>>> WindowFunction. In your case, the keyBy() seems to be in the wrong place,
>>>> to get a keyed window you have to write something akin to:
>>>> inputStream
>>>>   .keyBy(…)
>>>>   .window(…)
>>>>   .apply(…) // or reduce()
>>>> In your case, you key the stream and then the keying is “lost” again
>>>> because you apply a flatMap(). That’s why you have an all-window and not a
>>>> keyed window.
>>>> Best,
>>>> Aljoscha
>>>> Hi,
>>>> I am trying to combine two Kafka topics using a single Kafka
>>>> consumer on a list of topics, and then convert the JSON strings in the
>>>> stream to POJOs. Then I join them via keyBy (on the event time field), and
>>>> to merge them into a single fat JSON I was planning to use a windowed
>>>> stream and apply a window function to it. The assumption is that Topic-A
>>>> and Topic-B can be joined on event time and that only one pair (Topic A
>>>> (JSON), Topic B (JSON)) will be present with the same eventTime. Hence I
>>>> was planning to use a countWindow(2) after keyBy on eventTime.
>>>> I have a couple of questions about this:
>>>> 1. Is the approach fine for merging topics and creating a single JSON?
>>>> 2. The window function on the AllWindowedStream doesn't seem to work;
>>>> any pointers will be greatly appreciated.
>>>> Code Snippet :
>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> logger.info("Flink Stream Window Charger has started");
>>>> Properties properties = new Properties();
>>>> properties.setProperty("bootstrap.servers", "");
>>>> properties.setProperty("zookeeper.connect", "");
>>>> properties.setProperty("group.id", "group-0011");
>>>> properties.setProperty("auto.offset.reset", "smallest");
>>>> List<String> names = new ArrayList<>();
>>>> names.add("Topic-A");
>>>> names.add("Topic-B");
>>>> DataStream<String> stream = env.addSource(
>>>>     new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>>> DataStream<TopicPojo> pojo = stream.map(new Deserializer())
>>>>     .keyBy((eventTime) -> TopicPojo.getEventTime());
>>>> List<String> where = new ArrayList<String>();
>>>> AllWindowedStream<String, GlobalWindow> data_window =
>>>>     pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>>> data_charging.addSink(new SinkFunction<String>() {
>>>>  public void invoke(String value) throws Exception {
>>>>   // Yet to be implemented - Merge two POJO into one
>>>>  }
>>>> });
>>>> try
>>>> {
>>>>  env.execute();
>>>> } catch (Exception e)
>>>> {
>>>>  return;
>>>> }
>>>> }
>>>> }
>>>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>>>  private static final long serialVersionUID = 1L;
>>>>  @Override
>>>>  public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>>>   ObjectMapper mapper = new ObjectMapper();
>>>>   out.collect(mapper.writeValueAsString(value));
>>>>  }
>>>> }
>>>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>>>  @Override
>>>>  public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2,
>>>>    Collector<String> out) throws Exception {
>>>>   int count = 0;
>>>>   for (TopicPojo in : arg2) {
>>>>    count++;
>>>>   }
>>>>   // Test Result - TO be modified
>>>>   out.collect("Window: " + window + "count: " + count);
>>>>  }
>>>> }
>>>> class Deserializer implements MapFunction<String, TopicPojo> {
>>>>  private static final long serialVersionUID = 1L;
>>>>  @Override
>>>>  public TopicPojo map(String value) throws IOException {
>>>>   // TODO Auto-generated method stub
>>>>   ObjectMapper mapper = new ObjectMapper();
>>>>   TopicPojo obj = null;
>>>>   try {
>>>>    System.out.println(value);
>>>>    obj = mapper.readValue(value, TopicPojo.class);
>>>>   } catch (JsonParseException e) {
>>>>    // TODO Auto-generated catch block
>>>>    throw new IOException("Failed to deserialize JSON object.");
>>>>   } catch (JsonMappingException e) {
>>>>    // TODO Auto-generated catch block
>>>>    throw new IOException("Failed to deserialize JSON object.");
>>>>   } catch (IOException e) {
>>>>    // TODO Auto-generated catch block
>>>>    throw new IOException("Failed to deserialize JSON object.");
>>>>   }
>>>>   return obj;
>>>>  }
>>>> }
>>>> I am getting - The method apply(AllWindowFunction<String,R,GlobalWindow>)
>>>> in the type AllWindowedStream<String,GlobalWindow> is not applicable
>>>> for the arguments (MyWindowFunction) error.
>>>> Kindly give your input.
