I tried the timestamp field both as a String and as a Date object, and I get the same error in both cases.
Please find the POJO file:

    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import com.fasterxml.jackson.annotation.JsonAnyGetter;
    import com.fasterxml.jackson.annotation.JsonAnySetter;
    import com.fasterxml.jackson.annotation.JsonFormat;
    import com.fasterxml.jackson.annotation.JsonIgnore;
    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import com.fasterxml.jackson.annotation.JsonPropertyOrder;
    import org.apache.commons.lang.builder.ToStringBuilder;

    @JsonPropertyOrder({ "data", "label", "eventTime" })
    public class TopicPojo {

        @JsonProperty("data")
        private List<List<Double>> data = null;

        @JsonProperty("label")
        private List<String> label = null;

        @JsonProperty("eventTime")
        private static Date eventTime;

        /**
         * No args constructor for use in serialization
         */
        public TopicPojo() {
        }

        /**
         * @param data
         * @param label
         * @param eventTime
         */
        public TopicPojo(List<List<Double>> data, List<String> label, Date eventTime) {
            super();
            this.data = data;
            this.label = label;
            this.eventTime = eventTime;
        }

        @JsonProperty("data")
        public List<List<Double>> getData() {
            return data;
        }

        @JsonProperty("data")
        public void setData(List<List<Double>> data) {
            this.data = data;
        }

        @JsonProperty("label")
        public List<String> getLabel() {
            return label;
        }

        @JsonProperty("label")
        public void setLabel(List<String> label) {
            this.label = label;
        }

        @JsonProperty("eventTime")
        public static Date getEventTime() {
            return eventTime;
        }

        @JsonProperty("eventTime")
        public void setEventTime(Date eventTime) {
            this.eventTime = eventTime;
        }

        @Override
        public String toString() {
            return ToStringBuilder.reflectionToString(this);
        }
    }

The code above uses a Date for eventTime; I tried it as a String as well.
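The POJO above may explain the "Unexpected key group index" error that Aljoscha's KeySelector question is probing. `eventTime` is declared `static`, so all TopicPojo objects in one JVM share a single value, and the key selector `(eventTime) -> TopicPojo.getEventTime()` ignores the record it is handed. Assuming (this is not stated in the thread) that Flink evaluates the KeySelector once when routing a record after keyBy() and again in the receiving window operator, a key that changes between those two calls would send the record to a subtask that does not own its key group. The following is a minimal plain-Java sketch with no Flink or Jackson dependency; `StaticTimePojo` and `InstanceTimePojo` are hypothetical stand-ins for TopicPojo, and the key of one record is extracted twice with another record created in between.

```java
import java.util.Date;
import java.util.function.Function;

class StaticKeyDemo {

    // Mirrors the bug in TopicPojo: one static field shared by every instance,
    // so a record's "key" is whatever event was created last in this JVM.
    static class StaticTimePojo {
        private static Date eventTime;
        StaticTimePojo(Date eventTime) { StaticTimePojo.eventTime = eventTime; }
        static Date getEventTime() { return eventTime; }
    }

    // The fix: an ordinary instance field, so each record carries its own eventTime.
    static class InstanceTimePojo {
        private final Date eventTime;
        InstanceTimePojo(Date eventTime) { this.eventTime = eventTime; }
        Date getEventTime() { return eventTime; }
    }

    // Assumption: the two selector calls stand in for routing after keyBy()
    // and for the key lookup in the downstream window operator.
    static boolean staticKeyIsStable() {
        StaticTimePojo first = new StaticTimePojo(new Date(1000L));
        // Ignores its argument, like the original (eventTime) -> TopicPojo.getEventTime().
        Function<StaticTimePojo, Date> selector = p -> StaticTimePojo.getEventTime();
        Date keyWhenRouted = selector.apply(first);
        new StaticTimePojo(new Date(2000L)); // the next event overwrites the shared field
        return keyWhenRouted.equals(selector.apply(first));
    }

    static boolean instanceKeyIsStable() {
        InstanceTimePojo first = new InstanceTimePojo(new Date(1000L));
        Function<InstanceTimePojo, Date> selector = InstanceTimePojo::getEventTime;
        Date keyWhenRouted = selector.apply(first);
        new InstanceTimePojo(new Date(2000L)); // has no effect on `first`
        return keyWhenRouted.equals(selector.apply(first));
    }

    public static void main(String[] args) {
        System.out.println("static field key stable:   " + staticKeyIsStable());   // false
        System.out.println("instance field key stable: " + instanceKeyIsStable()); // true
    }
}
```

Running `main` prints `false` for the static version and `true` for the instance version. The corresponding change to TopicPojo would be to drop `static` from the `eventTime` field and its getter and to key with a selector that reads the record, such as `keyBy(p -> p.getEventTime())` or an equivalent KeySelector; that change has not been tested against this job.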
Regards,
Vijay Raajaa G S

On Fri, May 5, 2017 at 1:59 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
> What's the KeySelector you're using? To me, this indicates that the
> timestamp field is somehow changing after the original keying or in transit.
>
> Best,
> Aljoscha
>
> On 4. May 2017, at 22:01, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>
> I tried to reorder and the window function works fine, but after
> processing a few streams of data from Topic A and Topic B, the window function
> seems to throw the error below. The keyBy is on the eventTime field.
>
> java.lang.RuntimeException: Unexpected key group index. This indicates a bug.
>     at org.apache.flink.runtime.state.heap.StateTable.set(StateTable.java:57)
>     at org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:98)
>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:372)
>     at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185)
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:272)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>     at java.lang.Thread.run(Thread.java:745)
>
> Regards,
> Vijay Raajaa GS
>
> On Wed, May 3, 2017 at 3:50 PM, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>
>> Thanks for your input, will try to incorporate it in my implementation.
>>
>> Regards,
>> Vijay Raajaa G S
>>
>> On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> The approach could work, but if it can happen that an event from stream
>>> A is not matched by an event in stream B you will have lingering state that
>>> never goes away.
>>> For such cases it might be better to write a custom
>>> CoProcessFunction as sketched here:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>>>
>>> The idea is to keep events from each side in state and emit a result
>>> when you get the event from the other side. You also set a cleanup timer in
>>> case no other event arrives to make sure that state eventually goes away.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>>>
>>> Sure. Thanks for the pointer, let me reorder it. Any comments
>>> about the approach I followed for merging topics and creating a single
>>> JSON?
>>>
>>> Regards,
>>> Vijay Raajaa G S
>>>
>>> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>>> Hi,
>>>> An AllWindow operator requires an AllWindowFunction instead of a
>>>> WindowFunction. In your case, the keyBy() seems to be in the wrong place;
>>>> to get a keyed window you have to write something akin to:
>>>>
>>>>     inputStream
>>>>         .keyBy(…)
>>>>         .window(…)
>>>>         .apply(…) // or reduce()
>>>>
>>>> In your case, you key the stream and then the keying is "lost" again
>>>> because you apply a flatMap(). That's why you have an all-window and not a
>>>> keyed window.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am trying to combine two Kafka topics using a single Kafka
>>>> consumer on a list of topics, and then convert the JSON strings in the stream
>>>> to POJOs. I then join them via keyBy (on the event time field), and to merge
>>>> them into a single fat JSON, I was planning to use a window stream and apply
>>>> a window function on it. The assumption is that Topic-A and
>>>> Topic-B can be joined on event time and that only one pair (Topic A (JSON),
>>>> Topic B (JSON)) will be present with the same eventTime.
>>>> Hence I was planning to use a countWindow(2) after keyBy on eventTime.
>>>>
>>>> I have a couple of questions about this:
>>>>
>>>> 1. Is the approach fine for merging topics and creating a single JSON?
>>>> 2. The window function on the all-window stream doesn't seem to work.
>>>>
>>>> Any pointers would be greatly appreciated.
>>>>
>>>> Code snippet:
>>>>
>>>>             StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>             logger.info("Flink Stream Window Charger has started");
>>>>
>>>>             Properties properties = new Properties();
>>>>             properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>>>>             properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>>>>             properties.setProperty("group.id", "group-0011");
>>>>             properties.setProperty("auto.offset.reset", "smallest");
>>>>
>>>>             List<String> names = new ArrayList<>();
>>>>             names.add("Topic-A");
>>>>             names.add("Topic-B");
>>>>
>>>>             DataStream<String> stream = env.addSource(
>>>>                     new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>>>
>>>>             DataStream<TopicPojo> pojo = stream.map(new Deserializer())
>>>>                     .keyBy((eventTime) -> TopicPojo.getEventTime());
>>>>
>>>>             List<String> where = new ArrayList<String>();
>>>>
>>>>             AllWindowedStream<String, GlobalWindow> data_window =
>>>>                     pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>>>
>>>>             DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>>>
>>>>             data_charging.addSink(new SinkFunction<String>() {
>>>>                 public void invoke(String value) throws Exception {
>>>>                     // Yet to be implemented - Merge two POJO into one
>>>>                 }
>>>>             });
>>>>
>>>>             try {
>>>>                 env.execute();
>>>>             } catch (Exception e) {
>>>>                 return;
>>>>             }
>>>>         }
>>>>     }
>>>>
>>>>     class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>>>
>>>>         private static final long serialVersionUID = 1L;
>>>>
>>>>         @Override
>>>>         public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>>>             ObjectMapper mapper = new ObjectMapper();
>>>>             out.collect(mapper.writeValueAsString(value));
>>>>         }
>>>>     }
>>>>
>>>>     class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>>>
>>>>         @Override
>>>>         public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2, Collector<String> out)
>>>>                 throws Exception {
>>>>             int count = 0;
>>>>             for (TopicPojo in : arg2) {
>>>>                 count++;
>>>>             }
>>>>             // Test Result - TO be modified
>>>>             out.collect("Window: " + window + "count: " + count);
>>>>         }
>>>>     }
>>>>
>>>>     class Deserializer implements MapFunction<String, TopicPojo> {
>>>>
>>>>         private static final long serialVersionUID = 1L;
>>>>
>>>>         @Override
>>>>         public TopicPojo map(String value) throws IOException {
>>>>             // TODO Auto-generated method stub
>>>>             ObjectMapper mapper = new ObjectMapper();
>>>>             TopicPojo obj = null;
>>>>             try {
>>>>                 System.out.println(value);
>>>>                 obj = mapper.readValue(value, TopicPojo.class);
>>>>             } catch (JsonParseException e) {
>>>>                 // TODO Auto-generated catch block
>>>>                 throw new IOException("Failed to deserialize JSON object.");
>>>>             } catch (JsonMappingException e) {
>>>>                 // TODO Auto-generated catch block
>>>>                 throw new IOException("Failed to deserialize JSON object.");
>>>>             } catch (IOException e) {
>>>>                 // TODO Auto-generated catch block
>>>>                 throw new IOException("Failed to deserialize JSON object.");
>>>>             }
>>>>             return obj;
>>>>         }
>>>>     }
>>>>
>>>> I am getting this error: The method apply(AllWindowFunction<String,R,GlobalWindow>)
>>>> in the type
>>>> AllWindowedStream<String,GlobalWindow> is not applicable for the arguments (MyWindowFunction).
>>>>
>>>> Kindly give your input.
>>>>
>>>> Regards,
>>>> Vijay Raajaa GS
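Aljoscha's CoProcessFunction suggestion (keep each side's event in keyed state, emit a result when the matching event from the other side arrives, and set a cleanup timer so unmatched state goes away) can be illustrated without a cluster. The class below is a hypothetical plain-Java simulation, not the Flink API: a HashMap keyed by eventTime (epoch millis) stands in for per-key state, a cleanup deadline stored with each buffered event stands in for the timer, and `onTimer(now)` plays the part of timers firing. The merged format `{"a":...,"b":...}` is an arbitrary placeholder for the "single fat JSON".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java simulation of a keyed two-input join; NOT the Flink API.
class PairJoiner {

    // One buffered event waiting for its partner from the other topic.
    private static final class Pending {
        final boolean fromA;
        final String json;
        final long cleanupAt; // stand-in for a cleanup timer

        Pending(boolean fromA, String json, long cleanupAt) {
            this.fromA = fromA;
            this.json = json;
            this.cleanupAt = cleanupAt;
        }
    }

    private final Map<Long, Pending> stateByEventTime = new HashMap<>();
    private final long ttlMillis;

    PairJoiner(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Event from Topic A; returns the merged JSON, or null while waiting for Topic B.
    String onA(long eventTime, String json, long now) {
        return onEvent(true, eventTime, json, now);
    }

    // Event from Topic B; returns the merged JSON, or null while waiting for Topic A.
    String onB(long eventTime, String json, long now) {
        return onEvent(false, eventTime, json, now);
    }

    private String onEvent(boolean fromA, long eventTime, String json, long now) {
        Pending other = stateByEventTime.get(eventTime);
        if (other != null && other.fromA != fromA) {
            // Partner found: clear this key's state and emit one merged record.
            stateByEventTime.remove(eventTime);
            String a = fromA ? json : other.json;
            String b = fromA ? other.json : json;
            return "{\"a\":" + a + ",\"b\":" + b + "}";
        }
        // No partner yet (or a repeat from the same side): buffer it and re-arm the deadline.
        stateByEventTime.put(eventTime, new Pending(fromA, json, now + ttlMillis));
        return null;
    }

    // Simulates timers firing at `now`; returns how many unmatched events were dropped.
    int onTimer(long now) {
        int before = stateByEventTime.size();
        stateByEventTime.values().removeIf(p -> p.cleanupAt <= now);
        return before - stateByEventTime.size();
    }

    int pendingCount() {
        return stateByEventTime.size();
    }

    public static void main(String[] args) {
        PairJoiner joiner = new PairJoiner(60_000L);
        System.out.println(joiner.onA(1000L, "{\"data\":[[1.0]]}", 0L)); // prints null
        System.out.println(joiner.onB(1000L, "{\"label\":[\"x\"]}", 5L)); // prints {"a":{"data":[[1.0]]},"b":{"label":["x"]}}
        joiner.onA(2000L, "{\"data\":[[2.0]]}", 10L); // no Topic B event ever arrives for this key
        System.out.println(joiner.onTimer(60_010L) + " dropped, " + joiner.pendingCount() + " pending"); // prints 1 dropped, 0 pending
    }
}
```

In a Flink job, `onA` and `onB` would roughly correspond to `processElement1` and `processElement2` of a CoProcessFunction on two connected keyed streams, with keyed state in place of the map and a registered timer in place of `cleanupAt`. Telling the two topics apart (for example, by using two separate consumers instead of one consumer on a topic list) is an assumption of this sketch and has not been tried against the job above.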