Hi, according to the documents I tried to get late data using side output.

final OutputTag<Tuple3<String, Long, JSONObject>> lateOutputTag = new
OutputTag<Tuple3<String, Long, JSONObject>>("late-data"){};

DataStream<Tuple3<String,Long, JSONObject>> res = aggregatedTuple
                .assignTimestampsAndWatermarks(new Bound())
        }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
                .allowedLateness(Time.milliseconds(2))
                .sideOutputLateData(lateOutputTag)
                .reduce(Do some process);


When trying to store late data in a Datastream (As shown in document):

DataStream<Tuple3<String, Long, JSONObject>> lateData = res.

there is no predefined getSideOutput method on DataStream res!
But if I call getSideOutput just after reduce function, it is known! But I
don't want to save late data on res variable and I want to save them on
another variable!

DataStream<Tuple3<String,Long, JSONObject>> res = aggregatedTuple
                .assignTimestampsAndWatermarks(new Bound())
        }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
                .allowedLateness(Time.milliseconds(2))
                .sideOutputLateData(lateOutputTag)
                .reduce(Do some process)

                 .getSideoutput(lateOutputTag);

What is the problem here?

Reply via email to