Hi,

We are using the withTimestampPolicyFactory(TimestampPolicyFactory<K,V> timestampPolicyFactory) method <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-> in KafkaIO.Read, where we have written a lambda for createTimestampPolicy(org.apache.kafka.common.TopicPartition tp, java.util.Optional<Instant> previousWatermark) <https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/kafka/TimestampPolicyFactory.html#createTimestampPolicy-org.apache.kafka.common.TopicPartition-java.util.Optional->.

Sample code:

    KafkaIO.Read<String, GenericRecord> kafkaIoRead =
        KafkaIO.<String, GenericRecord>read()
            .withBootstrapServers(bootstrapServerUrl)
            .withTopic(topicName)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializerAndCoder(
                GenericAvroDeserializer.class,
                AvroCoder.of(GenericRecord.class, avroSchema))
            .withTimestampPolicyFactory(
                (tp, prevWatermark) -> new KafkaCustomTimestampPolicy(prevWatermark));

The topic we are reading from has only one partition. During the lifecycle of the pipeline, a KafkaCustomTimestampPolicy instance is created multiple times.

Is there any documentation describing the guidelines one should follow when implementing a custom watermark? How does checkpointing affect the watermark?

Stack trace from the constructor of KafkaCustomTimestampPolicy:

    com.beam.transforms.KafkaCustomTimestampPolicy.<init>(KafkaCustomTimestampPolicy.java:41)
    com.beam.transforms.CreateKafkaSource.lambda$createNewInstance$bf84864f$1(CreateKafkaSource.java:99)  (KafkaIO.Read instance is created here)
    org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.<init>(KafkaUnboundedReader.java:536)
    org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:126)
    org.apache.beam.sdk.io.kafka.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:43)
    org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:226)
    org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:132)
    org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:160)
    org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:124)
    java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    java.util.concurrent.FutureTask.run(FutureTask.java:266)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)
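
To make the question concrete: the stack trace shows the policy being constructed from the KafkaUnboundedReader constructor, so a new policy instance appears whenever the runner calls createReader. One possible reading, stated here as an assumption rather than confirmed behaviour, is that the previousWatermark argument is meant to let a freshly created policy continue from where the previous instance stopped. The sketch below illustrates that idea with plain Java stand-ins only. SketchPolicy, timestampForRecord and the use of java.time.Instant and Instant.EPOCH are hypothetical and are not Beam classes or Beam defaults.

```java
import java.time.Instant;
import java.util.Optional;

public class WatermarkResumeSketch {

    /** Hypothetical stand-in for a custom timestamp policy; not a Beam class. */
    static final class SketchPolicy {
        private Instant watermark;

        SketchPolicy(Optional<Instant> previousWatermark) {
            // Resume from the watermark handed in, if there is one.
            // Instant.EPOCH is only a placeholder for "no watermark yet".
            this.watermark = previousWatermark.orElse(Instant.EPOCH);
        }

        /** Returns the record's timestamp and advances the watermark if needed. */
        Instant timestampForRecord(Instant eventTime) {
            if (eventTime.isAfter(watermark)) {
                watermark = eventTime;
            }
            return eventTime;
        }

        Instant getWatermark() {
            return watermark;
        }
    }

    public static void main(String[] args) {
        // First instance: no previous watermark is available.
        SketchPolicy first = new SketchPolicy(Optional.empty());
        first.timestampForRecord(Instant.ofEpochMilli(1_000));
        first.timestampForRecord(Instant.ofEpochMilli(5_000));

        // Simulate the reader being recreated: the last watermark is passed on.
        Instant saved = first.getWatermark();
        SketchPolicy second = new SketchPolicy(Optional.of(saved));

        // An older record arrives; the resumed watermark does not move backwards.
        second.timestampForRecord(Instant.ofEpochMilli(3_000));
        System.out.println("watermark after restart: " + second.getWatermark());
    }
}
```

If that reading is right, a policy that ignores previousWatermark and starts from scratch on every construction could report a watermark that moves backwards after the reader is recreated, which is the behaviour we would like to rule out.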