Hi,

I am trying to combine two Kafka topics using a single Kafka consumer subscribed to a list of topics, and then convert the JSON strings in the stream to POJOs. I then want to join them via keyBy (on the event-time field) and merge each pair into a single fat JSON. To do that, I was planning to use a windowed stream and apply a window function to it. The assumption is that Topic-A and Topic-B can be joined on event time, and that only one pair (one Topic-A JSON and one Topic-B JSON) will be present with the same eventTime. Hence I was planning to use countWindow(2) after keyBy on eventTime.
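To make the intended pairing concrete, here is a minimal plain-Java sketch of what keyBy on eventTime followed by a count window of 2 is meant to achieve. It is not Flink code and not the actual job: EventTimePairer, offer, pendingCount, and the field names fromA and fromB are hypothetical names made up for illustration, and each record is modelled as a Map<String, Object> instead of TopicPojo.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Plain-Java stand-in for "keyBy(eventTime) + count window of 2"; illustration only. */
final class EventTimePairer {
    // Records still waiting for their partner, keyed by event time.
    private final Map<Long, Map<String, Object>> pending = new HashMap<>();

    /**
     * Buffers the first record seen for an event time. When the second record
     * with the same event time arrives, returns both merged into one map;
     * otherwise returns an empty Optional.
     */
    Optional<Map<String, Object>> offer(long eventTime, Map<String, Object> fields) {
        Map<String, Object> first = pending.remove(eventTime);
        if (first == null) {
            pending.put(eventTime, fields);
            return Optional.empty();
        }
        Map<String, Object> merged = new LinkedHashMap<>(first);
        merged.putAll(fields);              // on a field-name clash the later record wins
        merged.put("eventTime", eventTime); // keep the join key in the merged record
        return Optional.of(merged);
    }

    /** Number of event times that have seen only one record so far. */
    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        EventTimePairer pairer = new EventTimePairer();
        pairer.offer(100L, Map.of("fromA", 1)); // buffered, nothing emitted yet
        pairer.offer(100L, Map.of("fromB", 2))
              .ifPresent(System.out::println);  // prints {fromA=1, fromB=2, eventTime=100}
    }
}
```

One consequence of pairing purely by count, at least in this sketch: a record whose partner never arrives stays buffered indefinitely, and a third record with the same eventTime would start a new pair.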
I have a couple of questions about this:

1. Is this approach fine for merging the topics and creating a single JSON?
2. The window function on the AllWindowedStream doesn't seem to work. Any pointers will be greatly appreciated.

Code snippet:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    logger.info("Flink Stream Window Charger has started");

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "127.0.0.1:1030");
    properties.setProperty("zookeeper.connect", "127.0.0.1:2181/service-kafka");
    properties.setProperty("group.id", "group-0011");
    properties.setProperty("auto.offset.reset", "smallest");

    List<String> names = new ArrayList<>();
    names.add("Topic-A");
    names.add("Topic-B");

    DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer08<>(names, new SimpleStringSchema(), properties));
    DataStream<TopicPojo> pojo = stream.map(new Deserializer())
            .keyBy((eventTime) -> TopicPojo.getEventTime());

    List<String> where = new ArrayList<String>();
    AllWindowedStream<String, GlobalWindow> data_window =
            pojo.flatMap(new Tokenizer()).countWindowAll(2);
    DataStream<String> data_charging = data_window.apply(new MyWindowFunction());

    data_charging.addSink(new SinkFunction<String>() {
        public void invoke(String value) throws Exception {
            // Yet to be implemented - Merge two POJO into one
        }
    });

    try {
        env.execute();
    } catch (Exception e) {
        return;
    }
        } // end of main method (its opening is not part of this snippet)
    }     // end of enclosing class (its opening is not part of this snippet)

    class Tokenizer implements FlatMapFunction<TopicPojo, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(TopicPojo value, Collector<String> out) throws Exception {
            ObjectMapper mapper = new ObjectMapper();
            out.collect(mapper.writeValueAsString(value));
        }
    }

    class MyWindowFunction implements WindowFunction<TopicPojo, String, String, GlobalWindow> {
        @Override
        public void apply(String key, GlobalWindow window, Iterable<TopicPojo> arg2,
                Collector<String> out) throws Exception {
            int count = 0;
            for (TopicPojo in : arg2) {
                count++;
            }
            // Test Result - TO be modified
            out.collect("Window: " + window + "count: " + count);
        }
    }

    class Deserializer implements MapFunction<String, TopicPojo> {
        private static final long serialVersionUID = 1L;

        @Override
        public TopicPojo map(String value) throws IOException {
            // TODO Auto-generated method stub
            ObjectMapper mapper = new ObjectMapper();
            TopicPojo obj = null;
            try {
                System.out.println(value);
                obj = mapper.readValue(value, TopicPojo.class);
            } catch (JsonParseException e) {
                // TODO Auto-generated catch block
                throw new IOException("Failed to deserialize JSON object.");
            } catch (JsonMappingException e) {
                // TODO Auto-generated catch block
                throw new IOException("Failed to deserialize JSON object.");
            } catch (IOException e) {
                // TODO Auto-generated catch block
                throw new IOException("Failed to deserialize JSON object.");
            }
            return obj;
        }
    }

I am getting the following error:

    The method apply(AllWindowFunction<String,R,GlobalWindow>) in the type AllWindowedStream<String,GlobalWindow> is not applicable for the arguments (MyWindowFunction)

Kindly give your input.

Regards,
Vijay Raajaa GS