Hi Soheil, The /getSideOutput/ method is part of /SingleOutputStreamOperator/ which extends /DataStream///. Try using /SingleOutputStreamOperator/ as the type for your res variable.
Best, Dawid On 17/07/18 09:36, Soheil Pourbafrani wrote: > Hi, according to theĀ documents I tried to get late data using side > output. > > final OutputTag<Tuple3<String, Long, JSONObject>> lateOutputTag = new > OutputTag<Tuple3<String, Long, JSONObject>>("late-data"){}; > DataStream<Tuple3<String,Long, JSONObject>> res = aggregatedTuple > .assignTimestampsAndWatermarks(new Bound()) > }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ > .allowedLateness(Time.milliseconds(2)) > .sideOutputLateData(lateOutputTag) > .reduce(Do some process); > > When trying to store late data in a Datastream (As shown in document): > DataStream<Tuple3<String, Long, JSONObject>> lateData = res. > there is no predefined getSideOutput method on DataStream res! > But if I callĀ getSideOutput just after reduce function, it is known! > But I don't want to save late data on res variable and I want to save > them on another variable! > DataStream<Tuple3<String,Long, JSONObject>> res = aggregatedTuple > .assignTimestampsAndWatermarks(new Bound()) > }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ > .allowedLateness(Time.milliseconds(2)) > .sideOutputLateData(lateOutputTag) > .reduce(Do some process) > .getSideoutput(lateOutputTag); > What is the problem here? > >
signature.asc
Description: OpenPGP digital signature