Piotr Smolinski created KAFKA-10062:
---------------------------------------

             Summary: Add a method to retrieve the current timestamp as known 
by the Streams app
                 Key: KAFKA-10062
                 URL: https://issues.apache.org/jira/browse/KAFKA-10062
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.5.0
            Reporter: Piotr Smolinski


Please add to the ProcessorContext a method to retrieve current timestamp 
compatible with Punctuator#punctate(long) method.

Proposal in ProcessorContext:

long getTimestamp(PunctuationType type);

The method should return time value as known by the Punctuator scheduler with 
the respective PunctuationType.

The use-case is tracking of a process with timeout-based escalation.

A transformer receives process events and in case of missing an event execute 
an action (emit message) after given escalation timeout (several stages). The 
initial message may already arrive with reference timestamp in the past and may 
trigger different action upon arrival depending on how far in the past it is.

If the timeout should be computed against some further time only, Punctuator is 
perfectly sufficient. The problem is that I have to evaluate the current 
time-related state once the message arrives.

I am using wall-clock time. Normally accessing System.currentTimeMillis() is 
sufficient, but it breaks in unit testing with TopologyTestDriver, where the 
app wall clock time is different from the system-wide one.

To access the mentioned clock I am using reflection to access 
ProcessorContextImpl#task and then StreamTask#time.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to