Hi William,

The TsExtractor looks good.
This sounds like strange behavior and should not (or only indirectly) be
related to the Kafka source, since the watermarks are generated by a
separate extractor.

- Did you compare the first (and only) generated watermark to the
timestamps of the records that are produced by the sources?
It might be far ahead of the timestamps in the records and won't be updated
because the timestamps of the records are smaller.
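
To make the comparison concrete, here is a plain-Scala model (an
illustration of the arithmetic behind BoundedOutOfOrdernessTimestampExtractor,
not Flink's actual implementation): the periodic watermark trails the maximum
timestamp seen so far by the allowed out-of-orderness, so if the first emitted
watermark is already ahead of every later record timestamp, it never advances.

```scala
// Plain-Scala sketch of bounded out-of-orderness watermark logic
// (illustration only, not Flink's code): the periodic watermark is
// maxSeenTimestamp - maxOutOfOrderness. Records with timestamps below
// the current max never move the watermark forward.
object WatermarkModel {
  val maxOutOfOrdernessMs = 20000L // mirrors Time.seconds(20) in the job

  private var maxTs = Long.MinValue + maxOutOfOrdernessMs

  // Called once per record with its extracted timestamp.
  def onRecord(timestampMs: Long): Unit =
    maxTs = math.max(maxTs, timestampMs)

  // What the periodic watermark generator would emit right now.
  def currentWatermark: Long = maxTs - maxOutOfOrdernessMs
}
```

Logging element.timestamp together with the assigner's getCurrentWatermark()
in the real job would show whether the record timestamps ever exceed the first
emitted watermark.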

- What is the parallelism of the file sources / Kafka source and following
operators?
Watermarks can only advance if they advance in all parallel instances of the
timestamp extractor.
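
For illustration, here is a simplified sketch (not Flink internals) of why
that is: a downstream task's event-time clock is the minimum of the last
watermark received on each input channel, so a single idle extractor instance
pins the watermark for the whole pipeline.

```scala
// Simplified sketch of watermark propagation (not Flink's actual code):
// a downstream operator tracks the last watermark per input channel and
// its own watermark is the minimum over all of them. One parallel
// extractor instance that never sees records stays at Long.MinValue
// and stalls everything behind it.
object WatermarkFanIn {
  def downstreamWatermark(perChannel: Seq[Long]): Long = perChannel.min
}
```

For example, downstreamWatermark(Seq(1000L, 2500L, Long.MinValue)) stays at
Long.MinValue until the idle instance finally emits a watermark.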

Best, Fabian

2018-01-18 16:09 GMT+01:00 William Saar <will...@saar.se>:

> Hi,
> The watermark does not seem to get updated at all after the first one is
> emitted. We used to get out-of-order warnings, but we changed the job to
> use a bounded timestamp extractor so we no longer get those warnings.
>
> Our timestamp extractor looks like this
>
> class TsExtractor[T](time: Time)
>     extends BoundedOutOfOrdernessTimestampExtractor[Timestamped[T]](time) {
>   override def extractTimestamp(element: Timestamped[T]): Long =
>     element.timestamp
> }
>
> Our stream topology starts with a single stream, then we do two separate flat 
> map and filtering operations on the initial stream to transform data batches
> into streams of two different event types. We then 
> assignTimestampsAndWatermarks(new TsExtractor[EventType](Time.seconds(20))) 
> for each event type on both
> branches before unioning the two branches to a single stream again (the 
> reason for the split is that the data used to come from two different topics).
>
> William
>
>
>
>
> ----- Original Message -----
> From:
> "Gary Yao" <g...@data-artisans.com>
>
> To:
> "William Saar" <will...@saar.se>
> Cc:
> "user" <user@flink.apache.org>
> Sent:
> Thu, 18 Jan 2018 11:11:17 +0100
> Subject:
> Re: Far too few watermarks getting generated with Kafka source
>
>
>
> Hi William,
>
> How often does the watermark get updated? Can you share the code that
> generates the watermarks? Watermarks should be strictly ascending; if your
> code produces watermarks that are not ascending, the smaller ones will be
> discarded. Could it be that the events in Kafka are more "out of order"
> with respect to event time than in your file?
>
> You can assign timestamps in the Kafka source or later. The Flink
> documentation has a section on why it can be beneficial to assign
> watermarks in the Kafka source:
>
>   https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition
>
> Best,
> Gary
>
> On Wed, Jan 17, 2018 at 5:15 PM, William Saar <will...@saar.se> wrote:
>
>> Hi,
>> I have a job where we read data from either Kafka or a file (for
>> testing), decode the entries and flat map them into events, and then add a
>> timestamp and watermark assigner to the events in a later operation. This
>> seems to generate periodic watermarks when running from a file, but when
>> Kafka is the source we barely get any watermark updates. What could be
>> causing this? (the environment has setAutoWatermarkInterval(1000))
>>
>> Do we need to do all the timestamp and watermark assignment in the Kafka
>> source? or should it work to do it in later operations? The events do seem
>> to get propagated through the pipeline, we're just not getting watermarks...
>>
>> Thanks,
>> William
>>
>
>
