Hi Soheil,

The `getSideOutput()` method is defined on the operator instead of the 
datastream.
You can invoke it after any action (e.g., map, window) performed on a 
datastream.

Best,
Xingcan

> On Jul 17, 2018, at 3:36 PM, Soheil Pourbafrani <soheil.i...@gmail.com> wrote:
> 
> Hi, according to the documents I tried to get late data using side output.
> 
> final OutputTag<Tuple3<String, Long, JSONObject>> lateOutputTag = new 
> OutputTag<Tuple3<String, Long, JSONObject>>("late-data"){};
> DataStream<Tuple3<String,Long, JSONObject>> res = aggregatedTuple
>                 .assignTimestampsAndWatermarks(new Bound())
>         }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
>                 .allowedLateness(Time.milliseconds(2))
>                 .sideOutputLateData(lateOutputTag)
>                 .reduce(Do some process);
> 
> When trying to store late data in a Datastream (As shown in document):
> DataStream<Tuple3<String, Long, JSONObject>> lateData = res.
> there is no predefined getSideOutput method on DataStream res!
> But if I call getSideOutput just after reduce function, it is known! But I 
> don't want to save late data on res variable and I want to save them on 
> another variable!
> DataStream<Tuple3<String,Long, JSONObject>> res = aggregatedTuple
>                 .assignTimestampsAndWatermarks(new Bound())
>         }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/
>                 .allowedLateness(Time.milliseconds(2))
>                 .sideOutputLateData(lateOutputTag)
>                 .reduce(Do some process)
>                  .getSideoutput(lateOutputTag);
> What is the problem here?
> 
> 

Reply via email to