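bq. new SinkFunction<JsonObject>(){

Note the case in JsonObject. The stream is declared as SingleOutputStreamOperator<JSONObject>, so the sink's type parameter (and the parameter of invoke) should be JSONObject as well.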
FYI

On Thu, Jun 8, 2017 at 1:27 PM, G.S.Vijay Raajaa <gsvijayraa...@gmail.com> wrote:

> Hi,
>
> I am trying to pass the SingleOutputStreamOperator to a custom sink. I am
> getting an error while implementing the same.
>
> Code snippet:
>
> SingleOutputStreamOperator<JSONObject> stream = env.addSource(source)
>     .flatMap(new ExtractHashTagsSymbols(tickers))
>     .keyBy(0)
>     .timeWindow(Time.seconds(10))
>     .sum(1)
>     .timeWindowAll(Time.seconds(10))
>     .apply(new GetVolume(tickerVolumeMap));
>
> stream.addSink(new SinkFunction<JsonObject>(){
>
>     public void invoke(JsonObject value) throws Exception {
>         pushToSocket(value, socket_url);
>     }
> });
>
> I am getting the following error: The method addSink(SinkFunction<JSONObject>)
> in the type DataStream<JSONObject> is not applicable for the arguments (new
> SinkFunction<JsonObject>(){})
>
> Looking forward to your view.
>
> Regards,
>
> Vijay Raajaa GS
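
On the error quoted above: addSink on a DataStream<JSONObject> only accepts a SinkFunction<JSONObject>, and to the compiler JsonObject is simply a different type. Below is a minimal, self-contained sketch of that rule. The classes in it (JSONObject, SinkFunction, DataStream) are hypothetical stand-ins written for this example, not the real Flink or JSON library classes, and the list named pushed stands in for pushToSocket. It only illustrates that the sink compiles once the type parameter matches the stream's element type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkTypeDemo {
    // Hypothetical stand-in for the JSON class used in the thread.
    static class JSONObject {
        private final String text;
        JSONObject(String text) { this.text = text; }
        @Override public String toString() { return text; }
    }

    // Stand-in with the same shape as the sink interface: a single invoke method.
    interface SinkFunction<T> {
        void invoke(T value) throws Exception;
    }

    // Stand-in for a stream of T. addSink accepts only SinkFunction<T>,
    // so a sink declared for any other type is rejected at compile time.
    static class DataStream<T> {
        private final List<T> elements;
        DataStream(List<T> elements) { this.elements = elements; }

        void addSink(SinkFunction<T> sink) throws Exception {
            for (T element : elements) {
                sink.invoke(element);
            }
        }
    }

    // Feeds one element through a sink whose type parameter matches the stream.
    static List<String> run() {
        final List<String> pushed = new ArrayList<>(); // stands in for pushToSocket
        DataStream<JSONObject> stream =
                new DataStream<>(Arrays.asList(new JSONObject("{\"volume\":42}")));
        try {
            // JSONObject here matches DataStream<JSONObject>, so this compiles.
            stream.addSink(new SinkFunction<JSONObject>() {
                @Override
                public void invoke(JSONObject value) throws Exception {
                    pushed.add(value.toString());
                }
            });
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return pushed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In the original snippet the equivalent change would be to write JSONObject in both the SinkFunction type parameter and the invoke signature, assuming the JSONObject import is the same class that GetVolume emits.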