mjsax commented on code in PR #12740: URL: https://github.com/apache/kafka/pull/12740#discussion_r1059149907
########## streams/src/test/java/org/apache/kafka/test/MockProcessor.java: ########## @@ -41,16 +44,16 @@ public MockProcessor() { delegate = new MockApiProcessor<>(); } - @SuppressWarnings("unchecked") @Override - public void init(final ProcessorContext context) { - super.init(context); - delegate.init((org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>) context); + public void init(ProcessorContext<KOut, VOut> context) { + Processor.super.init(context); Review Comment: Why adding `Processor.` ? Seems unnecessary. ########## streams/src/test/java/org/apache/kafka/test/MockProcessor.java: ########## @@ -81,14 +84,19 @@ public Cancellable scheduleCancellable() { return delegate.scheduleCancellable(); } - public ArrayList<KeyValueTimestamp<K, V>> processed() { + public ArrayList<KeyValueTimestamp<KIn, VIn>> processed() { return delegate.processed(); } - @SuppressWarnings("unchecked") public void addProcessorMetadata(final String key, final long value) { if (context instanceof InternalProcessorContext) { - ((InternalProcessorContext<K, V>) context).addProcessorMetadataKeyValue(key, value); + ((InternalProcessorContext<KOut, VOut>) context).addProcessorMetadataKeyValue(key, value); } } + + @Override + public void close() { Review Comment: Good catch! ########## streams/src/test/java/org/apache/kafka/test/MockProcessor.java: ########## @@ -81,14 +84,19 @@ public Cancellable scheduleCancellable() { return delegate.scheduleCancellable(); } - public ArrayList<KeyValueTimestamp<K, V>> processed() { + public ArrayList<KeyValueTimestamp<KIn, VIn>> processed() { return delegate.processed(); } - @SuppressWarnings("unchecked") public void addProcessorMetadata(final String key, final long value) { if (context instanceof InternalProcessorContext) { - ((InternalProcessorContext<K, V>) context).addProcessorMetadataKeyValue(key, value); + ((InternalProcessorContext<KOut, VOut>) context).addProcessorMetadataKeyValue(key, value); } } + + @Override + public void close() { + Processor.super.close(); Review Comment: as above ########## streams/src/test/java/org/apache/kafka/test/MockProcessor.java: ########## @@ -28,9 +29,11 @@ import java.util.List; import java.util.Map; -@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated. -public class MockProcessor<K, V> extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> { - private final MockApiProcessor<K, V, Object, Object> delegate; +public class MockProcessor<KIn, VIn, KOut, VOut> implements Processor<KIn, VIn, KOut, VOut> { + private final MockApiProcessor<KIn, VIn, KOut, VOut> delegate; + + private ProcessorContext<KOut, VOut> context; Review Comment: Why do we add this? We inherit `context` from the super class anyway. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org