Hi Rohan,

In your example, are you saying that after 5:40 you will not receive any
events at all which could advance the watermark?
I am asking because if you are receiving events for other keys/ids from
your KafkaSource after 5:40, the watermark will still be advanced and
fire the tumbling window.

Best,
Gary

On Mon, Jan 15, 2018 at 9:03 AM, Rohan Thimmappa <rohan.thimma...@gmail.com> wrote:
> No. My question is slightly different.
>
> Say I get reports from 5.10-5.40, the device goes offline, and it never
> comes back; I will not get any report after 5.40. The 5-6 window then
> never gets closed, because we only close a window when we see a record
> from the next hour (i.e. any report carrying a 6.00 end date). So the
> 5.00-5.40 data stays in Flink memory and is never sent.
>
> What I would like to do is wait for, say, 1 or 2 hours; if I don't get
> a message for the given id, I would like to close the window and send
> the result to the destination system (in my case a Kafka topic).
>
> Rohan
>
> On Sun, Jan 14, 2018 at 1:00 PM, Gary Yao <g...@data-artisans.com> wrote:
>
>> Hi Rohan,
>>
>> I am not sure if I fully understand your problem. For example, if you
>> receive an event with a start time of 4:50 and an end time of 5:30, do
>> you want the "usage" from 4:50 - 5:00 to be included in the 4:00 - 5:00
>> window? What if the event had an end time of 5:31? Do you then want to
>> ignore the event for the 4:00 - 5:00 window?
>>
>> Best,
>>
>> Gary
>>
>> On Fri, Jan 12, 2018 at 8:45 PM, Rohan Thimmappa <rohan.thimma...@gmail.com> wrote:
>>
>>> Hi Gary,
>>>
>>> This is perfect. I am able to get the window working on the message
>>> timestamp rather than the clock window, and to stream the data that
>>> arrives late.
>>>
>>> I also have one edge case.
>>>
>>> E.g. I get my last report at 4.57 and I never get a 5.00+ hour report
>>> *ever*. I would like to wait for some time. My reporting interval size
>>> is 30 min; if in the next 30 min I don't see any record, then I would
>>> like to construct 4-5 by closing the window and dispatch the report.
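The "wait, then close the window" idea described here can be sketched as a per-key idle timeout. This is a plain-Java illustration of the bookkeeping only; the class `IdleFlusher` and its methods are hypothetical names, not Flink API. In a real Flink job this would typically be an event-time timer registered in a ProcessFunction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: remember the last end time seen per device id and
// flush any id that has been silent for longer than the idle timeout.
public class IdleFlusher {
    private final long idleTimeoutMs;
    private final Map<String, Long> lastSeen = new HashMap<>();

    public IdleFlusher(long idleTimeoutMs) {
        this.idleTimeoutMs = idleTimeoutMs;
    }

    // Record the latest end time observed for this device id.
    public void onEvent(String id, long endTimeMs) {
        lastSeen.merge(id, endTimeMs, Math::max);
    }

    // Ids whose open window should be closed and emitted at time nowMs,
    // because no report has arrived for at least idleTimeoutMs.
    public List<String> idsToFlush(long nowMs) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
            if (nowMs - e.getValue() >= idleTimeoutMs) {
                out.add(e.getKey());
            }
        }
        return out;
    }

    // Forget a key once its partial window has been emitted downstream.
    public void flush(String id) {
        lastSeen.remove(id);
    }
}
```

In a ProcessFunction the same effect would come from registering a timer for lastEndTime + timeout on each event and emitting the partial aggregate if the timer fires with no newer event having arrived.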
>>> The intention is that I don't want to lose the last hour of data just
>>> because the stream ends mid-hour.
>>>
>>> Rohan
>>>
>>> On Fri, Jan 12, 2018 at 12:00 AM, Gary Yao <g...@data-artisans.com> wrote:
>>>
>>>> Hi Rohan,
>>>>
>>>> Your ReportTimestampExtractor assigns timestamps to the stream
>>>> records correctly but uses the wall clock to emit watermarks
>>>> (System.currentTimeMillis). In Flink, watermarks are the mechanism
>>>> that advances event time. Hence, you should emit watermarks
>>>> according to the time that you extract from your events.
>>>>
>>>> You can take a look at the existing timestamp extractors / watermark
>>>> emitters [1], such as BoundedOutOfOrdernessTimestampExtractor, to
>>>> see how it can be done.
>>>>
>>>> Best,
>>>> Gary
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
>>>>
>>>> On Fri, Jan 12, 2018 at 5:30 AM, Rohan Thimmappa <rohan.thimma...@gmail.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have the following requirement:
>>>>>
>>>>> 1. I have an Avro JSON message containing {eventid, usage,
>>>>> starttime, endtime}.
>>>>> 2. I am reading this from a Kafka source.
>>>>> 3. If a record overlaps an hour boundary, split the record at the
>>>>> hourly boundaries.
>>>>> 4. My objective is to a) read the message, b) aggregate the usage
>>>>> within the hour.
>>>>> 5. Send the aggregated data to another Kafka topic.
>>>>>
>>>>> I don't want to aggregate based on the clock window. If I see the
>>>>> next hour in endtime, then I would like to close the window and
>>>>> send the aggregated usage down to the Kafka sink topic.
>>>>>
>>>>> Eg: input data
>>>>> 4.55 - 5.00
>>>>> 5.00 - 5.25
>>>>> 5.25 - 5.55
>>>>> 5.55 - 6.25
>>>>>
>>>>> after split
>>>>> 4.55 - 5.00 - expect record to be going out with this
>>>>> 5.00 - 5.25
>>>>> 5.25 - 5.55
>>>>> 5.55 - 6.00 - expect record to be going out with this
>>>>> 6.00 - 6.25
>>>>>
>>>>> 1. I have set event time:
>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>
>>>>> 2. The pipeline:
>>>>>
>>>>> val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] = stream
>>>>>   .flatMap(new SplitFlatMap()) // if a record overlaps an hour, split it at the hourly boundary
>>>>>   .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
>>>>>   .keyBy(0)
>>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
>>>>>   .reduce(new Counter()) // aggregates the usage collected within the window
>>>>>
>>>>> 3. Here is the implementation of the timestamp extractor:
>>>>>
>>>>> class ReportTimestampExtractor extends
>>>>>     AssignerWithPeriodicWatermarks[Tuple2[String, Report]] with Serializable {
>>>>>   override def extractTimestamp(e: Tuple2[String, Report],
>>>>>       prevElementTimestamp: Long) = {
>>>>>     e.f1.getEndTime
>>>>>   }
>>>>>
>>>>>   override def getCurrentWatermark(): Watermark = {
>>>>>     new Watermark(System.currentTimeMillis - 36000) // intended as a 1 hour delay (note: 36000 ms is only 36 s)
>>>>>   }
>>>>> }
>>>>>
>>>>> I see the aggregation is triggered only by the clock window rather
>>>>> than when the window sees the next hour in the record.
>>>>>
>>>>> Is there anything I am missing? By definition, if I set event time,
>>>>> it should respect the message time rather than the clock window.
>>>>>
>>>>> --
>>>>> Thanks
>>>>> Rohan
>>>>
>>>
>>> --
>>> Thanks
>>> Rohan
>>
>
> --
> Thanks
> Rohan
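To make the two pieces of logic in this thread concrete, here is a plain-Java sketch of (a) splitting a record at hour boundaries, as SplitFlatMap is described to do, and (b) deriving the watermark from the maximum event end time seen minus an allowed out-of-orderness, which is the approach BoundedOutOfOrdernessTimestampExtractor takes, rather than from System.currentTimeMillis. The class names `WindowSketch` and `BoundedLatenessWatermark` are illustrative, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowSketch {
    static final long HOUR_MS = 3_600_000L;

    // Split the interval [start, end) at every hour boundary so that no
    // record straddles two hourly windows (all times in epoch millis).
    static List<long[]> splitByHour(long start, long end) {
        List<long[]> parts = new ArrayList<>();
        long cur = start;
        while (cur < end) {
            long boundary = (cur / HOUR_MS + 1) * HOUR_MS; // next hour mark
            long next = Math.min(boundary, end);
            parts.add(new long[]{cur, next});
            cur = next;
        }
        return parts;
    }

    // Event-time watermark: track the max end time observed and subtract
    // the allowed out-of-orderness, instead of using the wall clock.
    static class BoundedLatenessWatermark {
        private final long maxLatenessMs;
        private long maxTs = Long.MIN_VALUE;

        BoundedLatenessWatermark(long maxLatenessMs) {
            this.maxLatenessMs = maxLatenessMs;
        }

        void onEvent(long endTimeMs) {
            maxTs = Math.max(maxTs, endTimeMs);
        }

        long currentWatermark() {
            return maxTs == Long.MIN_VALUE ? Long.MIN_VALUE
                                           : maxTs - maxLatenessMs;
        }
    }
}
```

With this kind of watermark, a record ending at 6.00 immediately advances event time past the end of the 5-6 window (minus the lateness bound), so the tumbling window fires when the data says the hour is over, not when the wall clock does.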