guozhangwang commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498531149
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -692,8 +690,7 @@ public boolean process(final long wallClockTime) {
processorContext.timestamp(),
processorContext.headers()
);
- final Optional<RecordMetadata> recordMetadata =
Optional.ofNullable(processorContext.recordContext());
- maybeMeasureLatency(() -> currNode.process(toProcess,
recordMetadata), time, processLatencySensor);
+ maybeMeasureLatency(() -> currNode.process(toProcess), time,
processLatencySensor);
Review comment:
I think this is a proof of code simplicity that recordContext is
unnecessarily passing around here :)
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -52,6 +52,16 @@
*/
TaskId taskId();
+ /**
+ * The metadata of the record, if it is defined. Note that as long as the
processor is
+ * receiving a record downstream of a Source (i.e., the current record is
coming from an
Review comment:
nit: maybe also state that the metadata would be the one from the source
record?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]