Hi Lukasz,

There was a bug in my code. When the topic is idle, I do get the watermark
as (now - maxDelay).
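
To make the (now - maxDelay) rule concrete, here is a minimal plain-Java sketch of a limited-delay watermark. It is a simplified model, not Beam's CustomTimestampPolicyWithLimitedDelay: the class name, the method signature and the boolean `idle` flag are hypothetical, and java.time is used instead of Joda-Time so the snippet has no dependencies.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java model of a limited-delay watermark rule; not Beam's actual class. */
final class LimitedDelayWatermarkSketch {

  /**
   * @param maxEventTimestamp largest event timestamp seen so far
   * @param idle true when the partition has no backlog (hypothetical flag)
   * @param now current processing time
   * @param maxDelay allowed lag between event time and the watermark
   */
  static Instant watermark(
      Instant maxEventTimestamp, boolean idle, Instant now, Duration maxDelay) {
    Instant idleWatermark = now.minus(maxDelay);
    Instant dataWatermark = maxEventTimestamp.minus(maxDelay);
    // When idle, the watermark follows the clock, but it never falls behind
    // the value derived from the data already read.
    if (idle && idleWatermark.isAfter(dataWatermark)) {
      return idleWatermark;
    }
    return dataWatermark;
  }
}
```

Under this model, with maxDelay set to 1 day, an idle topic leaves the watermark one day behind the wall clock, so an event that arrives later with a timestamp less than one day old is still ahead of the watermark.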

I have a few questions:
I added a static int counter to my watermark class and increment it inside
the constructor. I ran the pipeline on the SparkRunner for approximately
3 minutes, and in the logs I see that the counter value is around 800,
which implies that an instance of my watermark class was created about 800
times.
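
A minimal sketch of this kind of instance counting, assuming a hypothetical class name (CountingPolicySketch) and using an AtomicInteger rather than a plain static int so the count stays correct if constructors run on several threads:

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the watermark class; only the counting is shown. */
final class CountingPolicySketch {
  // Shared by every instance created in this JVM.
  static final AtomicInteger INSTANCES = new AtomicInteger();

  // Sequence number of this instance, handy for log lines.
  final int instanceNumber;

  CountingPolicySketch() {
    instanceNumber = INSTANCES.incrementAndGet();
  }
}
```

A static field is per JVM, so on a cluster each executor process keeps its own count; the number seen in one log is the number of instances created in that process only.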

The previousWatermark javadoc[1] suggests that it is the latest checkpointed
watermark. Does this mean the watermark is checkpointed along with the
partition offsets? As the counter value is around 800, does this mean the
watermark was checkpointed 800 times? Does this mean an instance of my
watermark class will be created after every new checkpoint?
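
If a new policy instance is created each time a reader is created (the stack trace quoted below shows the constructor being reached from KafkaUnboundedSource.createReader), state kept only in instance fields would be lost, while the previousWatermark argument would carry over. The following plain-Java sketch shows one way a new instance could seed itself from that value. It is an assumption-laden illustration, not Beam's API: the class and method names are hypothetical and java.time stands in for Joda-Time.

```java
import java.time.Instant;
import java.util.Optional;

/** Hypothetical policy that seeds its state from a previously checkpointed watermark. */
final class ResumablePolicySketch {
  private Instant watermark;

  ResumablePolicySketch(Optional<Instant> previousWatermark) {
    // Start from the checkpointed value if there is one, else from the epoch.
    this.watermark = previousWatermark.orElse(Instant.EPOCH);
  }

  /** Advances the watermark, never letting it move backwards. */
  Instant advanceTo(Instant candidate) {
    if (candidate.isAfter(watermark)) {
      watermark = candidate;
    }
    return watermark;
  }
}
```

With this shape, a freshly created instance that receives the last watermark does not report an older value than the instance it replaces.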

1:
https://github.com/apache/beam/blob/de08c89fe941f8c5697442615f6d2d1d4340aa38/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L49


Thanks,
Rahul

On Thu, May 23, 2019 at 12:23 AM Lukasz Cwik <lc...@google.com> wrote:

>
>
> On Wed, May 22, 2019 at 11:17 AM rahul patwari <rahulpatwari8...@gmail.com>
> wrote:
>
>> Will the watermark also get checkpointed by default along with the offset
>> of the partition?
>>
>> We have found a limitation in CustomTimestampPolicyWithLimitedDelay.
>> Consider this scenario:
>> We are processing a stream of events from Kafka whose event timestamps are
>> older than the current processing time (say, by 1 day), and maxDelay is set
>> to 1 day. When the topic is idle for some time, the watermark advances to
>> the current time, so any data that arrives later in the pipeline is
>> considered late and discarded (as the event timestamps are 1 day old).
>>
>
> This seems like a bug since the watermark should only advance to
> currentTime - maxDelay if the topic is empty based upon the
> CustomTimestampPolicyWithLimitedDelay javadoc[1].
>
>> Also, can the class that implements the createTimestampPolicy method of
>> the TimestampPolicyFactory interface use an instance variable, which
>> cannot be checkpointed, to calculate the idle time of the Kafka topic and
>> advance the watermark accordingly, instead of moving the watermark to the
>> current time?
>>
>> Regards,
>> Rahul
>>
>> On Wed, May 22, 2019 at 9:04 AM rahul patwari <rahulpatwari8...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> We are using the
>>> withTimestampPolicyFactory(TimestampPolicyFactory<K,V> timestampPolicyFactory)
>>> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->
>>> method in KafkaIO.Read, where we have written a lambda for
>>> createTimestampPolicy(org.apache.kafka.common.TopicPartition tp,
>>> java.util.Optional<Instant> previousWatermark)
>>> <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.html#createTimestampPolicy-org.apache.kafka.common.TopicPartition-java.util.Optional->.
>>>
>>> Sample code:
>>> KafkaIO.Read<String, GenericRecord> kafkaIoRead =
>>>     KafkaIO.<String, GenericRecord>read()
>>>         .withBootstrapServers(bootstrapServerUrl)
>>>         .withTopic(topicName)
>>>         .withKeyDeserializer(StringDeserializer.class)
>>>         .withValueDeserializerAndCoder(
>>>             GenericAvroDeserializer.class,
>>>             AvroCoder.of(GenericRecord.class, avroSchema))
>>>         .withTimestampPolicyFactory(
>>>             (tp, prevWatermark) -> new KafkaCustomTimestampPolicy(prevWatermark));
>>>
>>> The topic we are reading from only has one partition.
>>> In the lifecycle of the pipeline, KafkaCustomTimestampPolicy instance
>>> is being created multiple times.
>>>
>>> Is there any documentation describing the guidelines one should follow
>>> when implementing custom watermark?
>>>
>>
> Javadoc for getWatermark on unbounded sources[2].
>
>
>>> How does checkpointing affect the watermark?
>>>
>>
> The watermark doesn't advance while a source is checkpointed and not being
> actively processed. The watermark only advances based upon the values that
> the source provides.
>
>
>>
>>>
>>> Stack trace from the constructor of KafkaCustomTimestampPolicy:
>>>
>>> com.beam.transforms.KafkaCustomTimestampPolicy.<init>(KafkaCustomTimestampPolicy.java:41),
>>> com.beam.transforms.CreateKafkaSource.lambda$createNewInstance$bf84864f$1(CreateKafkaSource.java:99),
>>>   (KafkaIO.Read instance is created here)
>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:536),
>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:126),
>>> org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43),
>>> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:226),
>>> org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:132),
>>> org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160),
>>> org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124),
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511),
>>> java.util.concurrent.FutureTask.run(FutureTask.java:266),
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149),
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624),
>>> java.lang.Thread.run(Thread.java:748)
>>>
>>
> 1:
> https://github.com/apache/beam/blob/de08c89fe941f8c5697442615f6d2d1d4340aa38/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L33
>
> 2:
> https://github.com/apache/beam/blob/de08c89fe941f8c5697442615f6d2d1d4340aa38/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L228
>
