Hi,

We are using the EFO Kinesis IO reader provided by Apache Beam. As far as I can see in the code, the implementation of getCurrentTimestamp always returns getApproximateArrivalTimestamp, not the event time that we may have set for that record using withCustomWatermarkPolicy.
Please refer: https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java#L91

However, KafkaIO does something different: getCurrentTimestamp is always derived from the `timestampPolicy` set for Kafka, which lets the user emit a custom timestamp for each record.
Please refer: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L210

So why do these two implementations differ? We want the current timestamp to be based on a custom time embedded in the record rather than on the approximate arrival time, and we are not sure how to achieve that. Please let us know if there is a way to do this for Kinesis.

Thanks,
Sachin
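
P.S. To make the question concrete, here is a minimal sketch of the difference as I understand it. It does not use any Beam classes: `TimestampSketch`, `Rec`, `arrivalBasedTimestamp` and `policyBasedTimestamp` are hypothetical names made up for illustration, and the sketch only models the behaviour described above, which may not match the actual readers in every detail.

```java
import java.time.Instant;
import java.util.function.Function;

// Hypothetical stand-ins, not Beam classes: they only model the two behaviours.
final class TimestampSketch {

  /** Minimal model of a stream record: an arrival time plus an event time carried in the payload. */
  static final class Rec {
    final Instant approximateArrival;
    final Instant embeddedEventTime;

    Rec(Instant approximateArrival, Instant embeddedEventTime) {
      this.approximateArrival = approximateArrival;
      this.embeddedEventTime = embeddedEventTime;
    }
  }

  /** Models what we see for Kinesis: the arrival time is returned, whatever policy is configured. */
  static Instant arrivalBasedTimestamp(Rec rec) {
    return rec.approximateArrival;
  }

  /** Models what we see for Kafka: a user-supplied function decides the record timestamp. */
  static Instant policyBasedTimestamp(Rec rec, Function<Rec, Instant> timestampFn) {
    return timestampFn.apply(rec);
  }

  public static void main(String[] args) {
    // The event happened five seconds before the record arrived in the stream.
    Rec rec = new Rec(
        Instant.parse("2024-01-01T00:00:05Z"),
        Instant.parse("2024-01-01T00:00:00Z"));

    // Custom policy: take the event time embedded in the record.
    Function<Rec, Instant> customPolicy = r -> r.embeddedEventTime;

    System.out.println("arrival-based: " + arrivalBasedTimestamp(rec));
    System.out.println("policy-based:  " + policyBasedTimestamp(rec, customPolicy));
  }
}
```

What we are after for Kinesis is the second behaviour, where the element timestamp follows the function we supply instead of the arrival time.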