Hi,

Following the documentation, I tried to collect late data using a side output:

    final OutputTag<Tuple3<String, Long, JSONObject>> lateOutputTag =
        new OutputTag<Tuple3<String, Long, JSONObject>>("late-data"){};

    DataStream<Tuple3<String, Long, JSONObject>> res = aggregatedTuple
        .assignTimestampsAndWatermarks(new Bound())
        .keyBy(1)
        .timeWindow(Time.milliseconds(160)) /*.countWindow(3)*/
        .allowedLateness(Time.milliseconds(2))
        .sideOutputLateData(lateOutputTag)
        .reduce(/* do some processing */);

When I try to store the late data in a separate DataStream, as shown in the documentation:

    DataStream<Tuple3<String, Long, JSONObject>> lateData = res.getSideOutput(lateOutputTag);

there is no getSideOutput method available on the DataStream res.

However, if I call getSideOutput directly after the reduce function, the method is recognized:

    DataStream<Tuple3<String, Long, JSONObject>> res = aggregatedTuple
        .assignTimestampsAndWatermarks(new Bound())
        .keyBy(1)
        .timeWindow(Time.milliseconds(160)) /*.countWindow(3)*/
        .allowedLateness(Time.milliseconds(2))
        .sideOutputLateData(lateOutputTag)
        .reduce(/* do some processing */)
        .getSideOutput(lateOutputTag);

But I don't want to store the late data in the res variable. I want to keep the window result in res and store the late data in another variable.

What is the problem here?