To answer your specific question, you should create and return the WallTime
estimator. You shouldn't need to interact with it from within
your @ProcessElement call since your elements are using the current time
for their timestamp.

On Fri, Sep 18, 2020 at 10:04 AM Luke Cwik <[email protected]> wrote:

> Kafka is a complex example because it is adapting code from before there
> was an SDF implementation (namely the TimestampPolicy and the
> TimestampFn/TimestampFnS/WatermarkFn/WatermarkFn2 functions).
>
> There are three types of watermark estimators that are in the Beam Java
> SDK today:
> * Manual: Can be invoked from within the @ProcessElement method of your
> SDF, giving you precise control over what the watermark is.
> * WallTime: Doesn't need to be interacted with; it reports the current time
> as the watermark time. Once it is instantiated and returned via the
> @NewWatermarkEstimator method you don't need to do anything with it. This
> is functionally equivalent to calling setWatermark(Instant.now()) right
> before returning from the @ProcessElement method in the SplittableDoFn on a
> Manual watermark estimator.
> * TimestampObserving: Is invoked with the output timestamp of every
> element that is output. This is functionally equivalent to calling
> setWatermark after each output within your @ProcessElement method in the
> SplittableDoFn. The MonotonicallyIncreasing implementation of
> the TimestampObserving estimator ensures that the largest timestamp seen so
> far will be reported for the watermark.
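
To make the difference between the three kinds concrete, here is a minimal
plain-Java model of the behavior described above. It is only a sketch: the
class EstimatorModel and everything in it are hypothetical names of mine, not
the Beam classes, timestamps are plain epoch milliseconds instead of Instant,
and the injectable clock is an assumption added so the WallTime case can be
exercised deterministically.

```java
import java.util.function.LongSupplier;

/** Plain-Java model of the three estimator behaviors; NOT the Beam classes. */
final class EstimatorModel {
    /** Common view: every estimator can report a watermark (epoch millis). */
    interface Estimator {
        long currentWatermark();
    }

    /** Manual: reports whatever the processing code last set explicitly. */
    static final class Manual implements Estimator {
        private long watermark;
        Manual(long initial) { this.watermark = initial; }
        void setWatermark(long millis) { this.watermark = millis; }
        @Override public long currentWatermark() { return watermark; }
    }

    /** WallTime: reports the clock reading; the processing code never calls it. */
    static final class WallTime implements Estimator {
        private final LongSupplier clock; // hypothetical injectable clock
        WallTime(LongSupplier clock) { this.clock = clock; }
        @Override public long currentWatermark() { return clock.getAsLong(); }
    }

    /** Timestamp observing, monotonically increasing: tracks the largest output timestamp. */
    static final class MonotonicallyIncreasing implements Estimator {
        private long largestSeen;
        MonotonicallyIncreasing(long initial) { this.largestSeen = initial; }
        /** Called once per output element with that element's timestamp. */
        void observeTimestamp(long millis) { largestSeen = Math.max(largestSeen, millis); }
        @Override public long currentWatermark() { return largestSeen; }
    }
}
```

In this model, outputting every element at the current time and using WallTime
gives the same reported watermark as calling Manual.setWatermark with the
clock reading just before returning.
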
>
> The default is to not set any watermark estimate.
>
> For all watermark estimators you're allowed to set the watermark estimate
> to anything as the runner will recompute the output watermark as:
> new output watermark = max(previous output watermark, min(upstream
> watermark, watermark estimates))
> This effectively means that the watermark will never go backwards from the
> runner's point of view. It also means that setting the watermark estimate
> below the previous output watermark (which isn't observable) will not do
> anything beyond holding the watermark at the previous output watermark.
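
As a worked illustration of the formula above, here is a small plain-Java
sketch. The class and method names (WatermarkMath, nextOutputWatermark) are
hypothetical and timestamps are plain epoch milliseconds; this only restates
the arithmetic and is not runner code.

```java
/** Restates the recomputation rule quoted above; hypothetical helper, not runner code. */
final class WatermarkMath {
    /**
     * new output watermark =
     *     max(previous output watermark, min(upstream watermark, watermark estimates))
     */
    static long nextOutputWatermark(long previousOutput, long upstream, long... estimates) {
        long candidate = upstream;
        for (long estimate : estimates) {
            candidate = Math.min(candidate, estimate); // the smallest value wins the min
        }
        return Math.max(previousOutput, candidate);    // never move backwards
    }
}
```

With a previous output watermark of 100 and an upstream watermark of 200, an
estimate of 150 advances the output watermark to 150, an estimate of 50 leaves
it held at 100, and an estimate of 500 is capped by the upstream watermark at
200.
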
>
> Depending on the windowing strategy and allowed lateness, any records that
> are output with a timestamp that is too early can be considered droppably
> late, otherwise they will be late/ontime/early.
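
To show how a timestamp can end up droppably late, here is a rough plain-Java
sketch for fixed windows. It rests on an assumption of mine that is not stated
in this thread: that an element is droppable once the watermark has passed the
last timestamp of its window plus the allowed lateness. LatenessModel and its
methods are hypothetical names, and all values are epoch milliseconds.

```java
/** Rough model of droppable lateness for fixed windows; an assumption-laden sketch. */
final class LatenessModel {
    /** Last timestamp that still belongs to the fixed window containing ts. */
    static long windowMaxTimestamp(long ts, long windowSizeMillis) {
        long start = ts - Math.floorMod(ts, windowSizeMillis);
        return start + windowSizeMillis - 1;
    }

    /**
     * Assumed rule: droppable once the watermark is past the window's
     * last timestamp plus the allowed lateness.
     */
    static boolean isDroppablyLate(
            long ts, long windowSizeMillis, long allowedLatenessMillis, long watermark) {
        return windowMaxTimestamp(ts, windowSizeMillis) + allowedLatenessMillis < watermark;
    }
}
```

With 10 second windows and one day of allowed lateness, as in the pipeline
quoted further down, an element stamped 12,345 ms falls in the window whose
last timestamp is 19,999 ms, so under this rule it only becomes droppable once
the watermark passes 86,419,999 ms.
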
>
> So as an author for an SDF transform, you need to figure out:
> 1) What timestamp you're going to output your records at
> * use the upstream input element's timestamp: the guidance is to use the
> default implementation, which gets the upstream watermark by default
> * use data from within the record being output or external system state
> via an API call: use a watermark estimator
> 2) How you want to compute the watermark estimate (if at all)
> * the choice here depends on how the elements' timestamps progress: are
> they in exactly sorted order, almost sorted order, completely unsorted, ...?
>
> For both of these it is up to you to choose how much flexibility in these
> decisions you want to give to your users and that should guide what you
> expose within the API (like how KafkaIO exposes a TimestampPolicy) or how
> many other sources don't expose anything.
>
>
> On Thu, Sep 17, 2020 at 8:43 AM Praveen K Viswanathan <
> [email protected]> wrote:
>
>> Hi Luke,
>>
>> I am also looking at the `WatermarkEstimators.manual` option, in
>> parallel. Now we are getting data past our Fixed Window but the aggregation
>> is not as expected.  The doc says setWatermark will "set timestamp
>> before or at the timestamps of all future elements produced by the
>> associated DoFn". If I output with a timestamp as below then could you
>> please clarify on how we should set the watermark for this manual
>> watermark estimator?
>>
>> receiver.outputWithTimestamp(ossRecord, Instant.now());
>>
>> Thanks,
>> Praveen
>>
>> On Mon, Sep 14, 2020 at 9:10 AM Luke Cwik <[email protected]> wrote:
>>
>>> Is the watermark advancing[1, 2] for the SDF such that the windows can
>>> close allowing for the Count transform to produce output?
>>>
>>> 1: https://www.youtube.com/watch?v=TWxSLmkWPm4
>>> 2: https://beam.apache.org/documentation/programming-guide/#windowing
>>>
>>> On Thu, Sep 10, 2020 at 12:39 PM Gaurav Nakum <[email protected]>
>>> wrote:
>>>
>>>> Hi everyone!
>>>>
>>>> We are developing a new IO connector using the SDF API, and testing it
>>>> with the following simple counting pipeline:
>>>>
>>>> p.apply(MyIO.read()
>>>>         .withStream(inputStream)
>>>>         .withStreamPartitions(Arrays.asList(0))
>>>>         .withConsumerConfig(config)
>>>>     ) // gets a PCollection<KV<String, String>>
>>>> .apply(Values.<String>create()) // PCollection<String>
>>>> .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
>>>>     .withAllowedLateness(Duration.standardDays(1))
>>>>     .accumulatingFiredPanes())
>>>> .apply(Count.<String>perElement())
>>>> // write PCollection<KV<String, Long>> to stream
>>>> .apply(MyIO.write()
>>>>         .withStream(outputStream)
>>>>         .withConsumerConfig(config));
>>>>
>>>> Without the window transform, we can read from the stream and write to
>>>> it, however, I don’t see output after the Window transform. Could you
>>>> please help pin down the issue?
>>>>
>>>> Thank you,
>>>>
>>>> Gaurav
>>>>
>>>
>>
>> --
>> Thanks,
>> Praveen K Viswanathan
>>
>
