Hi,

I am trying to attach a custom sink to a SingleOutputStreamOperator, but I
get a compile error when I do so.

Code snippet:

SingleOutputStreamOperator<JSONObject> stream = env.addSource(source)
            .flatMap(new ExtractHashTagsSymbols(tickers))
            .keyBy(0)
            .timeWindow(Time.seconds(10))
            .sum(1)
            .timeWindowAll(Time.seconds(10))
            .apply(new GetVolume(tickerVolumeMap));

stream.addSink(new SinkFunction<JsonObject>() {
    public void invoke(JsonObject value) throws Exception {
        pushToSocket(value, socket_url);
    }
});


I am getting the following error: The method
addSink(SinkFunction<JSONObject>) in the type DataStream<JSONObject> is not
applicable for the arguments (new SinkFunction<JsonObject>(){})
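
The message names two types that differ only in capitalisation: the stream is typed with JSONObject, while the sink is typed with JsonObject. If these are two different classes, a sink parameterised with one would not be accepted where the other is expected, because Java generics are invariant. Below is a minimal sketch of that situation. It does not use the Flink API: Stream, Sink, and both JSON classes are hypothetical stand-ins written only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkTypeDemo {
    // Hypothetical stand-ins for two unrelated classes whose names differ only by case.
    static class JSONObject {
        final String text;
        JSONObject(String text) { this.text = text; }
    }
    static class JsonObject { }

    // Hypothetical stand-in for a sink interface (not Flink's SinkFunction).
    interface Sink<T> {
        void invoke(T value) throws Exception;
    }

    // Hypothetical stand-in for a typed stream; addSink accepts only Sink<T>.
    static class Stream<T> {
        private final List<T> elements;
        Stream(List<T> elements) { this.elements = elements; }

        void addSink(Sink<T> sink) throws Exception {
            for (T element : elements) {
                sink.invoke(element);
            }
        }
    }

    public static List<String> run() throws Exception {
        List<String> delivered = new ArrayList<>();
        Stream<JSONObject> stream =
                new Stream<>(List.of(new JSONObject("{\"ticker\":\"ABC\"}")));

        // Rejected by the compiler: Sink<JsonObject> is not a Sink<JSONObject>.
        // stream.addSink(new Sink<JsonObject>() {
        //     public void invoke(JsonObject value) { }
        // });

        // Accepted: the sink's type argument matches the stream's element type.
        stream.addSink(new Sink<JSONObject>() {
            @Override
            public void invoke(JSONObject value) {
                delivered.add(value.text);
            }
        });
        return delivered;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

In this sketch the commented-out call fails for the same kind of reason as the reported error, and the version whose type argument matches the stream compiles. Whether that is the actual cause here depends on which JSON classes are imported in the original file.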

Looking forward to your view.

Regards,

Vijay Raajaa GS
