Hi Fabian, I do not have anything to do in "yourFunction". The documentation says that .apply(windowFunction) is legacy. But I am at a loss to understand which of reduce, aggregate, fold, or apply I must use, as I hardly have any operations to perform other than returning the stream, with no late data, back to me [so that I can construct a Flink table from this data and do my processing there].
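For context, the routing being discussed (keep on-time records for the table, peel off late ones) can be sketched without any Flink dependencies. This is a plain-Java sketch, not Flink API: the `LateRouter` name, the two lists, and the 10-second bound are illustrative stand-ins for a ProcessFunction that emits late records to a side output, mirroring the `watermarksPeriodicBounded(10000)` setting from the pipeline below.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink dependencies) of the routing logic:
// a bounded-out-of-orderness watermark is tracked as records stream
// through, and anything at or behind the watermark goes to a "late"
// bucket instead of the main output. In real Flink code this would be
// a ProcessFunction calling ctx.output(lateTag, value) for late records.
public class LateRouter {
    final long maxOutOfOrderness;            // e.g. 10000 ms, matching watermarksPeriodicBounded(10000)
    long currentWatermark = Long.MIN_VALUE;  // advances as on-time records arrive
    final List<Long> mainOutput = new ArrayList<>();
    final List<Long> lateOutput = new ArrayList<>();

    LateRouter(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    void process(long eventTimestamp) {
        if (eventTimestamp <= currentWatermark) {
            lateOutput.add(eventTimestamp);  // would go to the side output
        } else {
            mainOutput.add(eventTimestamp);  // would feed the table the SQL query reads
            currentWatermark = Math.max(currentWatermark, eventTimestamp - maxOutOfOrderness);
        }
    }
}
```

In actual Flink code, `mainOutput` corresponds to the DataStream you would register as the SQL source table, and `lateOutput` to what `getSideOutput(lateTag)` returns.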
~Ramya.

On Tue, Jan 29, 2019 at 3:52 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Ramya,
>
> This works by calling getSideOutput() on the main output of the window
> function. The main output is collected by applying a function on the
> window.
>
> DataStream<Input> input = ...
> OutputTag<Input> lateTag = ...
>
> DataStream<Result> mainResult = input
>     .keyBy(...)
>     .window(...)
>     .sideOutputLateData(lateTag)
>     .apply(yourFunction);
>
> DataStream<Input> lateRecords = mainResult.getSideOutput(lateTag);
>
> Best, Fabian
>
> Am Mo., 28. Jan. 2019 um 11:09 Uhr schrieb Ramya Ramamurthy <hair...@gmail.com>:
>
> > Hi,
> >
> > We were trying to collect the side output, but failed to understand how
> > to convert this windowed stream back to a DataStream.
> >
> > final OutputTag<Tuple6<String, String, String, String, String, Timestamp>>
> >     lateOutputTag = new OutputTag<Tuple6<String, String, String, String,
> >         String, Timestamp>>("late-data"){};
> > withTime.keyBy(0, 2)
> >     .window(TumblingEventTimeWindows.of(Time.minutes(5)))
> >     .allowedLateness(Time.minutes(1))
> >     .sideOutputLateData(lateOutputTag);
> >
> > I would now have a windowed stream with the records coming in late
> > tagged as lateOutputTag. How do I convert the packets which are not
> > late back into a DataStream? Do we need to use the .apply function to
> > collect this data? Quite unsure of this. Appreciate your help.
> >
> > Best Regards,
> >
> > On Thu, Jan 24, 2019 at 11:03 PM Fabian Hueske <fhue...@gmail.com> wrote:
> >
> > > Hi Ramya,
> > >
> > > This would be a great feature, but unfortunately it is not supported
> > > (yet) by Flink SQL. Currently, all late records are dropped.
> > >
> > > A workaround is to ingest the stream as a DataStream, have a custom
> > > operator that routes all late records to a side output, and register
> > > the DataStream without late records as a table on which the SQL query
> > > is evaluated. This requires quite a bit of boilerplate code but could
> > > be hidden in a util class.
> > >
> > > Best, Fabian
> > >
> > > Am Do., 24. Jan. 2019 um 06:42 Uhr schrieb Ramya Ramamurthy <hair...@gmail.com>:
> > >
> > > > Hi,
> > > >
> > > > I have a query with regard to late-arriving records. We are using
> > > > Flink 1.7 with Kafka consumers of version 2.11.0.11. In my sink
> > > > operators, which convert this table to a stream that is pushed to
> > > > Elasticsearch, I am able to see the metric "numLateRecordsDropped".
> > > >
> > > > My Kafka consumers don't seem to have any lag and the events are
> > > > processed properly. Taking these events to a side output doesn't
> > > > seem to be possible with tables. Below is the snippet:
> > > >
> > > > tableEnv.connect(new Kafka()
> > > >         /* setting of all kafka properties */
> > > >         .startFromLatest())
> > > >     .withSchema(new Schema()
> > > >         .field("sid", Types.STRING())
> > > >         .field("_zpsbd6", Types.STRING())
> > > >         .field("r1", Types.STRING())
> > > >         .field("r2", Types.STRING())
> > > >         .field("r5", Types.STRING())
> > > >         .field("r10", Types.STRING())
> > > >         .field("isBot", Types.BOOLEAN())
> > > >         .field("botcode", Types.STRING())
> > > >         .field("ts", Types.SQL_TIMESTAMP())
> > > >         .rowtime(new Rowtime()
> > > >             .timestampsFromField("recvdTime")
> > > >             .watermarksPeriodicBounded(10000)))
> > > >     .withFormat(new Json().deriveSchema())
> > > >     .inAppendMode()
> > > >     .registerTableSource("sourceTopic");
> > > >
> > > > String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
> > > >     + "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
> > > >     + "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd FROM sourceTopic "
> > > >     + "WHERE r1='true' or r2='true' or r5='true' or r10='true' and isBot='true' "
> > > >     + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";
> > > >
> > > > Table source = tableEnv.sqlQuery(sql); ---> This is where the metric
> > > > is showing the lateRecordsDropped, while executing the GROUP BY
> > > > operation.
> > > >
> > > > Is there a way to get the side output of this, to be able to debug
> > > > better?
> > > >
> > > > Thanks,
> > > > ~Ramya.
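As a footnote on the numLateRecordsDropped metric discussed above: a record is counted once the watermark has passed the end of its tumbling window plus the allowed lateness (which is zero in the Flink SQL GROUP BY case). A minimal plain-Java sketch of that rule, assuming epoch-aligned tumbling windows and ignoring Flink's exact millisecond boundary handling; the class and method names here are illustrative, not Flink API:

```java
// Plain-Java sketch (no Flink dependencies) of when a record counts
// as late for a tumbling event-time window: it is routed to the side
// output (or dropped, and numLateRecordsDropped incremented) once the
// watermark has reached the end of its window plus the allowed lateness.
public class LatenessRule {
    /** Start of the epoch-aligned tumbling window containing timestamp ts. */
    static long windowStart(long ts, long windowSize) {
        return ts - (ts % windowSize);
    }

    /** True if an element with this timestamp would be treated as late. */
    static boolean isLate(long ts, long currentWatermark, long windowSize, long allowedLateness) {
        long windowEnd = windowStart(ts, windowSize) + windowSize;
        return currentWatermark >= windowEnd + allowedLateness;
    }

    public static void main(String[] args) {
        long fiveMin = 5 * 60 * 1000L;
        long oneMin = 60 * 1000L;
        long watermark = (6 * 60 + 30) * 1000L; // watermark at 6:30 into the epoch

        // Element stamped 4:00 belongs to window [0:00, 5:00);
        // 5:00 end + 1:00 lateness = 6:00 <= 6:30, so it is late.
        System.out.println(isLate(4 * 60 * 1000L, watermark, fiveMin, oneMin));        // true

        // Element stamped 5:30 belongs to [5:00, 10:00); not late.
        System.out.println(isLate((5 * 60 + 30) * 1000L, watermark, fiveMin, oneMin)); // false
    }
}
```

With allowedLateness of zero, as in the SQL pipeline above, the same element becomes late as soon as the watermark reaches its window end, which is why the GROUP BY silently drops anything more than the 10-second watermark bound behind.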