Hi Fabian,

I do not have anything to do in "yourFunction".
The documentation says that .apply() with a WindowFunction is legacy. But I
am at a loss to understand which of reduce, aggregate, fold, or apply I
must use, as I have hardly any operations to perform other than returning
the stream without the late data back to me [so that I can construct a
Flink table with this data and do my processing there].
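Would a simple pass-through be the right approach? This is just a sketch
from my reading of the docs, assuming a ProcessWindowFunction (the
non-legacy counterpart of apply with a WindowFunction) that re-emits every
element unchanged; class and type names here are only illustrative and it
needs the flink-streaming-java dependency:

```java
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Pass-through window function: emits every (non-late) element unchanged,
// so the main output can be registered as a table while late records are
// collected separately via the side output tag.
public class PassThroughFunction<T>
        extends ProcessWindowFunction<T, T, Tuple, TimeWindow> {

    @Override
    public void process(Tuple key, Context ctx,
                        Iterable<T> elements, Collector<T> out) {
        for (T element : elements) {
            out.collect(element);
        }
    }
}
```

I would then call .process(new PassThroughFunction<>()) after
.sideOutputLateData(lateTag) and read the late records with
getSideOutput(lateTag) -- is that right?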

~Ramya.



On Tue, Jan 29, 2019 at 3:52 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Ramya,
>
> This works by calling getSideOutput() on the main output of the window
> function.
> The main output is collected by applying a function on the window.
>
> DataStream<Input> input = ...
> OutputTag<Input> lateTag = ...
>
> DataStream<Result> mainResult = input
>   .keyBy(...)
>   .window(...)
>   .sideOutputLateData(lateTag)
>   .apply(yourFunction);
>
> DataStream<Input> lateRecords = mainResult.getSideOutput(lateTag);
>
> Best, Fabian
>
> On Mon, Jan 28, 2019 at 11:09 AM Ramya Ramamurthy <
> hair...@gmail.com> wrote:
>
> > Hi,
> >
> > We were trying to collect the sideOutput.
> > But failed to understand as to how to convert this windowed stream to a
> > datastream.
> >
> > final OutputTag<Tuple6<String, String, String, String, String, Timestamp>>
> >     lateOutputTag = new OutputTag<Tuple6<String, String, String, String,
> >         String, Timestamp>>("late-data"){};
> > withTime.keyBy(0, 2)
> >     .window(TumblingEventTimeWindows.of(Time.minutes(5)))
> >     .allowedLateness(Time.minutes(1))
> >     .sideOutputLateData(lateOutputTag);
> >
> > I would now have a windowed stream with the records coming in late tagged
> > as lateOutputTag. How do I convert the packets which are not late back
> > into a datastream? Do we need to use the .apply function to collect this
> > data ... quite unsure of this. Appreciate your help.
> >
> > Best Regards,
> >
> >
> >
> > On Thu, Jan 24, 2019 at 11:03 PM Fabian Hueske <fhue...@gmail.com>
> wrote:
> >
> > > Hi Ramya,
> > >
> > > This would be a great feature, but unfortunately it is not supported
> > > (yet) by Flink SQL.
> > > Currently, all late records are dropped.
> > >
> > > A workaround is to ingest the stream as a DataStream, use a custom
> > > operator that routes all late records to a side output, and register the
> > > DataStream without late records as a table on which the SQL query is
> > > evaluated.
> > > This requires quite a bit of boilerplate code but could be hidden in a
> > > util class.
> > >
> > > Best, Fabian
> > >
> > > On Thu, Jan 24, 2019 at 06:42 AM Ramya Ramamurthy <
> > > hair...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I have a query with regard to Late arriving records.
> > > > We are using Flink 1.7 with Kafka Consumers of version 2.11.0.11.
> > > > In my sink operators, which converts this table to a stream which is
> > > being
> > > > pushed to Elastic Search, I am able to see this metric "
> > > > *numLateRecordsDropped*".
> > > >
> > > > My Kafka consumers don't seem to have any lag and the events are
> > > > processed properly. Taking these events to a side output doesn't
> > > > seem to be possible with tables. Below is the snippet:
> > > >
> > > >         tableEnv.connect(new Kafka()
> > > >           /* setting of all kafka properties */
> > > >                .startFromLatest())
> > > >                .withSchema(new Schema()
> > > >                        .field("sid", Types.STRING())
> > > >                        .field("_zpsbd6", Types.STRING())
> > > >                        .field("r1", Types.STRING())
> > > >                        .field("r2", Types.STRING())
> > > >                        .field("r5", Types.STRING())
> > > >                        .field("r10", Types.STRING())
> > > >                        .field("isBot", Types.BOOLEAN())
> > > >                        .field("botcode", Types.STRING())
> > > >                        .field("ts", Types.SQL_TIMESTAMP())
> > > >                        .rowtime(new Rowtime()
> > > >                                .timestampsFromField("recvdTime")
> > > >                                .watermarksPeriodicBounded(10000)
> > > >                        )
> > > >                )
> > > >                .withFormat(new Json().deriveSchema())
> > > >                .inAppendMode()
> > > >                .registerTableSource("sourceTopic");
> > > >
> > > >        String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
> > > >                + "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
> > > >                + "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd FROM sourceTopic "
> > > >                + "WHERE r1='true' or r2='true' or r5='true' or r10='true' and isBot='true' "
> > > >                + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";
> > > >
> > > > Table source = tableEnv.sqlQuery(sql); ---> This is where the metric
> > > > is showing numLateRecordsDropped, while executing the group by
> > > > operation.
> > > >
> > > > Is there a way to get the sideOutput of this to be able to debug
> > > > better?
> > > >
> > > > Thanks,
> > > > ~Ramya.
> > > >
> > >
> >
>
