The source controls the watermark. Since you're using KafkaIO, you can call
KafkaIO.read().withWatermarkFn(myWatermarkFn), as seen here:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L118
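
A minimal sketch of what a watermark function could look like, assuming a watermark that trails each record's event time by a fixed lag. The Checkout class, the trailingWatermark helper and the five-minute lag are all hypothetical, and the code uses java.util.function.Function and java.time so that it runs on its own. KafkaIO's own function and timestamp types differ, so check the linked source for the actual signature.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;

public class WatermarkSketch {
    // Hypothetical stand-in for a Kafka record that carries its own event time.
    public static final class Checkout {
        public final String id;
        public final Instant eventTime;

        public Checkout(String id, Instant eventTime) {
            this.id = id;
            this.eventTime = eventTime;
        }
    }

    // Returns a function that reports a watermark trailing the record's
    // event time by a fixed lag (an assumption, not KafkaIO's default).
    public static Function<Checkout, Instant> trailingWatermark(Duration lag) {
        return record -> record.eventTime.minus(lag);
    }

    public static void main(String[] args) {
        Function<Checkout, Instant> myWatermarkFn = trailingWatermark(Duration.ofMinutes(5));
        Checkout checkout = new Checkout("order-1", Instant.parse("2016-12-27T17:13:00Z"));
        // Prints the watermark this function would report for the record.
        System.out.println(myWatermarkFn.apply(checkout));
    }
}
```

Because the function derives the watermark from record event times, the watermark only moves when records arrive, which is the behavior you want while the source is down.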



On Tue, Dec 27, 2016 at 5:13 PM, Xu, Mingmin <ming...@ebay.com> wrote:

> Thanks Lukasz. With the provided window function, can I control how the
> watermark moves forward? Or is a customized WindowFn required?
>
> On Dec 27, 2016, at 10:40 AM, Lukasz Cwik <lc...@google.com> wrote:
>
> withAllowedLateness controls when data can enter the system and still
> be considered valid. The timestamp of the data is always compared against
> the watermark:
> timestamp is before (watermark - withAllowedLateness) -> data can be dropped
> timestamp is after (watermark - withAllowedLateness) -> data cannot be
> dropped
>
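
The two cases above can be sketched as a single comparison. This is only an illustration of the rule as stated in this message: the LatenessRule class and canBeDropped name are hypothetical, java.time stands in for Beam's own timestamp types, and the real check in a runner may be made per window rather than per element.

```java
import java.time.Duration;
import java.time.Instant;

public class LatenessRule {
    // True when the timestamp falls strictly before (watermark - allowedLateness).
    // A timestamp exactly at that boundary is treated here as not droppable,
    // which is an assumption; the message above does not cover that case.
    public static boolean canBeDropped(Instant timestamp, Instant watermark,
                                       Duration allowedLateness) {
        return timestamp.isBefore(watermark.minus(allowedLateness));
    }

    public static void main(String[] args) {
        Instant watermark = Instant.parse("2016-12-27T12:00:00Z");
        Duration allowedLateness = Duration.ofHours(2);

        // Three hours behind the watermark: outside the allowed lateness.
        System.out.println(canBeDropped(
            Instant.parse("2016-12-27T09:00:00Z"), watermark, allowedLateness));

        // One hour behind the watermark: still within the allowed lateness.
        System.out.println(canBeDropped(
            Instant.parse("2016-12-27T11:00:00Z"), watermark, allowedLateness));
    }
}
```
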
> Since in your case you're using event time (and not processing time), the
> watermark should not be moving forward when the source is not available.
>
> But when no data is being read from the source because there are no
> records, it is up to the source to choose how the watermark advances, and
> it may move the watermark forward based on some estimate of where it thinks
> the watermark should be. Since some sources may not be able to tell
> whether the source is truly unavailable or there is just no incoming
> data, they may move the watermark forward regardless, and thus
> withAllowedLateness becomes important again.
>
> On Fri, Dec 23, 2016 at 10:58 AM, Xu, Mingmin <ming...@ebay.com> wrote:
>
>> Hello,
>>
>>
>>
>> I’m working on a POC project with Apache Beam. The rough pipeline reads
>> from a checkout Kafka topic and generates hourly summary data on different
>> dimensions. I suppose a fixed time window with a time-based trigger could
>> handle the case. The event time is the checkout timestamp.
>>
>>
>>
>> However, when the job or the source is down for some time, say several
>> hours, recovery becomes a problem. Data will be dropped unless I set a
>> large value for *withAllowedLateness*, and a large *allowedLateness*
>> combined with *accumulatingFiredPanes* also keeps a lot of pane data in
>> memory. Is this the right way to handle a recovery scenario? I would
>> appreciate any suggestions.
>>
>>
>>
>> Thank you!
>>
>> Mingmin
>>
>>
>>
>
>
