What’s the KeySelector you’re using? To me, this indicates that the timestamp 
field is somehow changing after the original keying or in transit.
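
For comparison, keying on a stable field of the element itself would look roughly
like the sketch below (pojoStream is a placeholder name, and the String-typed
getEventTime() getter is assumed to match the TopicPojo from the code further down):

  // The key must be computed from the element itself and must not change afterwards;
  // a key that changes between operators can cause exactly this kind of key-group mismatch.
  KeyedStream<TopicPojo, String> keyed = pojoStream.keyBy(new KeySelector<TopicPojo, String>() {
      @Override
      public String getKey(TopicPojo value) {
          return value.getEventTime(); // instance getter on the element, not a static/shared value
      }
  });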

Best.
Aljoscha
> On 4. May 2017, at 22:01, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
> 
> I tried to reorder and the window function works fine. But then, after 
> processing a few stream elements from Topic A and Topic B, the window function 
> seems to throw the error below. The keyBy is on the eventTime field.
> 
> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>   at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>   at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>   at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>   at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>   at java.lang.Thread.run(Thread.java:745)
> 
> 
> 
> Regards,
> 
> Vijay Raajaa GS 
> 
> 
> On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
> Thanks for your input; I will try to incorporate it in my implementation.
> 
> Regards,
> Vijay Raajaa G S
> 
> On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> The approach could work, but if it can happen that an event from stream A is 
> not matched by an event in stream B, you will have lingering state that never 
> goes away. For such cases it might be better to write a custom 
> CoProcessFunction as sketched here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
> 
> The idea is to keep events from each side in state and emit a result when you 
> get the event from the other side. You also set a cleanup timer in case no 
> other event arrives to make sure that state eventually goes away.
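> 
> Roughly, such a function could look like the sketch below (the names, the
> TopicPojo type, the 60 second timeout and the merge step are placeholders, and
> it assumes CoProcessFunction and ConnectedStreams#process are available in your
> Flink version):
> 
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
> import org.apache.flink.util.Collector;
> 
> public class EventTimeJoin extends CoProcessFunction<TopicPojo, TopicPojo, String> {
> 
>   private transient ValueState<TopicPojo> sideA;
>   private transient ValueState<TopicPojo> sideB;
> 
>   @Override
>   public void open(Configuration parameters) {
>     sideA = getRuntimeContext().getState(new ValueStateDescriptor<>("side-a", TopicPojo.class));
>     sideB = getRuntimeContext().getState(new ValueStateDescriptor<>("side-b", TopicPojo.class));
>   }
> 
>   @Override
>   public void processElement1(TopicPojo a, Context ctx, Collector<String> out) throws Exception {
>     TopicPojo other = sideB.value();
>     if (other != null) {
>       out.collect(merge(a, other));   // matching Topic-B event already buffered
>       sideB.clear();
>     } else {
>       sideA.update(a);
>       // clean-up timer in case the matching event never arrives
>       ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 60_000L);
>     }
>   }
> 
>   @Override
>   public void processElement2(TopicPojo b, Context ctx, Collector<String> out) throws Exception {
>     TopicPojo other = sideA.value();
>     if (other != null) {
>       out.collect(merge(other, b));   // matching Topic-A event already buffered
>       sideA.clear();
>     } else {
>       sideB.update(b);
>       ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 60_000L);
>     }
>   }
> 
>   @Override
>   public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
>     // no match arrived in time; drop the buffered event so the state goes away
>     sideA.clear();
>     sideB.clear();
>   }
> 
>   private String merge(TopicPojo a, TopicPojo b) {
>     return a + " + " + b;   // placeholder: build the combined JSON here
>   }
> }
> 
> You would connect the two streams, key both sides on the event-time field, and
> then call process() with such a function.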
> 
> Best,
> Aljoscha
> 
>> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>> 
>> Sure. Thanks for the pointer; let me reorder it accordingly. Any comments on 
>> the approach followed for merging topics and creating a single JSON?
>> 
>> Regards,
>> Vijay Raajaa G S
>> 
>> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>> Hi,
>> An AllWindow operator requires an AllWindowFunction instead of a 
>> WindowFunction. In your case, the keyBy() seems to be in the wrong place; to 
>> get a keyed window you have to write something akin to:
>> 
>> inputStream
>>   .keyBy(…)
>>   .window(…)
>>   .apply(…) // or reduce()
>> 
>> In your case, you key the stream and then the keying is “lost” again because 
>> you apply a flatMap(). That’s why you have an all-window and not a keyed 
>> window.
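>> 
>> Applied to your snippet, the reordering could look roughly like this (only a
>> sketch, assuming getEventTime() is an instance getter returning the String key
>> and that the merge into one JSON happens inside the window function instead of
>> serializing each element beforehand):
>> 
>> DataStream<String> merged = stream
>>     .map(new Deserializer())
>>     .keyBy(pojo -> pojo.getEventTime())  // key the POJOs directly; the keying is kept
>>     .countWindow(2)                      // keyed count window instead of countWindowAll
>>     .apply(new MyWindowFunction());      // WindowFunction<TopicPojo, String, String, GlobalWindow>
>> 
>> With this ordering the window is keyed, so apply() accepts a WindowFunction
>> whose third type parameter is the key.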
>> 
>> Best,
>> Aljoscha
>> 
>>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>>> 
>>> Hi,
>>> 
>>> I am trying to combine two Kafka topics using a single Kafka consumer on a 
>>> list of topics, and then convert the JSON strings in the stream to POJOs. 
>>> I then join them via keyBy (on the event-time field) and, to merge them into a 
>>> single fat JSON, I was planning to use a windowed stream and apply a window 
>>> function on it. The assumption is that Topic-A and Topic-B can be joined on 
>>> event time, and that only one pair (Topic A (JSON), Topic B (JSON)) will be 
>>> present with the same eventTime. Hence I was planning to use a countWindow(2) 
>>> after the keyBy on eventTime.
>>> 
>>> I have a couple of questions about this:
>>> 
>>> 1. Is the approach fine for merging topics and creating a single JSON?
>>> 2. The window function on the all-window stream doesn't seem to work; any 
>>>    pointers will be greatly appreciated.
>>> 
>>> Code snippet:
>>> 
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> 
>>> logger.info("Flink Stream Window Charger has started");
>>> 
>>> Properties properties = new Properties();
>>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>>> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>>> properties.setProperty("group.id", "group-0011");
>>> properties.setProperty("auto.offset.reset", "smallest");
>>> 
>>> List<String> names = new ArrayList<>();
>>> names.add("Topic-A");
>>> names.add("Topic-B");
>>> 
>>> DataStream<String> stream = env.addSource(
>>>     new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>> 
>>> DataStream<TopicPojo> pojo = stream.map(new Deserializer())
>>>     .keyBy((eventTime) -> TopicPojo.getEventTime());
>>> 
>>> List<String> where = new ArrayList<String>();
>>> 
>>> AllWindowedStream<String, GlobalWindow> data_window = pojo.flatMap(new Tokenizer())
>>>     .countWindowAll(2);
>>> 
>>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>> 
>>> data_charging.addSink(new SinkFunction<String>() {
>>> 
>>>  public void invoke(String value) throws Exception {
>>>   // Yet to be implemented - Merge two POJO into one
>>>  }
>>> });
>>> 
>>> try {
>>>  env.execute();
>>> } catch (Exception e) {
>>>  return;
>>> }
>>> }
>>> 
>>> }
>>> 
>>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>> 
>>>  private static final long serialVersionUID = 1L;
>>> 
>>>  @Override
>>>  public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>>   ObjectMapper mapper = new ObjectMapper();
>>>   out.collect(mapper.writeValueAsString(value));
>>>  }
>>> }
>>> 
>>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>> 
>>>  @Override
>>>  public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
>>>    throws Exception {
>>>   int count = 0;
>>>   for (TopicPojo in : arg2) {
>>>    count++;
>>>   }
>>>   // Test result - to be modified
>>>   out.collect("Window: " + window + " count: " + count);
>>>  }
>>> }
>>> 
>>> class Deserializer implements MapFunction<String, TopicPojo> {
>>> 
>>>  private static final long serialVersionUID = 1L;
>>> 
>>>  @Override
>>>  public TopicPojo map(String value) throws IOException {
>>>   ObjectMapper mapper = new ObjectMapper();
>>>   TopicPojo obj = null;
>>>   try {
>>>    System.out.println(value);
>>>    obj = mapper.readValue(value, TopicPojo.class);
>>>   } catch (JsonParseException e) {
>>>    throw new IOException("Failed to deserialize JSON object.");
>>>   } catch (JsonMappingException e) {
>>>    throw new IOException("Failed to deserialize JSON object.");
>>>   } catch (IOException e) {
>>>    throw new IOException("Failed to deserialize JSON object.");
>>>   }
>>>   return obj;
>>>  }
>>> }
>>> 
>>> 
>>> I am getting the following error: "The method apply(AllWindowFunction<String,R,GlobalWindow>) 
>>> in the type AllWindowedStream<String,GlobalWindow> is not applicable for 
>>> the arguments (MyWindowFunction)".
>>> 
>>> Kindly give your input.
>>> 
>>> Regards,
>>> Vijay Raajaa GS 
>>> 
>> 
>> 
> 
> 
> 
