Hi, I am trying to pass the SingleOutputStreamOperator to a custom sink. I am getting an error while implementing the same.
Code snippet: SingleOutputStreamOperator<JSONObject> stream = env.addSource(source) .flatMap(new ExtractHashTagsSymbols(tickers)) .keyBy(0) .timeWindow(Time.seconds(10)) .sum(1) .timeWindowAll(Time.seconds(10)) .apply(new GetVolume(tickerVolumeMap)); stream.addSink(new SinkFunction<JsonObject>(){ public void invoke(JsonObject value) throws Exception { pushToSocket(value, socket_url); } }); I am getting the following error: The method addSink(SinkFunction<JSONObject>) in the type DataStream<JSONObject> is not applicable for the arguments (new SinkFunction<JsonObject>(){}) Looking forward to your view. Regards, Vijay Raajaa GS