Hi,

We are using the withTimestampPolicyFactory(TimestampPolicyFactory<K, V> timestampPolicyFactory) method in KafkaIO.Read
<https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->,
where we have written a lambda for TimestampPolicyFactory.createTimestampPolicy(org.apache.kafka.common.TopicPartition tp, java.util.Optional<Instant> previousWatermark)
<https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.html#createTimestampPolicy-org.apache.kafka.common.TopicPartition-java.util.Optional->.

Sample code:
KafkaIO.Read<String, GenericRecord> kafkaIoRead =
    KafkaIO.<String, GenericRecord>read()
        .withBootstrapServers(bootstrapServerUrl)
        .withTopic(topicName)
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializerAndCoder(GenericAvroDeserializer.class,
            AvroCoder.of(GenericRecord.class, avroSchema))
        .withTimestampPolicyFactory(
            (tp, prevWatermark) -> new KafkaCustomTimestampPolicy(prevWatermark));
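
For context, the bookkeeping our KafkaCustomTimestampPolicy does is roughly the following. This is a simplified, framework-free sketch: the class and method names here are illustrative only (the real class extends org.apache.beam.sdk.io.kafka.TimestampPolicy and uses joda-time Instant), and the "resume from previous watermark" behavior is what we intend, not necessarily what Beam guarantees.

```java
import java.time.Instant;

// Illustrative, standalone sketch of the watermark bookkeeping inside a
// custom timestamp policy: track the max event timestamp seen so far and
// report it as a monotonically non-decreasing watermark.
public class WatermarkTracker {
    private Instant maxEventTimestamp;

    // Resume from a checkpointed previous watermark if one exists,
    // otherwise start from the epoch.
    public WatermarkTracker(Instant previousWatermark) {
        this.maxEventTimestamp =
            previousWatermark != null ? previousWatermark : Instant.EPOCH;
    }

    // Called once per record; late records must not move the watermark back.
    public Instant timestampForRecord(Instant recordTimestamp) {
        if (recordTimestamp.isAfter(maxEventTimestamp)) {
            maxEventTimestamp = recordTimestamp;
        }
        return recordTimestamp;
    }

    // The watermark reported to the runner.
    public Instant currentWatermark() {
        return maxEventTimestamp;
    }
}
```

If the policy is re-created with previousWatermark carried over (as the lambda above passes it), the watermark should survive re-instantiation; that is the behavior we are trying to confirm.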

The topic we are reading from has only one partition, yet over the lifecycle of the pipeline the KafkaCustomTimestampPolicy instance is created multiple times.

Is there any documentation describing the guidelines one should follow when
implementing a custom watermark?
How does checkpointing affect the watermark?

Stack trace from the constructor of KafkaCustomTimestampPolicy:

com.beam.transforms.KafkaCustomTimestampPolicy.<init>(KafkaCustomTimestampPolicy.java:41)
com.beam.transforms.CreateKafkaSource.lambda$createNewInstance$bf84864f$1(CreateKafkaSource.java:99)  (the KafkaIO.Read instance is created here)
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:536)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:126)
org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:226)
org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:132)
org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)