Hi Ramya,

If you don't want to apply any logic but just filter late records, you
should not use a window because it needs to shuffle and group records into
windows.
Instead, you can use a non-keyed ProcessFunction and compare the timestamp
of the record (context.timestamp()) with the current watermark
(context.timerService().currentWatermark()) and emit all records that
are late to a side output.
This will avoid the shuffle and reduce processing latency.
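
A minimal sketch of such a function could look like the following. The class
name LateRecordFilter and the placeholder type Event in the usage comment are
made up for illustration. It assumes timestamps and watermarks were assigned
upstream, and it treats a record as late when its timestamp is not greater
than the current watermark, so adjust the comparison if you need some
allowed lateness.

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Hypothetical helper: forwards on-time records to the main output and
 * routes late records to the side output identified by lateTag.
 */
public class LateRecordFilter<T> extends ProcessFunction<T, T> {

    private final OutputTag<T> lateTag;

    public LateRecordFilter(OutputTag<T> lateTag) {
        this.lateTag = lateTag;
    }

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        // ctx.timestamp() is null if no timestamp was assigned upstream.
        Long timestamp = ctx.timestamp();
        long watermark = ctx.timerService().currentWatermark();

        // Assumption: "late" means the timestamp is at or behind the watermark.
        if (timestamp != null && timestamp <= watermark) {
            ctx.output(lateTag, value);   // late record -> side output
        } else {
            out.collect(value);           // on-time record -> main output
        }
    }
}

// Usage sketch (Event is a placeholder for your record type):
//   OutputTag<Event> lateTag = new OutputTag<Event>("late-data") {};
//   SingleOutputStreamOperator<Event> onTime =
//       input.process(new LateRecordFilter<>(lateTag));
//   DataStream<Event> late = onTime.getSideOutput(lateTag);
```

The onTime stream can then be registered as a table. If Flink cannot infer
the output type of the generic function, an explicit type hint via
returns(...) may be needed.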

Best, Fabian

On Tue, Jan 29, 2019 at 3:02 PM Ramya Ramamurthy <hair...@gmail.com> wrote:

> Hi Fabian,
>
> I do not have anything to do in "yourFunction".
> The documentation says that .apply(WindowFunction) is legacy, but I am at
> a loss to understand which of reduce, aggregate, fold, or apply I must
> use, as I hardly have any operations to perform other than returning the
> stream with no late data back to me [so that I can construct a Flink table
> from this data and do my processing there].
>
> ~Ramya.
>
>
>
> On Tue, Jan 29, 2019 at 3:52 PM Fabian Hueske <fhue...@gmail.com> wrote:
>
> > Hi Ramya,
> >
> > This works by calling getSideOutput() on the main output of the window
> > function.
> > The main output is collected by applying a function on the window.
> >
> > DataStream<Input> input = ...
> > OutputTag<Input> lateTag = ...
> >
> > SingleOutputStreamOperator<Result> mainResult = input
> >   .keyBy(...)
> >   .window(...)
> >   .sideOutputLateData(lateTag)
> >   .apply(yourFunction);
> >
> > DataStream<Input> lateRecords = mainResult.getSideOutput(lateTag);
> >
> > Best, Fabian
> >
> > On Mon, Jan 28, 2019 at 11:09 AM Ramya Ramamurthy <hair...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > We were trying to collect the side output, but could not figure out how
> > > to convert the windowed stream back to a DataStream.
> > >
> > > final OutputTag<Tuple6<String, String, String, String, String, Timestamp>> lateOutputTag =
> > >     new OutputTag<Tuple6<String, String, String, String, String, Timestamp>>("late-data"){};
> > > withTime.keyBy(0, 2)
> > >     .window(TumblingEventTimeWindows.of(Time.minutes(5)))
> > >     .allowedLateness(Time.minutes(1))
> > >     .sideOutputLateData(lateOutputTag);
> > >
> > > I would now have a windowed stream with late records tagged as
> > > lateOutputTag. How do I convert the records that are not late back to a
> > > DataStream? Do we need to use the .apply function to collect this data?
> > > I am quite unsure about this and would appreciate your help.
> > >
> > > Best Regards,
> > >
> > >
> > >
> > > On Thu, Jan 24, 2019 at 11:03 PM Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > > Hi Ramya,
> > > >
> > > > This would be a great feature, but unfortunately it is not supported
> > > > (yet) by Flink SQL.
> > > > Currently, all late records are dropped.
> > > >
> > > > A workaround is to ingest the stream as a DataStream, use a custom
> > > > operator that routes all late records to a side output, and register
> > > > the DataStream without late records as a table on which the SQL query
> > > > is evaluated.
> > > > This requires quite a bit of boilerplate code, but it could be hidden
> > > > in a util class.
> > > >
> > > > Best, Fabian
> > > >
> > > > On Thu, Jan 24, 2019 at 6:42 AM Ramya Ramamurthy <hair...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I have a question regarding late-arriving records.
> > > > > We are using Flink 1.7 with Kafka Consumers of version 2.11.0.11.
> > > > > In my sink operators, which convert this table to a stream that is
> > > > > pushed to Elasticsearch, I can see the metric
> > > > > "*numLateRecordsDropped*".
> > > > >
> > > > > My Kafka consumers don't seem to have any lag, and the events are
> > > > > processed properly. Routing these events to a side output doesn't
> > > > > seem to be possible with tables. Below is the snippet:
> > > > >
> > > > >         tableEnv.connect(new Kafka()
> > > > >           /* setting of all kafka properties */
> > > > >                .startFromLatest())
> > > > >                .withSchema(new Schema()
> > > > >                        .field("sid", Types.STRING())
> > > > >                        .field("_zpsbd6", Types.STRING())
> > > > >                        .field("r1", Types.STRING())
> > > > >                        .field("r2", Types.STRING())
> > > > >                        .field("r5", Types.STRING())
> > > > >                        .field("r10", Types.STRING())
> > > > >                        .field("isBot", Types.BOOLEAN())
> > > > >                        .field("botcode", Types.STRING())
> > > > >                        .field("ts", Types.SQL_TIMESTAMP())
> > > > >                        .rowtime(new Rowtime()
> > > > >                                .timestampsFromField("recvdTime")
> > > > >                                .watermarksPeriodicBounded(10000)
> > > > >                        )
> > > > >                )
> > > > >                .withFormat(new Json().deriveSchema())
> > > > >                .inAppendMode()
> > > > >                .registerTableSource("sourceTopic");
> > > > >
> > > > >        String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
> > > > >                + "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
> > > > >                + "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd FROM sourceTopic "
> > > > >                + "WHERE r1='true' or r2='true' or r5='true' or r10='true' and isBot='true' "
> > > > >                + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";
> > > > >
> > > > > Table source = tableEnv.sqlQuery(sql) ---> This is where the metric
> > > > > reports the dropped late records, while executing the GROUP BY
> > > > > operation.
> > > > >
> > > > > Is there a way to get a side output of this, to be able to debug
> > > > > better?
> > > > >
> > > > > Thanks,
> > > > > ~Ramya.
> > > > >
> > > >
> > >
> >
>
