Hi Gary,

Thanks. I do have some events coming in after one device pauses, and I am
able to see the watermark being advanced and the event being triggered.


Rohan

On Mon, Jan 15, 2018 at 5:40 AM, Gary Yao <g...@data-artisans.com> wrote:

> Hi Rohan,
>
> In your example, are you saying that after 5:40 you will not receive any
> events
> at all which could advance the watermark?
>
> I am asking because if you are receiving events for other keys/ids from
> your
> KafkaSource after 5:40, the watermark will still be advanced and fire the
> tumbling window.
>
> Best,
> Gary
>
> On Mon, Jan 15, 2018 at 9:03 AM, Rohan Thimmappa <
> rohan.thimma...@gmail.com> wrote:
>
>> No. My question is slightly different.
>>
>> Say I get reports from 5:10 to 5:40, and then the device goes offline and
>> never comes back, so I will not get any report after 5:40. The 5-6 window
>> then never gets closed, because we only close a window when we see a record
>> from the next hour, i.e. any report carrying a 6:00 end date. In this case
>> the 5:00-5:40 data stays in Flink memory and will never get sent.
>>
>>
>> So what I would like to do is wait for, say, 1 or 2 hours; if I don't get a
>> message for the given id in that time, I would like to close the window and
>> send the result to the destination system (in my case a Kafka topic),
>> roughly along the lines of the sketch below.
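>>
>> Something like this custom trigger is what I have in mind (just an untested
>> sketch; the class name and the timeout handling are mine): it fires on the
>> usual event-time timer, but also flushes the window if no new element shows
>> up within a configurable processing-time timeout.
>>
>> import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>>
>> class EventTimeTriggerWithTimeout(timeoutMs: Long) extends Trigger[Any, TimeWindow] {
>>
>>   override def onElement(element: Any, timestamp: Long, window: TimeWindow,
>>                          ctx: Trigger.TriggerContext): TriggerResult = {
>>     // fire normally when the watermark passes the end of the window
>>     ctx.registerEventTimeTimer(window.maxTimestamp())
>>     // also arm a processing-time timeout relative to "now"; note that timers
>>     // armed by earlier elements are not deleted here, so a production version
>>     // should track and delete the previous timer via partitioned state
>>     ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + timeoutMs)
>>     TriggerResult.CONTINUE
>>   }
>>
>>   override def onEventTime(time: Long, window: TimeWindow,
>>                            ctx: Trigger.TriggerContext): TriggerResult =
>>     if (time == window.maxTimestamp()) TriggerResult.FIRE_AND_PURGE
>>     else TriggerResult.CONTINUE
>>
>>   override def onProcessingTime(time: Long, window: TimeWindow,
>>                                 ctx: Trigger.TriggerContext): TriggerResult =
>>     TriggerResult.FIRE_AND_PURGE // timeout hit: flush whatever we have
>>
>>   override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit =
>>     ctx.deleteEventTimeTimer(window.maxTimestamp())
>> }
>>
>> The idea would be to attach it with
>> .trigger(new EventTimeTriggerWithTimeout(2 * 3600 * 1000)) after .window(...).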
>>
>>
>>
>>
>> Rohan
>>
>> On Sun, Jan 14, 2018 at 1:00 PM, Gary Yao <g...@data-artisans.com> wrote:
>>
>>> Hi Rohan,
>>>
>>> I am not sure if I fully understand your problem. For example, if you
>>> receive an
>>> event with a start time of 4:50 and an end time of 5:30, do you want the
>>> "usage"
>>> from 4:50 - 5:00 to be included in the 4:00 - 5:00 window? What if the
>>> event had
>>> an end time of 5:31? Do you then want to ignore the event for the 4:00 -
>>> 5:00
>>> window?
>>>
>>> Best,
>>>
>>> Gary
>>>
>>> On Fri, Jan 12, 2018 at 8:45 PM, Rohan Thimmappa <
>>> rohan.thimma...@gmail.com> wrote:
>>>
>>>> Hi Gary,
>>>>
>>>> This is perfect. I am able to get the window working on the message
>>>> timestamp rather than the clock window, and it also streams the data that
>>>> arrive late.
>>>>
>>>> I also have one edge case.
>>>>
>>>>
>>>> For example, I get my last report at 4:57 and never get a 5:00+ report
>>>> *ever*. I would like to wait for some time. My reporting interval is 30
>>>> minutes; if I don't see any record in the next 30 minutes, I would like to
>>>> close the 4-5 window and dispatch the report. The intention is that I
>>>> don't want to lose the last hour of data when the stream ends mid-hour.
>>>>
>>>> Rohan
>>>>
>>>> On Fri, Jan 12, 2018 at 12:00 AM, Gary Yao <g...@data-artisans.com>
>>>> wrote:
>>>>
>>>>> Hi Rohan,
>>>>>
>>>>> Your ReportTimestampExtractor assigns timestamps to the stream records
>>>>> correctly
>>>>> but uses the wall clock to emit Watermarks (System.currentTimeMillis).
>>>>> In Flink
>>>>> Watermarks are the mechanism to advance the event time. Hence, you
>>>>> should emit
>>>>> Watermarks according to the time that you extract from your events.
>>>>>
>>>>> You can take a look at the already existing timestamp extractors /
>>>>> watermark
>>>>> emitters [1], such as BoundedOutOfOrdernessTimestampExtractor, to see
>>>>> how it can
>>>>> be done.
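>>>>>
>>>>> For illustration, a version of your extractor that derives the watermark
>>>>> from the highest end time seen so far, which is roughly what
>>>>> BoundedOutOfOrdernessTimestampExtractor does internally, could look like
>>>>> this (just a sketch; pick the delay to match your expected
>>>>> out-of-orderness):
>>>>>
>>>>> import org.apache.flink.api.java.tuple.Tuple2
>>>>> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
>>>>> import org.apache.flink.streaming.api.watermark.Watermark
>>>>>
>>>>> class ReportTimestampExtractor
>>>>>     extends AssignerWithPeriodicWatermarks[Tuple2[String, Report]] {
>>>>>
>>>>>   val maxDelayMs = 3600 * 1000L // tolerate events up to 1 hour out of order
>>>>>   var maxSeenTimestamp = Long.MinValue
>>>>>
>>>>>   override def extractTimestamp(e: Tuple2[String, Report],
>>>>>                                 prevElementTimestamp: Long): Long = {
>>>>>     maxSeenTimestamp = math.max(maxSeenTimestamp, e.f1.getEndTime)
>>>>>     e.f1.getEndTime
>>>>>   }
>>>>>
>>>>>   override def getCurrentWatermark(): Watermark =
>>>>>     new Watermark(
>>>>>       if (maxSeenTimestamp == Long.MinValue) Long.MinValue
>>>>>       else maxSeenTimestamp - maxDelayMs)
>>>>> }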
>>>>>
>>>>> Best,
>>>>> Gary
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html
>>>>>
>>>>> On Fri, Jan 12, 2018 at 5:30 AM, Rohan Thimmappa <
>>>>> rohan.thimma...@gmail.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>>
>>>>>> I have the following requirement:
>>>>>>
>>>>>> 1. I have an Avro JSON message containing {eventid, usage, starttime,
>>>>>> endtime}.
>>>>>> 2. I am reading this from a Kafka source.
>>>>>> 3. If a record overlaps an hour boundary, split the record by rounding
>>>>>> off to hourly boundaries.
>>>>>> 4. My objective is to a) read the message and b) aggregate the usage
>>>>>> within the hour.
>>>>>> 5. Send the aggregated data to another Kafka topic.
>>>>>>
>>>>>> I don't want to aggregate based on the clock window. If I see the next
>>>>>> hour in endtime, I would like to close the window and send the
>>>>>> aggregated usage down to the Kafka sink topic.
>>>>>>
>>>>>>
>>>>>> E.g., input data:
>>>>>> 4:55 - 5:00
>>>>>> 5:00 - 5:25
>>>>>> 5:25 - 5:55
>>>>>> 5:55 - 6:25
>>>>>>
>>>>>> After split:
>>>>>> 4:55 - 5:00 - expect a record to go out with this
>>>>>> 5:00 - 5:25
>>>>>> 5:25 - 5:55
>>>>>> 5:55 - 6:00 - expect a record to go out with this
>>>>>> 6:00 - 6:25
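>>>>>>
>>>>>> (For reference, the split step is roughly like the sketch below; I am
>>>>>> using a plain case class in place of the Avro Report just for
>>>>>> illustration, and apportioning usage linearly by duration.)
>>>>>>
>>>>>> import org.apache.flink.api.common.functions.FlatMapFunction
>>>>>> import org.apache.flink.util.Collector
>>>>>>
>>>>>> // stand-in for the Avro Report type, just for this sketch
>>>>>> case class Report(startTime: Long, endTime: Long, usage: Long)
>>>>>>
>>>>>> class SplitFlatMap extends FlatMapFunction[(String, Report), (String, Report)] {
>>>>>>   private val HourMs = 3600 * 1000L
>>>>>>
>>>>>>   override def flatMap(in: (String, Report),
>>>>>>                        out: Collector[(String, Report)]): Unit = {
>>>>>>     val (id, r) = in
>>>>>>     val total = math.max(r.endTime - r.startTime, 1L)
>>>>>>     var start = r.startTime
>>>>>>     while (start < r.endTime) {
>>>>>>       // cut at the next hour boundary, or at the record end if earlier
>>>>>>       val cut = math.min((start / HourMs + 1) * HourMs, r.endTime)
>>>>>>       // apportion the usage to this piece by its share of the duration
>>>>>>       out.collect((id, Report(start, cut, r.usage * (cut - start) / total)))
>>>>>>       start = cut
>>>>>>     }
>>>>>>   }
>>>>>> }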
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> 1. I have set the event time:
>>>>>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>>>>>
>>>>>> 2. val hourlyAggregate: SingleOutputStreamOperator[Tuple2[String,
>>>>>> Report]] = stream
>>>>>>   .flatMap(new SplitFlatMap()) // if a record overlaps an hour boundary, split it into records with hourly boundaries
>>>>>>   .assignTimestampsAndWatermarks(new ReportTimestampExtractor)
>>>>>>   .keyBy(0)
>>>>>>   .window(TumblingEventTimeWindows.of(Time.seconds(intervalsecond.toLong)))
>>>>>>   .reduce(new Counter()) // aggregates the usage collected within the window
>>>>>>
>>>>>> 3. Here is the implementation of the timestamp extractor:
>>>>>>
>>>>>> class ReportTimestampExtractor extends
>>>>>> AssignerWithPeriodicWatermarks[Tuple2[String, Report]] with Serializable {
>>>>>>   override def extractTimestamp(e: Tuple2[String, Report],
>>>>>>                                 prevElementTimestamp: Long) = {
>>>>>>     e.f1.getEndTime
>>>>>>   }
>>>>>>
>>>>>>   override def getCurrentWatermark(): Watermark = {
>>>>>>     new Watermark(System.currentTimeMillis - 36000) // intended as a 1 hour delay (note: 36000 ms is only 36 seconds)
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>>
>>>>>> I see the aggregation is generated only at the clock window boundary
>>>>>> rather than when the window sees the next hour in a record.
>>>>>>
>>>>>> Is there anything I am missing? By definition, if I set event time, it
>>>>>> should respect the message time rather than the clock window.
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Thanks
>>>>>> Rohan
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Thanks
>>>> Rohan
>>>>
>>>
>>>
>>
>>
>> --
>> Thanks
>> Rohan
>>
>
>


-- 
Thanks
Rohan
