Sure. Thanks for the pointer, let me reorder the same. Any comments about the approach followed for merging topics and creating a single JSON?
Regards, Vijay Raajaa G S On Wed, May 3, 2017 at 2:41 PM, Aljoscha Krettek <aljos...@apache.org> wrote: > Hi, > An AllWindow operator requires an AllWindowFunction, instead of a > WindowFunction. In your case, the keyBy() seems to be in the wrong place, > to get a keyed window you have to write something akin to: > > inputStream > .keyBy(…) > .window(…) > .apply(…) // or reduce() > > In your case, you key the stream and then the keying is “lost” again > because you apply a flatMap(). That’s why you have an all-window and not a > keyed window. > > Best, > Aljoscha > > On 2. May 2017, at 09:20, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> > wrote: > > Hi, > > I am trying to combine two kafka topics using the a single kafka consumer > on a list of topics, further convert the json string in the stream to POJO. > Then, join them via keyBy ( On event time field ) and to merge them as a > single fat json, I was planning to use a window stream and apply a window > function on the window stream. The assumption is that Topic-A & Topic-B can > be joined on Event Time and only one pair ( Topic A ( JSON ) , Topic B > (JSON ) will be present with the same eventTime. Hence was planning to use > a coutWindow(2) post keyBy on eventTime. > > I have couple of questions for the same; > > 1. Is the approach fine for merging topics and creating a single JSON? > 2. The window function on All Window stream doesnt seem to work fine; Any > pointers will be greatly appreciated. > > Code Snippet : > > StreamExecutionEnvironment env = StreamExecutionEnvironment. > getExecutionEnvironment(); > > logger.info("Flink Stream Window Charger has started"); > > Properties properties = new Properties(); > > properties.setProperty("bootstrap.servers", "127.0.0.1:1030"); > > properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka" > ); > > properties.setProperty("group.id", "group-0011"); > > properties.setProperty("auto.offset.reset", "smallest"); > > > List < String > names = new ArrayList < > (); > > > names.add("Topic-A"); > > names.add("Topic-B"); > > > DataStream < String > stream = env.addSource(new FlinkKafkaConsumer08 < > > (names, new SimpleStringSchema(), properties)); > > DataStream < TopicPojo > pojo = stream.map(new > Deserializer()).keyBy((eventTime) > -> TopicPojo.getEventTime()); > > List < String > where = new ArrayList < String > (); > > AllWindowedStream < String, GlobalWindow > data_window = pojo.flatMap(new > Tokenizer()).countWindowAll(2); > > DataStream < String > data_charging = data_window.apply(new > MyWindowFunction()); > > data_charging.addSink(new SinkFunction < String > () { > > > public void invoke(String value) throws Exception { > > > // Yet to be implemented - Merge two POJO into one > > } > > }); > > > try > > { > > env.execute(); > > } catch (Exception e) > > { > > return; > > } > > } > > } > > class Tokenizer implements FlatMapFunction < TopicPojo, String > { > > private static final long serialVersionUID = 1 L; > > @Override > > public void flatMap(TopicPojo value, Collector < String > out) throws > Exception { > > ObjectMapper mapper = new ObjectMapper(); > > out.collect(mapper.writeValueAsString(value)); > > } > > } > > class MyWindowFunction implements WindowFunction < TopicPojo, String, > String, GlobalWindow > { > > @Override > > public void apply(String key, GlobalWindow window, Iterable < TopicPojo > > arg2, Collector < String > out) > > throws Exception { > > int count = 0; > > for (TopicPojo in : arg2) { > > count++; > > } > > // Test Result - TO be modified > > out.collect("Window: " + window + "count: " + count); > > > } > > } > > class Deserializer implements MapFunction < String, TopicPojo > { > > private static final long serialVersionUID = 1 L; > > @Override > > public TopicPojo map(String value) throws IOException { > > // TODO Auto-generated method stub > > ObjectMapper mapper = new ObjectMapper(); > > TopicPojo obj = null; > > try { > > > System.out.println(value); > > > obj = mapper.readValue(value, TopicPojo.class); > > > } catch (JsonParseException e) { > > > // TODO Auto-generated catch block > > > throw new IOException("Failed to deserialize JSON object."); > > > } catch (JsonMappingException e) { > > > // TODO Auto-generated catch block > > > throw new IOException("Failed to deserialize JSON object."); > > } catch (IOException e) { > > > // TODO Auto-generated catch block > > > throw new IOException("Failed to deserialize JSON object."); > > } > > return obj; > > } > > } > > I am getting - The method apply(AllWindowFunction<String,R,GlobalWindow>) > in the type AllWindowedStream<String,GlobalWindow> is not applicable for > the arguments (MyWindowFunction) error. > > Kindly give your input. > > Regards, > Vijay Raajaa GS > > >