Hi, I am trying to implement a flink job which takes the twitter as the source and collects tweets from a list of hashtags. The flink job basically aggregates the volume of tweets per hashtag in a given time frame. I have implemented them successfully, but then if there is no tweet across all the hashtags I need to send out a default value of 0 across all hashtags. Not sure how to implement this functionality.
Code Snippet : env.addSource(source) .flatMap(new ExtractHashTagsSymbols(tickers)) .keyBy(0) .timeWindow(Time.seconds(Integer.parseInt(window_time))) .sum(1) .timeWindowAll(Time.seconds(Integer.parseInt(window_time))) .apply(new GetVolume(tickerVolumeMap)) .addSink(new SinkFunction<JSONObject>(){ public void invoke(JSONObject value) throws Exception { System.out.println("Twitter Volume:"+value.toString()); //JsonParser jsonParser = new JsonParser(); //JsonObject gsonObject = (JsonObject)jsonParser.parse(value.toString()); pushToSocket(value, socket_url); } }); The above code waits for window_time time frame and computes the tweet volume and sends out a json. Regards, Vijay Raajaa GS