I tried to reorder and the window function works fine. but then after
processing few stream of data from Topic A and Topic B, the window function
seem to throw the below error. The keyby is on eventTime field.

java.lang.RuntimeException: Unexpected key group index. This indicates a
bug.

at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)

at org.apache.flink.runtime.state.heap.HeapListState.add(
HeapListState.java:98)

at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(
WindowOperator.java:372)

at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(
StreamInputProcessor.java:185)

at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(
OneInputStreamTask.java:63)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
StreamTask.java:272)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)

at java.lang.Thread.run(Thread.java:745)


Regards,

Vijay Raajaa GS

On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <gsvijayraa...@gmail.com>
wrote:

> Thanks for your input, will try to incorporate them in my implementation.
>
> Regards,
> Vijay Raajaa G S
>
> On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> The approach could work, but if it can happen that an event from stream A
>> is not matched by an event in stream B you will have lingering state that
>> never goes away. For such cases it might be better to write a custom
>> CoProcessFunction as sketched here: https://ci.apache.org/pr
>> ojects/flink/flink-docs-release-1.2/dev/stream/process_function.html.
>>
>> The idea is to keep events from each side in state and emit a result when
>> you get the event from the other side. You also set a cleanup timer in case
>> no other event arrives to make sure that state eventually goes away.
>>
>> Best,
>> Aljoscha
>>
>> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraa...@gmail.com>
>> wrote:
>>
>> Sure. Thanks for the pointer, let me reorder the same. Any comments about
>> the approach followed for merging topics and creating a single JSON?
>>
>> Regards,
>> Vijay Raajaa G S
>>
>> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org>
>> wrote:
>>
>>> Hi,
>>> An AllWindow operator requires an AllWindowFunction, instead of a
>>> WindowFunction. In your case, the keyBy() seems to be in the wrong place,
>>> to get a keyed window you have to write something akin to:
>>>
>>> inputStream
>>>   .keyBy(…)
>>>   .window(…)
>>>   .apply(…) // or reduce()
>>>
>>> In your case, you key the stream and then the keying is “lost” again
>>> because you apply a flatMap(). That’s why you have an all-window and not a
>>> keyed window.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com>
>>> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to combine two kafka topics using the a single kafka
>>> consumer on a list of topics, further convert the json string in the stream
>>> to POJO. Then, join them via keyBy ( On event time field ) and to merge
>>> them as a single fat json, I was planning to use a window stream and apply
>>> a window function on the window stream. The assumption is that Topic-A &
>>> Topic-B can be joined on Event Time and only one pair ( Topic A ( JSON ) ,
>>> Topic B (JSON ) will be present with the same eventTime. Hence was planning
>>> to use a coutWindow(2) post keyBy on eventTime.
>>>
>>> I have couple of questions for the same;
>>>
>>> 1. Is the approach fine for merging topics and creating a single JSON?
>>> 2. The window function on All Window stream doesnt seem to work fine;
>>> Any pointers will be greatly appreciated.
>>>
>>> Code Snippet :
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.get
>>> ExecutionEnvironment();
>>>
>>> logger.info("Flink Stream Window Charger has started");
>>>
>>> Properties properties = new Properties();
>>>
>>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>>>
>>> properties.setProperty("zookeeper.connect", "
>>> 127.0.0.1:2181/service-kafka");
>>>
>>> properties.setProperty("group.id", "group-0011");
>>>
>>> properties.setProperty("auto.offset.reset", "smallest");
>>>
>>>
>>> List < String > names = new ArrayList < > ();
>>>
>>>
>>> names.add("Topic-A");
>>>
>>> names.add("Topic-B");
>>>
>>>
>>> DataStream < String > stream = env.addSource(new FlinkKafkaConsumer08 <
>>> > (names, new SimpleStringSchema(), properties));
>>>
>>> DataStream < TopicPojo > pojo = stream.map(new
>>> Deserializer()).keyBy((eventTime) -> TopicPojo.getEventTime());
>>>
>>> List < String > where = new ArrayList < String > ();
>>>
>>> AllWindowedStream < String, GlobalWindow > data_window =
>>> pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>>
>>> DataStream < String > data_charging = data_window.apply(new
>>> MyWindowFunction());
>>>
>>> data_charging.addSink(new SinkFunction < String > () {
>>>
>>>
>>> public void invoke(String value) throws Exception {
>>>
>>>
>>>   // Yet to be implemented - Merge two POJO into one
>>>
>>>  }
>>>
>>> });
>>>
>>>
>>> try
>>>
>>> {
>>>
>>>  env.execute();
>>>
>>> } catch (Exception e)
>>>
>>> {
>>>
>>>  return;
>>>
>>> }
>>>
>>> }
>>>
>>> }
>>>
>>> class Tokenizer implements FlatMapFunction < TopicPojo, String > {
>>>
>>>  private static final long serialVersionUID = 1 L;
>>>
>>>  @Override
>>>
>>>  public void flatMap(TopicPojo value, Collector < String > out) throws
>>> Exception {
>>>
>>>   ObjectMapper mapper = new ObjectMapper();
>>>
>>>   out.collect(mapper.writeValueAsString(value));
>>>
>>>  }
>>>
>>> }
>>>
>>> class MyWindowFunction implements WindowFunction < TopicPojo, String,
>>> String, GlobalWindow > {
>>>
>>>  @Override
>>>
>>>  public void apply(String key, GlobalWindow window, Iterable < TopicPojo
>>> > arg2, Collector < String > out)
>>>
>>>  throws Exception {
>>>
>>>   int count = 0;
>>>
>>>   for (TopicPojo in : arg2) {
>>>
>>>    count++;
>>>
>>>   }
>>>
>>>   // Test Result - TO be modified
>>>
>>>   out.collect("Window: " + window + "count: " + count);
>>>
>>>
>>>  }
>>>
>>> }
>>>
>>> class Deserializer implements MapFunction < String, TopicPojo > {
>>>
>>>  private static final long serialVersionUID = 1 L;
>>>
>>>  @Override
>>>
>>>  public TopicPojo map(String value) throws IOException {
>>>
>>>   // TODO Auto-generated method stub
>>>
>>>   ObjectMapper mapper = new ObjectMapper();
>>>
>>>   TopicPojo obj = null;
>>>
>>>   try {
>>>
>>>
>>>    System.out.println(value);
>>>
>>>
>>>    obj = mapper.readValue(value, TopicPojo.class);
>>>
>>>
>>>   } catch (JsonParseException e) {
>>>
>>>
>>>    // TODO Auto-generated catch block
>>>
>>>
>>>    throw new IOException("Failed to deserialize JSON object.");
>>>
>>>
>>>   } catch (JsonMappingException e) {
>>>
>>>
>>>    // TODO Auto-generated catch block
>>>
>>>
>>>    throw new IOException("Failed to deserialize JSON object.");
>>>
>>>   } catch (IOException e) {
>>>
>>>
>>>    // TODO Auto-generated catch block
>>>
>>>
>>>    throw new IOException("Failed to deserialize JSON object.");
>>>
>>>   }
>>>
>>>   return obj;
>>>
>>>  }
>>>
>>> }
>>>
>>> I am getting - The method apply(AllWindowFunction<String,R,GlobalWindow>)
>>> in the type AllWindowedStream<String,GlobalWindow> is not applicable
>>> for the arguments (MyWindowFunction) error.
>>>
>>> Kindly give your input.
>>>
>>> Regards,
>>> Vijay Raajaa GS
>>>
>>>
>>>
>>
>>
>

Reply via email to