Hi Soheil,

The /getSideOutput/ method is part of /SingleOutputStreamOperator/ which
extends /DataStream///. Try using /SingleOutputStreamOperator/ as the
type for your res variable.

Best,

Dawid


On 17/07/18 09:36, Soheil Pourbafrani wrote:
> Hi, according to theĀ documents I tried to get late data using side
> output.
>
> final OutputTag<Tuple3<String, Long, JSONObject>> lateOutputTag = new 
> OutputTag<Tuple3<String, Long, JSONObject>>("late-data"){};
> DataStream<Tuple3<String,Long, JSONObject>> res = aggregatedTuple
>                 .assignTimestampsAndWatermarks(new Bound())
>         }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ 
> .allowedLateness(Time.milliseconds(2))
>                 .sideOutputLateData(lateOutputTag)
>                 .reduce(Do some process);
>
> When trying to store late data in a Datastream (As shown in document):
> DataStream<Tuple3<String, Long, JSONObject>> lateData = res.
> there is no predefined getSideOutput method on DataStream res!
> But if I callĀ getSideOutput just after reduce function, it is known!
> But I don't want to save late data on res variable and I want to save
> them on another variable!
> DataStream<Tuple3<String,Long, JSONObject>> res = aggregatedTuple
>                 .assignTimestampsAndWatermarks(new Bound())
>         }).keyBy(1).timeWindow(Time.milliseconds(160))/*.countWindow(3)*/ 
> .allowedLateness(Time.milliseconds(2))
>                 .sideOutputLateData(lateOutputTag)
>                 .reduce(Do some process)
>                  .getSideoutput(lateOutputTag);
> What is the problem here?
>
>

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to