mjsax commented on code in PR #17054:
URL: https://github.com/apache/kafka/pull/17054#discussion_r1741158880
##########
streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java:
##########
@@ -102,4 +106,23 @@ public interface ErrorHandlerContext {
      * @return the task ID
      */
     TaskId taskId();
+
+    /**
+     * Return the current timestamp.
+     *
+     * <p> If it is triggered while processing a record streamed from the source processor,
+     * timestamp is defined as the timestamp of the current input record; the timestamp is extracted from
+     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} by {@link TimestampExtractor}.
+     * Note, that an upstream {@link org.apache.kafka.streams.processor.api.Processor} might have set a new timestamp by calling
+     * {@link ProcessorContext#forward(Object, Object, To) forward(..., To.all().withTimestamp(...))}.
+     * In particular, some Kafka Streams DSL operators set result record timestamps explicitly,
+     * to guarantee deterministic results.
+     *
+     * <p> If it is triggered while processing a record generated not from the source processor (for example,
+     * if this method is invoked from the punctuate call), timestamp is defined as the current
+     * task's stream time, which is defined as the largest timestamp of any record processed by the task.

Review Comment:
   (Did you verify the behavior? I would hope that TTD does the right/same thing as the actual runtime for this case.)

   Also, I don't see an update?

   Last, given that `ErrorHandlerContext` is also used for the deserialization case, I am wondering if we should say something about this, too? For this case, the `TimestampExtractor` was not called yet, and thus we pass in `rawRecord.timestamp()`.
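   For illustration only, here is a minimal, self-contained sketch of the three cases discussed in this thread. It does not use the Kafka Streams API: `RawRecord`, `Phase`, and `contextTimestamp` are hypothetical names made up for this example. The punctuation branch simply mirrors what the Javadoc under review states (stream time), which is exactly the behavior the comment asks to have verified, and an upstream processor overriding the timestamp via `forward(...)` is not modeled.

```java
import java.util.function.ToLongFunction;

public class ErrorTimestampSketch {

    /** Hypothetical stand-in for a consumed record; not a Kafka class. */
    static final class RawRecord {
        final long timestamp;
        final String value;

        RawRecord(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Hypothetical phases in which an error handler might be invoked. */
    enum Phase { DESERIALIZATION, RECORD_PROCESSING, PUNCTUATION }

    /**
     * Picks the timestamp an error handler context would report, under the
     * assumptions stated in the thread (not verified against the runtime).
     */
    static long contextTimestamp(Phase phase,
                                 RawRecord raw,
                                 ToLongFunction<RawRecord> extractor,
                                 long streamTime) {
        switch (phase) {
            case DESERIALIZATION:
                // The extractor has not run yet, so only the raw record timestamp is available.
                return raw.timestamp;
            case RECORD_PROCESSING:
                // The record came from the source processor: use the extracted timestamp.
                return extractor.applyAsLong(raw);
            case PUNCTUATION:
                // Assumption taken from the Javadoc under review: the task's stream time.
                return streamTime;
            default:
                throw new IllegalArgumentException("Unknown phase: " + phase);
        }
    }

    public static void main(String[] args) {
        RawRecord raw = new RawRecord(100L, "payload");
        // Hypothetical extractor that shifts the raw timestamp, to make the cases distinguishable.
        ToLongFunction<RawRecord> extractor = r -> r.timestamp + 5L;
        for (Phase phase : Phase.values()) {
            System.out.println(phase + " -> " + contextTimestamp(phase, raw, extractor, 400L));
        }
    }
}
```

   If the Javadoc is extended to cover the deserialization case, the first branch is the one it would need to describe.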