The source controls the watermark. Since you're using KafkaIO, you can call KafkaIO.read().withWatermarkFn(myWatermarkFn), as seen here: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L118
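A watermark function like this maps each record read from Kafka to an event-time Instant, and the source uses that to decide how far its watermark has progressed. The sketch below is plain Java with no Beam dependency and only models the idea. `CheckoutEvent`, `boundedDelayWatermarkFn`, and `WatermarkTracker` are hypothetical names, and the five-minute out-of-orderness bound is an assumption. The exact parameter type that `withWatermarkFn` expects differs between Beam versions, so check the KafkaIO Javadoc for the version you use.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;

public class WatermarkFnSketch {
    // Hypothetical stand-in for a decoded checkout message read from Kafka.
    static final class CheckoutEvent {
        final String orderId;
        final Instant checkoutTime;

        CheckoutEvent(String orderId, Instant checkoutTime) {
            this.orderId = orderId;
            this.checkoutTime = checkoutTime;
        }
    }

    // Hypothetical watermark function: assumes records arrive at most maxDelay
    // out of order, so the watermark trails each record's event time by maxDelay.
    static Function<CheckoutEvent, Instant> boundedDelayWatermarkFn(Duration maxDelay) {
        return event -> event.checkoutTime.minus(maxDelay);
    }

    // Applies the watermark function to every record read and keeps the maximum,
    // so the watermark never moves backwards. If no records are read, it stays put.
    static final class WatermarkTracker {
        private final Function<CheckoutEvent, Instant> watermarkFn;
        private Instant watermark = Instant.MIN;

        WatermarkTracker(Function<CheckoutEvent, Instant> watermarkFn) {
            this.watermarkFn = watermarkFn;
        }

        Instant onRecord(CheckoutEvent event) {
            Instant candidate = watermarkFn.apply(event);
            if (candidate.isAfter(watermark)) {
                watermark = candidate;
            }
            return watermark;
        }

        Instant current() {
            return watermark;
        }
    }

    public static void main(String[] args) {
        WatermarkTracker tracker =
                new WatermarkTracker(boundedDelayWatermarkFn(Duration.ofMinutes(5)));
        tracker.onRecord(new CheckoutEvent("a", Instant.parse("2016-12-23T08:00:00Z")));
        // An out-of-order record does not pull the watermark back.
        tracker.onRecord(new CheckoutEvent("b", Instant.parse("2016-12-23T07:58:00Z")));
        System.out.println(tracker.current()); // 2016-12-23T07:55:00Z
    }
}
```

Because this watermark is derived only from the event timestamps of records actually read, it does not advance while the source delivers nothing. After a restart it catches up at the pace of the backlog instead of jumping to wall-clock time, which is the behaviour that matters for the recovery scenario in this thread.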
On Tue, Dec 27, 2016 at 5:13 PM, Xu, Mingmin <ming...@ebay.com> wrote:

> Thanks Lukasz. With the provided window function, can I control how the
> watermark moves forward? Or is a customized WindowFn required?
>
> On Dec 27, 2016, at 10:40 AM, Lukasz Cwik <lc...@google.com> wrote:
>
> The withAllowedLateness setting controls when data can enter the system and
> still be considered valid. Whether data is late is always judged by
> comparing its timestamp to the watermark:
> - timestamp is before (watermark - allowedLateness) -> data can be dropped
> - timestamp is after (watermark - allowedLateness) -> data cannot be dropped
>
> Since in your case you're using event time (and not processing time), the
> watermark should not move forward while the source is unavailable.
>
> However, when no data is being read from the source because there are no
> records, it is up to the source to choose how the watermark advances, and it
> may move the watermark forward based on some estimate of where it thinks the
> watermark should be. Since some sources cannot tell whether the source is
> truly unavailable or there is simply no incoming data, they may move the
> watermark forward regardless, and then withAllowedLateness becomes important
> again.
>
> On Fri, Dec 23, 2016 at 10:58 AM, Xu, Mingmin <ming...@ebay.com> wrote:
>
>> Hello,
>>
>> I'm working on a POC project with Apache Beam. The rough pipeline reads
>> from a checkout Kafka topic and generates hourly summary data on different
>> dimensions. I suppose a fixed time window with a time-based trigger could
>> handle the case. Event time is the checkout timestamp.
>>
>> However, when the job or the source is down for some time, such as several
>> hours, recovery runs into problems. Data will be dropped unless I set a
>> large value for *withAllowedLateness*, and a large *allowedLateness* with
>> *accumulatingFiredPanes* also leads to lots of pane data in memory.
>> Is this the right way to handle a recovery scenario? Any suggestions would
>> be appreciated.
>>
>> Thank you!
>>
>> Mingmin
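The lateness rule and the outage scenario discussed above can be modelled in a few lines. This is a simplified sketch of the rule exactly as stated in the thread (compare an element's timestamp with watermark minus allowed lateness); it ignores windowing, which Beam's real late-data check also takes into account. `LatenessCheckSketch` and `canBeDropped` are hypothetical names, and the outage times and one-hour lateness are made-up illustrative values.

```java
import java.time.Duration;
import java.time.Instant;

public class LatenessCheckSketch {
    // Simplified rule from the thread: an element whose timestamp is before
    // (watermark - allowedLateness) may be dropped.
    static boolean canBeDropped(Instant eventTime, Instant watermark, Duration allowedLateness) {
        return eventTime.isBefore(watermark.minus(allowedLateness));
    }

    public static void main(String[] args) {
        Instant outageStart = Instant.parse("2016-12-23T08:00:00Z");
        Instant recovery = outageStart.plus(Duration.ofHours(3));
        Duration allowedLateness = Duration.ofHours(1);

        // A checkout that happened just after the outage began, read only after recovery.
        Instant backlogEvent = outageStart.plus(Duration.ofMinutes(5));

        // Case 1: the source held the watermark at the last event time it saw.
        Instant heldWatermark = outageStart;
        // Case 2: the source advanced the watermark to wall-clock time during the outage.
        Instant advancedWatermark = recovery;

        System.out.println(canBeDropped(backlogEvent, heldWatermark, allowedLateness));     // false
        System.out.println(canBeDropped(backlogEvent, advancedWatermark, allowedLateness)); // true
    }
}
```

The two cases show why the source's watermark behaviour during the outage matters: if the watermark is held at event time, backlog data is not late on recovery; if the source moved it forward while idle, only a large allowed lateness keeps that data from being dropped.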