Thanks Max -

I will advance the watermark when no event arrives for a while. But when
using Kafka, is it good practice to assign events to partitions randomly
instead of by, say, device id or the region id where the devices are
located? What I noticed is that if the devices sending to one of the
partitions stop sending information, the pipeline completely freezes
unless I manually keep moving the watermark forward.
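
Here is roughly what I mean by advancing the watermark, as a minimal sketch
along the lines of what you described (the SensorEvent type, its timestamp
accessor, and the timeout values are placeholders, not my actual code):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

public class IdleTimeoutWatermarkAssigner
        implements AssignerWithPeriodicWatermarks<SensorEvent> {

    private static final long MAX_OUT_OF_ORDERNESS = 10_000L; // 10s, placeholder
    private static final long IDLE_TIMEOUT = 60_000L;         // 60s, placeholder

    private long maxTimestampSeen = Long.MIN_VALUE;
    private long lastEventWallClock = System.currentTimeMillis();

    @Override
    public long extractTimestamp(SensorEvent event, long previousElementTimestamp) {
        long ts = event.getTimestamp();                  // placeholder accessor
        maxTimestampSeen = Math.max(maxTimestampSeen, ts);
        lastEventWallClock = System.currentTimeMillis(); // remember when we last saw data
        return ts;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Called every ExecutionConfig#getAutoWatermarkInterval() milliseconds.
        // If no event has arrived for IDLE_TIMEOUT, advance the watermark based
        // on the wall clock so downstream windows can still fire.
        if (System.currentTimeMillis() - lastEventWallClock > IDLE_TIMEOUT) {
            return new Watermark(System.currentTimeMillis() - MAX_OUT_OF_ORDERNESS);
        }
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return new Watermark(Long.MIN_VALUE); // no events seen yet
        }
        return new Watermark(maxTimestampSeen - MAX_OUT_OF_ORDERNESS);
    }
}

I would register this with assignTimestampsAndWatermarks() on the source
stream and tune ExecutionConfig#setAutoWatermarkInterval() as you suggested.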

But the problem with sending events to random partitions is that when the
devices come back online, they send events that are now registered as late
events, and the windows fire one element at a time.
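
For reference, this is roughly how I plan to wire it up, assigning
timestamps and watermarks directly on the Kafka source stream as you
suggested (the topic name, schema, key selector, window function, and the
0.9 consumer below are placeholders for my actual setup):

import java.util.Properties;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public class DevicePipelineSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000L); // call getCurrentWatermark() every second

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "device-events");           // placeholder

        DataStream<SensorEvent> events = env
            .addSource(new FlinkKafkaConsumer09<>("device-events",   // placeholder topic
                    new SensorEventSchema(), props))                 // placeholder schema
            .assignTimestampsAndWatermarks(new IdleTimeoutWatermarkAssigner());

        events
            .keyBy(new KeySelector<SensorEvent, String>() {
                @Override
                public String getKey(SensorEvent event) {
                    return event.getDeviceId();                      // placeholder accessor
                }
            })
            .timeWindow(Time.minutes(1))
            .apply(new MyWindowFunction());                          // placeholder window function

        env.execute("device pipeline sketch");
    }
}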

Thanks,
Sameer

On Fri, Aug 12, 2016 at 4:41 AM, Maximilian Michels <m...@apache.org> wrote:

> Hi Sameer,
>
> If you use Event Time you should make sure to assign Watermarks and
> Timestamps at the source. As you already observed, Flink may get stuck
> otherwise because it waits for Watermarks to progress in time.
>
> There is no timeout for windows. However, you can implement that logic
> in your Watermark generation function.
>
> You're already using
> DataStream#assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks
> assigner)
>
> Your assigner has a `getCurrentWatermark()` method. This is called
> every ExecutionConfig#getAutoWatermarkInterval() milliseconds. You can
> set this via ExecutionConfig#setAutoWatermarkInterval(long
> milliseconds).
>
> In your assigner, simply create a field to keep track of the last time
> you emitted a Watermark. If you haven't emitted a Watermark for some
> time, you can kick off a timeout and emit a Watermark.
>
> Cheers,
> Max
>
> On Thu, Aug 11, 2016 at 1:05 AM, Sameer W <sam...@axiomine.com> wrote:
> > Sorry for replying to my own messages but this is super confusing and
> > logical at the same time to me :-).
> >
> > If I have a Kafka topic with 10 partitions, I partition by device id
> > when I write to the topic, and I use Event Time, my pipeline freezes
> > (if fewer than 10 devices are active initially). That is because if
> > some partitions are inactive (only a few devices active at a time),
> > they do not send watermarks, and my pipeline waits forever for those
> > partitions to send their watermarks, even if the keyBy is on the
> > device id whose records are going to come from only one partition.
> >
> > When I send records to Kafka randomly (to any partition) the pipeline
> > works fine, as all partitions (and the sources connected to them) are
> > sending watermarks.
> >
> > This gets even more confusing if I apply watermarks and timestamps
> > downstream after a keyBy operation, which is again followed by
> > another keyBy that does not receive events for a key from all the
> > upstream operators. Again nothing fires, as Flink expects the other
> > map operators (to which the watermark assignment is piped) to send
> > watermarks as well.
> >
> > My conclusion: only produce watermarks at the source function. Is
> > this valid, or am I missing something? Because only when I do that
> > (with random allocation of events to partitions in Kafka) does the
> > whole pipeline work reliably.
> >
> > Is there a way to set a timeout - if watermarks from the source
> > functions are not received within a certain time interval, fire the
> > time windows?
> >
> > Thanks,
> > Sameer
> >
> >
> >
> >
> > On Wed, Aug 10, 2016 at 3:27 PM, Sameer W <sam...@axiomine.com> wrote:
> >>
> >> And this is happening in my local environment. As soon as I set the
> >> parallelism to 1 it all works fine.
> >>
> >> Sameer
> >>
> >> On Wed, Aug 10, 2016 at 3:11 PM, Sameer W <sam...@axiomine.com> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I am noticing this behavior with Event Time processing-
> >>>
> >>> I have a Kafka topic with 10 partitions. Each event source sends
> >>> data to any one of the partitions. Say I have only 1 event source
> >>> active at this moment, which means only one partition is receiving
> >>> data.
> >>>
> >>> None of my windows will fire now because the other 9 partitions
> >>> (source function instances) are not sending any watermarks and
> >>> Flink waits forever.
> >>>
> >>> I switch to a topic with 1 partition but leave the default
> >>> parallelism intact. Only one mapper instance contributes to the
> >>> subsequent keyBy operation, but the other 7 (assuming a default
> >>> parallelism of 8) are idle. I assign watermarks after the map
> >>> function. Again the same behavior, because the 7 other mappers are
> >>> not sending watermarks.
> >>>
> >>> How do I handle this? Not all of my partitions are going to be
> >>> receiving data at all times with this partitioning strategy. Or do
> >>> I have to use random partitioning, which also works?
> >>>
> >>> Thanks,
> >>> Sameer
> >>
> >>
> >
>
