Hi Ramya,

You get the late records by calling getSideOutput() on the main output of
the window operator, i.e., on the DataStream that is produced by applying
a function on the window:

DataStream<Input> input = ...
OutputTag<Input> lateTag = ...

DataStream<Result> mainResult = input
  .keyBy(...)
  .window(...)
  .sideOutputLateData(lateTag)
  .apply(yourFunction);

DataStream<Input> lateRecords = mainResult.getSideOutput(lateTag);
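
For example, yourFunction could be a simple WindowFunction (just a sketch:
Input and Result are the placeholder types from above, the Tuple key type
matches a keyBy() on field positions, and the counting logic and Result
constructor are only for illustration):

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Sketch: emits one Result per key and window with the record count.
public static class CountPerWindow
    implements WindowFunction<Input, Result, Tuple, TimeWindow> {

  @Override
  public void apply(Tuple key, TimeWindow window,
      Iterable<Input> records, Collector<Result> out) {
    long count = 0L;
    for (Input r : records) {
      count++;
    }
    // Assumes a Result(key, windowEnd, count) constructor.
    out.collect(new Result(key, window.getEnd(), count));
  }
}

With this, .apply(yourFunction) above would be .apply(new CountPerWindow()).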

Best, Fabian

On Mon, Jan 28, 2019 at 11:09 AM Ramya Ramamurthy <
hair...@gmail.com> wrote:

> Hi,
>
> We are trying to collect the side output, but we could not figure out how
> to convert this windowed stream back to a DataStream.
>
> final OutputTag<Tuple6<String, String, String, String, String, Timestamp>>
>     lateOutputTag = new OutputTag<Tuple6<String, String, String, String,
>     String, Timestamp>>("late-data"){};
>
> withTime.keyBy(0, 2)
>     .window(TumblingEventTimeWindows.of(Time.minutes(5)))
>     .allowedLateness(Time.minutes(1))
>     .sideOutputLateData(lateOutputTag);
>
> I would now have a windowed stream with the late records tagged as
> lateOutputTag. How do we convert the records that are not late back to a
> DataStream? Do we need to use the .apply() function to collect this data?
> We are quite unsure of this. Appreciate your help.
>
> Best Regards,
>
>
>
> On Thu, Jan 24, 2019 at 11:03 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Ramya,
> >
> > This would be a great feature, but unfortunately it is not supported
> > (yet) by Flink SQL.
> > Currently, all late records are dropped.
> >
> > A workaround is to ingest the stream as a DataStream, add a custom
> > operator that routes all late records to a side output, and register the
> > DataStream without late records as a table on which the SQL query is
> > evaluated.
> > This requires quite a bit of boilerplate code, but it could be hidden in
> > a util class.
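> >
> > Such an operator could be, for example, a ProcessFunction that compares
> > each record's event-time timestamp with the current watermark (just a
> > sketch, assuming timestamps and watermarks have been assigned; the
> > LateDataRouter name is illustrative, not part of Flink's API):
> >
> > import org.apache.flink.streaming.api.functions.ProcessFunction;
> > import org.apache.flink.util.Collector;
> > import org.apache.flink.util.OutputTag;
> >
> > // Sketch: forwards on-time records, routes late ones to a side output.
> > public static class LateDataRouter<T> extends ProcessFunction<T, T> {
> >
> >   private final OutputTag<T> lateTag;
> >
> >   public LateDataRouter(OutputTag<T> lateTag) {
> >     this.lateTag = lateTag;
> >   }
> >
> >   @Override
> >   public void processElement(T value, Context ctx, Collector<T> out) {
> >     // A record is late if its timestamp is behind the current watermark.
> >     if (ctx.timestamp() != null
> >         && ctx.timestamp() < ctx.timerService().currentWatermark()) {
> >       ctx.output(lateTag, value);
> >     } else {
> >       out.collect(value);
> >     }
> >   }
> > }
> >
> > The main output of .process(new LateDataRouter<>(lateTag)) is then
> > registered as a table, and calling getSideOutput(lateTag) on it gives
> > you the stream of late records.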
> >
> > Best, Fabian
> >
> > On Thu, Jan 24, 2019 at 6:42 AM Ramya Ramamurthy <
> > hair...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > I have a question with regard to late-arriving records.
> > > We are using Flink 1.7 with Kafka consumers of version 2.11.0.11.
> > > In my sink operator, which converts this table to a stream that is
> > > pushed to Elasticsearch, I am able to see the metric
> > > *numLateRecordsDropped*.
> > >
> > > My Kafka consumers don't seem to have any lag and the events are
> > > processed properly. Routing these late events to a side output doesn't
> > > seem to be possible with tables. Below is the snippet:
> > >
> > >         tableEnv.connect(new Kafka()
> > >           /* setting of all kafka properties */
> > >                .startFromLatest())
> > >                .withSchema(new Schema()
> > >                        .field("sid", Types.STRING())
> > >                        .field("_zpsbd6", Types.STRING())
> > >                        .field("r1", Types.STRING())
> > >                        .field("r2", Types.STRING())
> > >                        .field("r5", Types.STRING())
> > >                        .field("r10", Types.STRING())
> > >                        .field("isBot", Types.BOOLEAN())
> > >                        .field("botcode", Types.STRING())
> > >                        .field("ts", Types.SQL_TIMESTAMP())
> > >                        .rowtime(new Rowtime()
> > >                                .timestampsFromField("recvdTime")
> > >                                .watermarksPeriodicBounded(10000)
> > >                        )
> > >                )
> > >                .withFormat(new Json().deriveSchema())
> > >                .inAppendMode()
> > >                .registerTableSource("sourceTopic");
> > >
> > >        String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
> > >                + "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
> > >                + "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd "
> > >                + "FROM sourceTopic "
> > >                + "WHERE r1='true' or r2='true' or r5='true' or r10='true' "
> > >                + "and isBot='true' "
> > >                + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";
> > >
> > > Table source = tableEnv.sqlQuery(sql); ---> This is where the metric
> > > shows lateRecordsDropped, while executing the GROUP BY operation.
> > >
> > > Is there a way to get the side output of this to be able to debug
> > > better?
> > >
> > > Thanks,
> > > ~Ramya.
> > >
> >
>
