mjsax commented on code in PR #17054:
URL: https://github.com/apache/kafka/pull/17054#discussion_r1739583131
##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java:
##########
@@ -102,4 +106,23 @@ public interface ErrorHandlerContext {
      * @return the task ID
      */
     TaskId taskId();
+
+    /**
+     * Return the current timestamp.
+     *
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
+     * Note, that an upstream {@link org.apache.kafka.streams.processor.api.Processor} might have set a new timestamp by calling
+     * {@link ProcessorContext#forward(Object, Object, To) forward(..., To.all().withTimestamp(...))}.
+     * In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
+     * to guarantee deterministic results.
+     *
+     * <p> If it is triggered while processing a record generated not from the source processor (for example,
+     * if this method is invoked from the punctuate call), timestamp is defined as the current
+     * task's stream time, which is defined as the largest timestamp of any record processed by the task.

Review Comment:
   Is this correct in all cases? It is certainly correct for the old PAPI, which uses `processor.ProcessorContext`. I am not sure about the new PAPI, though: it uses `processor.api.ProcessorContext`, whose `forward(Record)` takes a `Record` that carries its own timestamp. Can we verify this?
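
To make the question concrete, here is a minimal sketch of the rule as the Javadoc words it, next to the case being asked about. It is a toy model, not Kafka Streams code: `TimestampModel` and all of its methods are hypothetical names, and `timestampAfterPunctuatorForward` simply encodes the assumption that a downstream node would see the forwarded `Record`'s own timestamp. That assumption is exactly what still needs to be verified against the real implementation.

```java
// Toy model only: these names are hypothetical and are not Kafka Streams APIs.
final class TimestampModel {
    private long streamTime = Long.MIN_VALUE; // largest record timestamp seen so far
    private Long inFlight = null;             // timestamp of the record being processed, if any

    /** Simulates the start of processing for a record that came from the source processor. */
    void startRecord(long recordTimestamp) {
        inFlight = recordTimestamp;
        streamTime = Math.max(streamTime, recordTimestamp);
    }

    /** Simulates the end of processing for the current record. */
    void finishRecord() {
        inFlight = null;
    }

    /** Rule as documented: the record timestamp while a record is in flight, else stream time. */
    long documentedTimestamp() {
        return inFlight != null ? inFlight : streamTime;
    }

    /**
     * Case under review: a punctuator forwards a record that carries its own timestamp.
     * ASSUMPTION (unverified): downstream would observe that timestamp, not stream time.
     */
    long timestampAfterPunctuatorForward(long forwardedRecordTimestamp) {
        return forwardedRecordTimestamp;
    }

    public static void main(String[] args) {
        TimestampModel m = new TimestampModel();
        m.startRecord(100L);
        m.finishRecord();
        m.startRecord(70L); // out-of-order record: stream time stays at 100
        System.out.println(m.documentedTimestamp()); // 70
        m.finishRecord();
        // No record in flight, as during a punctuate call.
        System.out.println(m.documentedTimestamp()); // 100
        System.out.println(m.timestampAfterPunctuatorForward(250L)); // 250
    }
}
```

If the assumption holds, the last two values differ (100 versus 250), and the second Javadoc paragraph would not describe what a handler sees downstream of a punctuator in the new PAPI.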