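Hi Soheil,

The `getSideOutput()` method is defined on `SingleOutputStreamOperator`, not on `DataStream`. Transformations such as map or a window reduce return a `SingleOutputStreamOperator`, so the method is available on their result. In your first snippet you assign that result to a variable declared as `DataStream`, which hides the method. Declare `res` as `SingleOutputStreamOperator<Tuple3<String, Long, JSONObject>>` instead, and you can call `res.getSideOutput(lateOutputTag)` afterwards and store the late data in a separate variable.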
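To illustrate why the declared type of the variable matters, here is a minimal, self-contained sketch. The classes below are simplified stand-ins that only borrow the names `DataStream` and `SingleOutputStreamOperator`; they are not the real Flink classes, and `SideOutputDemo`, `reduceStep`, and the sample strings are hypothetical names made up for this example. The stand-in `getSideOutput()` also takes no tag argument, unlike the real method.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the base stream type (NOT the real Flink class).
class DataStream<T> {
    protected final List<T> elements;

    DataStream(List<T> elements) {
        this.elements = elements;
    }

    List<T> collect() {
        return new ArrayList<>(elements);
    }
}

// Stand-in for the operator type: only this subclass exposes the side output.
class SingleOutputStreamOperator<T> extends DataStream<T> {
    private final List<T> late;

    SingleOutputStreamOperator(List<T> main, List<T> late) {
        super(main);
        this.late = late;
    }

    DataStream<T> getSideOutput() {
        return new DataStream<>(late);
    }
}

class SideOutputDemo {
    // Plays the role of a transformation such as reduce(): it returns the operator type.
    static SingleOutputStreamOperator<String> reduceStep() {
        return new SingleOutputStreamOperator<>(List.of("on-time"), List.of("late"));
    }

    // Keeping the operator type on the variable makes getSideOutput() visible.
    static List<String> lateViaOperatorType() {
        SingleOutputStreamOperator<String> res = reduceStep();
        DataStream<String> lateData = res.getSideOutput();
        return lateData.collect();
    }

    // Declaring the variable as the base type hides the method at compile time.
    static List<String> mainViaBaseType() {
        DataStream<String> res = reduceStep();
        // res.getSideOutput(); // would not compile: not declared on DataStream
        return res.collect();
    }

    public static void main(String[] args) {
        System.out.println(lateViaOperatorType());
        System.out.println(mainViaBaseType());
    }
}
```

The same object is returned in both cases; only the static type of `res` differs, which is what determines whether the compiler lets you call `getSideOutput`.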
Best,
Xingcan

> On Jul 17, 2018, at 3:36 PM, Soheil Pourbafrani <soheil.i...@gmail.com> wrote:
>
> Hi, according to the documents I tried to get late data using side output.
>
> final OutputTag<Tuple3<String, Long, JSONObject>> lateOutputTag = new
> OutputTag<Tuple3<String, Long, JSONObject>>("late-data"){};
> DataStream<Tuple3<String,Long, JSONObject>> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process);
>
> When trying to store late data in a Datastream (As shown in document):
> DataStream<Tuple3<String, Long, JSONObject>> lateData = res.
> there is no predefined getSideOutput method on DataStream res!
> But if I call getSideOutput just after reduce function, it is known! But I
> don't want to save late data on res variable and I want to save them on
> another variable!
> DataStream<Tuple3<String,Long, JSONObject>> res = aggregatedTuple
> .assignTimestampsAndWatermarks(new Bound())
> }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
> .allowedLateness(Time.milliseconds(2))
> .sideOutputLateData(lateOutputTag)
> .reduce(Do some process)
> .getSideoutput(lateOutputTag);
> What is the problem here?