Thanks for your input, I will try to incorporate it in my implementation.

Regards,
Vijay Raajaa G S
On Wed, May 3, 2017 at 3:28 PM, Aljoscha Krettek <aljos...@apache.org> wrote:

> The approach could work, but if it can happen that an event from stream A
> is not matched by an event in stream B, you will have lingering state that
> never goes away. For such cases it might be better to write a custom
> CoProcessFunction as sketched here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>
> The idea is to keep events from each side in state and emit a result when
> you get the event from the other side. You also set a cleanup timer, in case
> no other event arrives, to make sure that state eventually goes away.
>
> Best,
> Aljoscha
>
> On 3. May 2017, at 11:47, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>
> Sure. Thanks for the pointer, let me reorder the same. Any comments about
> the approach followed for merging topics and creating a single JSON?
>
> Regards,
> Vijay Raajaa G S
>
> On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
>> Hi,
>>
>> An AllWindow operator requires an AllWindowFunction instead of a
>> WindowFunction. In your case, the keyBy() seems to be in the wrong place;
>> to get a keyed window you have to write something akin to:
>>
>> inputStream
>>   .keyBy(…)
>>   .window(…)
>>   .apply(…) // or reduce()
>>
>> In your case, you key the stream and then the keying is “lost” again
>> because you apply a flatMap(). That’s why you have an all-window and not a
>> keyed window.
>>
>> Best,
>> Aljoscha
>>
>> On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am trying to combine two Kafka topics using a single Kafka consumer on a
>> list of topics, and then convert the JSON strings in the stream to POJOs.
>> Then I join them via keyBy (on the event time field), and to merge them into
>> a single fat JSON I was planning to use a windowed stream and apply a
>> window function on it. The assumption is that Topic-A and Topic-B can be
>> joined on event time, and that only one pair (Topic A (JSON), Topic B (JSON))
>> will be present with the same event time. Hence I was planning to use a
>> countWindow(2) after the keyBy on eventTime.
>>
>> I have a couple of questions:
>>
>> 1. Is the approach fine for merging topics and creating a single JSON?
>> 2. The window function on the AllWindowed stream doesn't seem to work;
>> any pointers will be greatly appreciated.
>>
>> Code Snippet:
>>
>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> logger.info("Flink Stream Window Charger has started");
>>
>> Properties properties = new Properties();
>> properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
>> properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
>> properties.setProperty("group.id", "group-0011");
>> properties.setProperty("auto.offset.reset", "smallest");
>>
>> List<String> names = new ArrayList<>();
>> names.add("Topic-A");
>> names.add("Topic-B");
>>
>> DataStream<String> stream = env.addSource(
>>     new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
>>
>> DataStream<TopicPojo> pojo = stream.map(new Deserializer())
>>     .keyBy((eventTime) -> TopicPojo.getEventTime());
>>
>> List<String> where = new ArrayList<String>();
>>
>> AllWindowedStream<String, GlobalWindow> data_window =
>>     pojo.flatMap(new Tokenizer()).countWindowAll(2);
>>
>> DataStream<String> data_charging = data_window.apply(new MyWindowFunction());
>>
>> data_charging.addSink(new SinkFunction<String>() {
>>
>>     public void invoke(String value) throws Exception {
>>         // Yet to be implemented - merge two POJOs into one
>>     }
>> });
>>
>> try {
>>     env.execute();
>> } catch (Exception e) {
>>     return;
>> }
>> }
>> }
>>
>> class Tokenizer implements FlatMapFunction<TopicPojo, String> {
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     @Override
>>     public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
>>         ObjectMapper mapper = new ObjectMapper();
>>         out.collect(mapper.writeValueAsString(value));
>>     }
>> }
>>
>> class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
>>
>>     @Override
>>     public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2,
>>             Collector<String> out) throws Exception {
>>         int count = 0;
>>         for (TopicPojo in : arg2) {
>>             count++;
>>         }
>>         // Test result - to be modified
>>         out.collect("Window: " + window + " count: " + count);
>>     }
>> }
>>
>> class Deserializer implements MapFunction<String, TopicPojo> {
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     @Override
>>     public TopicPojo map(String value) throws IOException {
>>         ObjectMapper mapper = new ObjectMapper();
>>         TopicPojo obj = null;
>>         try {
>>             System.out.println(value);
>>             obj = mapper.readValue(value, TopicPojo.class);
>>         } catch (JsonParseException e) {
>>             throw new IOException("Failed to deserialize JSON object.");
>>         } catch (JsonMappingException e) {
>>             throw new IOException("Failed to deserialize JSON object.");
>>         } catch (IOException e) {
>>             throw new IOException("Failed to deserialize JSON object.");
>>         }
>>         return obj;
>>     }
>> }
>>
>> I am getting the error: The method apply(AllWindowFunction<String,R,GlobalWindow>)
>> in the type AllWindowedStream<String,GlobalWindow> is not applicable for
>> the arguments (MyWindowFunction).
>>
>> Kindly give your input.
>>
>> Regards,
>> Vijay Raajaa GS
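A minimal, untested sketch of the keyBy/window/apply ordering described above, reusing the pojo stream from the snippet and assuming TopicPojo.getEventTime() is an instance getter returning a String, might look roughly like this:

DataStream<String> joined = pojo
    .keyBy(new KeySelector<TopicPojo, String>() {
        @Override
        public String getKey(TopicPojo value) {
            return value.getEventTime();
        }
    })
    .countWindow(2)
    .apply(new WindowFunction<TopicPojo, String, String, GlobalWindow>() {
        @Override
        public void apply(String eventTime, GlobalWindow window,
                Iterable<TopicPojo> events, Collector<String> out) throws Exception {
            // the two events sharing an event time land in the same window
            StringBuilder merged = new StringBuilder();
            for (TopicPojo event : events) {
                merged.append(event.toString()); // placeholder for the real JSON merging
            }
            out.collect(merged.toString());
        }
    });

Because keyBy() comes directly before countWindow(2), the window is keyed, so apply() expects a WindowFunction rather than an AllWindowFunction, and the intermediate flatMap to a JSON string before the window is no longer needed.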
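And a rough, untested sketch of the CoProcessFunction approach suggested at the top of the thread, assuming the two topics are read as two separate TopicPojo streams that are connected and keyed on the event time field. The merge() helper and the 60-second cleanup timeout are placeholders, and the class follows the current API shape; details may differ slightly in the 1.2 API:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class TopicJoinFunction extends CoProcessFunction<TopicPojo, TopicPojo, String> {

    private static final long serialVersionUID = 1L;

    // one buffered element per side, scoped to the current key (the event time)
    private transient ValueState<TopicPojo> sideA;
    private transient ValueState<TopicPojo> sideB;

    @Override
    public void open(Configuration parameters) {
        sideA = getRuntimeContext().getState(new ValueStateDescriptor<>("side-a", TopicPojo.class));
        sideB = getRuntimeContext().getState(new ValueStateDescriptor<>("side-b", TopicPojo.class));
    }

    @Override
    public void processElement1(TopicPojo a, Context ctx, Collector<String> out) throws Exception {
        TopicPojo b = sideB.value();
        if (b != null) {
            out.collect(merge(a, b));
            sideB.clear();
        } else {
            sideA.update(a);
            // cleanup timer so state goes away if no matching event ever arrives
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 60_000);
        }
    }

    @Override
    public void processElement2(TopicPojo b, Context ctx, Collector<String> out) throws Exception {
        TopicPojo a = sideA.value();
        if (a != null) {
            out.collect(merge(a, b));
            sideA.clear();
        } else {
            sideB.update(b);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + 60_000);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        // drop whatever is still buffered for this key
        sideA.clear();
        sideB.clear();
    }

    private String merge(TopicPojo a, TopicPojo b) {
        // placeholder for the actual JSON merging logic
        return "{\"topicA\":" + a + ",\"topicB\":" + b + "}";
    }
}

The two streams would then be wired up with something like streamA.connect(streamB).keyBy(a -> a.getEventTime(), b -> b.getEventTime()).process(new TopicJoinFunction()); again, this is only meant to illustrate the shape of the solution.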