Hi Gary,
Thanks. I do have some of the events coming in after a pause, and I am able to
see the watermark being advanced and the window being triggered.

Rohan

On Mon, Jan 15, 2018 at 5:40 AM, Gary Yao <g...@data-artisans.com> wrote:

> Hi Rohan,
>
> In your example, are you saying that after 5:40 you will not receive any
> events at all which could advance the watermark?
>
> I am asking because if you are receiving events for other keys/ids from
> your KafkaSource after 5:40, the watermark will still be advanced and
> fire the tumbling window.
>
> Best,
> Gary
>
> On Mon, Jan 15, 2018 at 9:03 AM, Rohan Thimmappa <rohan.thimma...@gmail.com> wrote:
>
>> No. My question is slightly different.
>>
>> Say I get reports from 5:10 to 5:40 and the device goes offline and
>> never comes back. I will not get any report after 5:40, so the 5-6
>> window never gets closed, because we only close the window on seeing the
>> next hour's window, i.e. any report carrying a 6:00 end date in it. In
>> this case the 5:00-5:40 data is still in Flink memory and will never get
>> sent.
>>
>> So what I would like to do is wait for, say, 1 or 2 hours; if I don't
>> get a message for the given id, I would like to close the window and
>> send it to the destination system (in my case a Kafka topic).
>>
>> Rohan
>>
>> On Sun, Jan 14, 2018 at 1:00 PM, Gary Yao <g...@data-artisans.com> wrote:
>>
>>> Hi Rohan,
>>>
>>> I am not sure if I fully understand your problem. For example, if you
>>> receive an event with a start time of 4:50 and an end time of 5:30, do
>>> you want the "usage" from 4:50 - 5:00 to be included in the 4:00 - 5:00
>>> window? What if the event had an end time of 5:31? Do you then want to
>>> ignore the event for the 4:00 - 5:00 window?
>>>
>>> Best,
>>> Gary
>>>
>>> On Fri, Jan 12, 2018 at 8:45 PM, Rohan Thimmappa <rohan.thimma...@gmail.com> wrote:
>>>
>>>> Hi Gary,
>>>>
>>>> This is perfect.
>>>> I am able to get the window working on the message timestamp rather
>>>> than the clock window, and also to stream the data that arrive late.
>>>>
>>>> I also have one edge case.
>>>>
>>>> E.g., I get my last report at 4:57 and I never get a 5:00+ hour report
>>>> *ever*. I would like to wait for some time. My reporting interval size
>>>> is 30 min; if in the next 30 min I don't see any record, then I would
>>>> like to construct the 4-5 window by closing it and dispatching the
>>>> report. The intention is that I don't want to lose the last hour of
>>>> data just because the stream ended in the middle of the hour.
>>>>
>>>> Rohan
>>>>
>>>> On Fri, Jan 12, 2018 at 12:00 AM, Gary Yao <g...@data-artisans.com> wrote:
>>>>
>>>>> Hi Rohan,
>>>>>
>>>>> Your ReportTimestampExtractor assigns timestamps to the stream records
>>>>> correctly but uses the wall clock to emit Watermarks
>>>>> (System.currentTimeMillis). In Flink, Watermarks are the mechanism to
>>>>> advance the event time. Hence, you should emit Watermarks according to
>>>>> the time that you extract from your events.
>>>>>
>>>>> You can take a look at the already existing timestamp extractors /
>>>>> watermark emitters [1], such as
>>>>> BoundedOutOfOrdernessTimestampExtractor, to see how it can be done.
>>>>>
>>>>> Best,
>>>>> Gary
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
>>>>>
>>>>> On Fri, Jan 12, 2018 at 5:30 AM, Rohan Thimmappa <rohan.thimma...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have the following requirement:
>>>>>>
>>>>>> 1. I have an Avro JSON message containing {eventid, usage, starttime,
>>>>>> endtime}.
>>>>>> 2. I am reading this from a Kafka source.
>>>>>> 3. If there is an overlapping hour in a record, split the record by
>>>>>> rounding off to hourly boundaries.
>>>>>> 4. My objective is to a) read the message, b) aggregate the usage
>>>>>> within the hour.
>>>>>> 5. Send the aggregated data to another Kafka topic.
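[Editor's note] The hourly split in step 3 can be sketched in plain Scala, independent of Flink. The `Usage` record shape and the proportional allocation of usage across the split are assumptions for illustration; the thread never shows `SplitFlatMap`'s actual logic.

```scala
// Hypothetical sketch of step 3: split a record that crosses an hour
// boundary into hourly pieces. Allocating usage proportionally to the
// duration of each piece is an assumption, not taken from the thread.
case class Usage(startMs: Long, endMs: Long, usage: Double)

val HourMs = 3600 * 1000L

def splitByHour(r: Usage): Seq[Usage] =
  if (r.endMs <= r.startMs) Seq.empty
  else {
    val total = (r.endMs - r.startMs).toDouble
    Iterator
      .iterate(r.startMs)(t => (t / HourMs + 1) * HourMs) // successive hour boundaries
      .takeWhile(_ < r.endMs)
      .map { s =>
        val e = math.min((s / HourMs + 1) * HourMs, r.endMs)
        Usage(s, e, r.usage * (e - s) / total) // pro-rate usage by duration
      }
      .toSeq
  }
```

For example, a 4:55-5:25 record with usage 30 splits into 4:55-5:00 (usage 5) and 5:00-5:25 (usage 25), matching the "after split" example later in the thread.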
>>>>>>
>>>>>> I don't want to aggregate based on the clock window. If I see the
>>>>>> next hour in endtime, then I would like to close the window and send
>>>>>> the aggregated usage down to the Kafka sink topic.
>>>>>>
>>>>>> E.g., input data:
>>>>>> 4:55 - 5:00
>>>>>> 5:00 - 5:25
>>>>>> 5:25 - 5:55
>>>>>> 5:55 - 6:25
>>>>>>
>>>>>> After split:
>>>>>> 4:55 - 5:00 - expect a record to go out with this
>>>>>> 5:00 - 5:25
>>>>>> 5:25 - 5:55
>>>>>> 5:55 - 6:00 - expect a record to go out with this
>>>>>> 6:00 - 6:25
>>>>>>
>>>>>> 1. I have set the event time:
>>>>>>
>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>
>>>>>> 2. The pipeline:
>>>>>>
>>>>>> val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String, Report]] = stream
>>>>>>   .flatMap(new SplitFlatMap()) // if a record overlaps an hour, split it at the hourly boundary
>>>>>>   .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
>>>>>>   .keyBy(0)
>>>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
>>>>>>   .reduce(new Counter()) // aggregates the usage collected within the window
>>>>>>
>>>>>> 3. Here is the implementation of the timestamp extractor:
>>>>>>
>>>>>> class ReportTimestampExtractor extends
>>>>>>     AssignerWithPeriodicWatermarks[Tuple2[String, Report]] with Serializable {
>>>>>>   override def extractTimestamp(e: Tuple2[String, Report],
>>>>>>       prevElementTimestamp: Long) = {
>>>>>>     e.f1.getEndTime
>>>>>>   }
>>>>>>
>>>>>>   override def getCurrentWatermark(): Watermark = {
>>>>>>     new Watermark(System.currentTimeMillis - 36000) // respect delay for 1 hour
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> I see the aggregation being generated only on the clock window rather
>>>>>> than when the window sees the next hour in the record.
>>>>>>
>>>>>> Is there anything I am missing?
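[Editor's note] As Gary points out earlier in the thread, the watermark above is derived from the wall clock, so event time advances with processing time instead of with the data. (Note also that `36000` ms is 36 seconds, not 1 hour.) The fix he suggests — bounded out-of-orderness, as in `BoundedOutOfOrdernessTimestampExtractor` — boils down to the following bookkeeping, shown here as plain Scala with illustrative names rather than the Flink API:

```scala
// Sketch of event-time-driven watermarking: track the maximum event
// timestamp seen so far and emit (max - allowed lateness) as the watermark.
// The watermark only advances when new events arrive, which is exactly why
// a window for an idle key never closes this way (the thread's edge case).
class BoundedLatenessWatermarks(maxLatenessMs: Long) {
  private var maxTimestamp: Long = Long.MinValue

  // Call once per element with its event-time timestamp (e.g. the report's end time).
  def onEvent(eventTimestampMs: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, eventTimestampMs)

  // The periodically emitted watermark.
  def currentWatermark: Long =
    if (maxTimestamp == Long.MinValue) Long.MinValue
    else maxTimestamp - maxLatenessMs
}
```

In a real `AssignerWithPeriodicWatermarks`, `onEvent` corresponds to `extractTimestamp` and `currentWatermark` to `getCurrentWatermark`.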
>>>>>> By definition, if I set event time, it should respect the message
>>>>>> time rather than the clock window.
>>>>>>
>>>>>> --
>>>>>> Thanks
>>>>>> Rohan

--
Thanks
Rohan
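[Editor's note] Rohan's remaining question — close and emit a window if an id goes silent for 1-2 hours — is commonly solved in Flink with a `KeyedProcessFunction` using per-key state and processing-time timers, or with a custom window `Trigger`. The thread does not settle on an implementation; the sketch below is only the per-key bookkeeping such an operator would perform, in plain Scala with assumed names:

```scala
// Illustrative sketch (not from the thread) of the idle-timeout flush:
// per key, keep the running aggregate and the time of the last event;
// a periodic sweep emits and clears keys idle longer than the timeout.
// In Flink, onEvent would run in processElement and flushIdle in onTimer.
class IdleFlusher[K, V](timeoutMs: Long, merge: (V, V) => V) {
  private var state = Map.empty[K, (V, Long)] // key -> (aggregate, lastSeenMs)

  def onEvent(key: K, value: V, nowMs: Long): Unit = {
    val agg = state.get(key) match {
      case Some((v, _)) => merge(v, value)
      case None         => value
    }
    state = state.updated(key, (agg, nowMs))
  }

  // Emit and clear every key that has been silent for at least timeoutMs.
  def flushIdle(nowMs: Long): Map[K, V] = {
    val (idle, live) = state.partition { case (_, (_, last)) => nowMs - last >= timeoutMs }
    state = live
    idle.map { case (k, (v, _)) => k -> v }
  }
}
```

This keeps the 4:00-5:00 aggregate from being stranded in memory forever: when the device stops reporting, the timeout sweep emits whatever was accumulated for that id.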