vvcephei commented on a change in pull request #9361:
URL: https://github.com/apache/kafka/pull/9361#discussion_r498400505



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -49,7 +51,28 @@
      * Initializes this state store.
      * <p>
      * The implementation of this function must register the root store in the 
context via the
-     * {@link ProcessorContext#register(StateStore, StateRestoreCallback)} 
function, where the
+     * {@link 
org.apache.kafka.streams.processor.ProcessorContext#register(StateStore, 
StateRestoreCallback)} function,
+     * where the first {@link StateStore} parameter should always be the 
passed-in {@code root} object, and
+     * the second parameter should be an object of user's implementation
+     * of the {@link StateRestoreCallback} interface used for restoring the 
state store from the changelog.
+     * <p>
+     * Note that if the state store engine itself supports bulk writes, users 
can implement another
+     * interface {@link BatchingStateRestoreCallback} which extends {@link 
StateRestoreCallback} to
+     * let users implement bulk-load restoration logic instead of restoring 
one record at a time.
+     * <p>
+     * This method is not called if {@link StateStore#init(StateStoreContext, 
StateStore)}
+     * is implemented.
+     *
+     * @throws IllegalStateException If store gets registered after 
initialized is already finished
+     * @throws StreamsException if the store's change log does not contain the 
partition
+     */
+    void init(org.apache.kafka.streams.processor.ProcessorContext context, 
StateStore root);

Review comment:
       Not sure what's up with this diff, but this is the old API.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##########
@@ -88,9 +88,12 @@ void register(final StateStore store,
      * Get the state store given the store name.
      *
      * @param name The store name
+     * @param <S> The type or interface of the store to return
      * @return The state store instance
+     *
+     * @throws ClassCastException if the return type isn't a type or interface 
of the actual returned store.
      */
-    StateStore getStateStore(final String name);
+    <S extends StateStore> S getStateStore(final String name);

Review comment:
       now that we can implement both the new and old contexts with the same 
Impl, we need this to resolve a clash. It's backward compatible and a nice 
quality-of-life improvement anyway.

##########
File path: 
streams/examples/src/test/java/org/apache/kafka/streams/examples/docs/DeveloperGuideTesting.java
##########
@@ -160,24 +161,24 @@ public void shouldPunctuateIfWallClockTimeAdvances() {
         @Override
         public void init(final ProcessorContext<String, Long> context) {
             this.context = context;
-            context.schedule(Duration.ofSeconds(60), 
PunctuationType.WALL_CLOCK_TIME, time -> flushStore());
-            context.schedule(Duration.ofSeconds(10), 
PunctuationType.STREAM_TIME, time -> flushStore());
+            context.schedule(Duration.ofSeconds(60), 
PunctuationType.WALL_CLOCK_TIME, this::flushStore);
+            context.schedule(Duration.ofSeconds(10), 
PunctuationType.STREAM_TIME, this::flushStore);
             store = context.getStateStore("aggStore");
         }
 
         @Override
-        public void process(final String key, final Long value) {
-            final Long oldValue = store.get(key);
-            if (oldValue == null || value > oldValue) {
-                store.put(key, value);
+        public void process(final Record<String, Long> record) {
+            final Long oldValue = store.get(record.key());
+            if (oldValue == null || record.value() > oldValue) {
+                store.put(record.key(), record.value());
             }
         }
 
-        private void flushStore() {
+        private void flushStore(final long timestamp) {
             final KeyValueIterator<String, Long> it = store.all();
             while (it.hasNext()) {
                 final KeyValue<String, Long> next = it.next();
-                context.forward(next.key, next.value);
+                context.forward(new Record<>(next.key, next.value, timestamp));

Review comment:
       Since we have to define a timestamp now, I'm showing the use of the 
punctuation time in the dev guide.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -83,30 +90,21 @@
      */
     StreamsMetrics metrics();
 
-    /**
-     * Registers and possibly restores the specified storage engine.
-     *
-     * @param store the storage engine
-     * @param stateRestoreCallback the restoration callback logic for 
log-backed state stores upon restart
-     *
-     * @throws IllegalStateException If store gets registered after 
initialized is already finished
-     * @throws StreamsException if the store's change log does not contain the 
partition
-     */
-    void register(final StateStore store,
-                  final StateRestoreCallback stateRestoreCallback);
-

Review comment:
       Moved to the StateStoreContext.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java
##########
@@ -61,7 +84,9 @@
      * @throws IllegalStateException If store gets registered after 
initialized is already finished
      * @throws StreamsException if the store's change log does not contain the 
partition
      */
-    void init(ProcessorContext context, StateStore root);
+    default void init(final StateStoreContext context, final StateStore root) {
+        init(StoreToProcessorContextAdapter.adapt(context), root);
+    }

Review comment:
       This is the new API. Note the default implementation that delegates to 
the old API.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +138,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V 
value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);

Review comment:
       Migrated to the new Record argument.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContext.java
##########
@@ -29,7 +30,7 @@
 import static java.util.Objects.requireNonNull;
 import static org.apache.kafka.common.utils.Utils.getNullableSizePrefixedArray;
 
-public class ProcessorRecordContext implements RecordContext {
+public class ProcessorRecordContext implements RecordContext, RecordMetadata {

Review comment:
       Here's the implementation of RecordMetadata.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##########
@@ -49,26 +50,38 @@ protected StateManager stateManager() {
         return stateManager;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {
         final StateStore store = stateManager.getGlobalStore(name);
-        return getReadWriteStore(store);
+        return (S) getReadWriteStore(store);
     }
 
     @SuppressWarnings("unchecked")
     @Override
-    public <KIn, VIn> void forward(final KIn key, final VIn value) {
+    public <K, V> void forward(final Record<K, V> record) {
         final ProcessorNode<?, ?, ?, ?> previousNode = currentNode();
         try {
             for (final ProcessorNode<?, ?, ?, ?> child : 
currentNode().children()) {
                 setCurrentNode(child);
-                ((ProcessorNode<KIn, VIn, ?, ?>) child).process(key, value);
+                ((ProcessorNode<K, V, ?, ?>) child).process(record);
             }
         } finally {
             setCurrentNode(previousNode);
         }
     }
 
+    @Override
+    public <K, V> void forward(final Record<K, V> record, final String 
childName) {
+        throw new UnsupportedOperationException("this should not happen: 
forward() not supported in global processor context.");

Review comment:
       Just implementing the new APIs while preserving the existing patterns.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1230,10 +1232,10 @@ public void 
shouldNotShareHeadersBetweenPunctuateIterations() {
         task.completeRestoration();
 
         task.punctuate(processorSystemTime, 1, 
PunctuationType.WALL_CLOCK_TIME, timestamp -> {
-            task.processorContext().recordContext().headers().add("dummy", 
(byte[]) null);
+            task.processorContext().headers().add("dummy", (byte[]) null);

Review comment:
       This test was actually testing a slightly wrong thing: `recordContext` 
was never exposed to users, they would have accessed the headers as 
`processorContext.header()`. It's important here because I've refactored the 
internal code to set `recordContext` to `null` when there is no defined context 
(such as in a punctuation like here).

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
##########
@@ -57,11 +59,11 @@ public void setUp() {
     private void init() {
         EasyMock.expect(context.taskId()).andReturn(taskId);
         EasyMock.expect(context.recordCollector()).andReturn(collector);
-        inner.init(context, store);
+        inner.init((ProcessorContext) context, store);

Review comment:
       Just a quick note. We do still expect the inner store to have the old 
init method invoked because none of the wrapper stores are implementing the new 
init method, so they're using the default implementation that delegates to the 
old init method. I'm going to take care of that in a follow-on PR.

##########
File path: streams/src/test/java/org/apache/kafka/test/MockApiProcessor.java
##########
@@ -65,25 +66,19 @@ public void init(final ProcessorContext<KOut, VOut> 
context) {
             scheduleCancellable = context.schedule(
                 Duration.ofMillis(scheduleInterval),
                 punctuationType,
-                timestamp -> {
-                    if (punctuationType == PunctuationType.STREAM_TIME) {
-                        assertThat(context.timestamp(), is(timestamp));
-                    }
-                    assertThat(context.partition(), is(-1));
-                    assertThat(context.offset(), is(-1L));
-
-                    (punctuationType == PunctuationType.STREAM_TIME ? 
punctuatedStreamTime : punctuatedSystemTime)
-                        .add(timestamp);
-                });
+                (punctuationType == PunctuationType.STREAM_TIME ? 
punctuatedStreamTime : punctuatedSystemTime)::add

Review comment:
       This can become a method reference now because those assertions on 
partition and offset are meaningless now.

##########
File path: 
streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
##########
@@ -148,16 +149,16 @@ public TopologyTestDriverTest(final boolean eosEnabled) {
         }
     }
 
-    private final static class Record {
+    private final static class TTDTestRecord {

Review comment:
       This was a bit funny to run into by this point in the implementation. It 
turns out we already had a class called "Record", and now we need to reference 
both of them in this test. I felt like it was more readable to just give this 
class a new name instead of referencing it by fully qualified name.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/Processor.java
##########
@@ -46,12 +46,11 @@
     default void init(final ProcessorContext<KOut, VOut> context) {}
 
     /**
-     * Process the record with the given key and value.
+     * Process the record. Note that record metadata is undefined in cases 
such as a forward call from a punctuator.
      *
-     * @param key the key for the record
-     * @param value the value for the record
+     * @param record the record to process
      */
-    void process(KIn key, VIn value);
+    void process(Record<KIn, VIn> record);

Review comment:
       The new Processor API (with Record) proposed in the KIP.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/StateStoreContext.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import java.io.File;
+import java.util.Map;
+
+/**
+ * State store context interface.
+ */
+public interface StateStoreContext {

Review comment:
       The new state store context proposed in the KIP.

##########
File path: checkstyle/suppressions.xml
##########
@@ -185,7 +185,7 @@
 
     <!-- Streams tests -->
     <suppress checks="ClassFanOutComplexity"
-              files="StreamThreadTest.java"/>
+              
files="(StreamThreadTest|StreamTaskTest|TopologyTestDriverTest).java"/>

Review comment:
       These tests now have a couple more imports, which pushed them over the 
line. It's possible that they'll drop below the line again after completing the 
transition to KIP-478. Otherwise, we should refactor these tests to comply with 
the limit. But we should do that separately.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -55,6 +52,16 @@
      */
     TaskId taskId();
 
+    /**
+     * The metadata of the record, if it is defined. Note that as long as the 
processor is
+     * receiving a record downstream of a Source (i.e., the current record is 
coming from an
+     * input topic), the metadata is defined. On the other hand, if a parent 
processor has
+     * registered a punctuator and called {@link 
ProcessorContext#forward(Record)} from that
+     * punctuator, then there is no record from an input topic, and therefore 
the metadata
+     * would be undefined.
+     */
+    Optional<RecordMetadata> recordMetadata();

Review comment:
       The new RecordMetadata context proposed in the KIP. Hopefully, the 
Javadoc is clear on why it's Optional.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link 
Processor}
+ * or a record to forward to downstream processors via {@link 
ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and 
value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of 
this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    /**
+     * The full constructor, specifying all the attributes of the record.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     * @param headers The headers of the record. May be null, which will cause 
subsequent calls
+     *                to {@link this#headers()} to return a non-null, empty, 
{@link Headers} collection.
+     *
+     * @throws IllegalArgumentException if the timestamp is negative.
+     */
+    public Record(final K key, final V value, final long timestamp, final 
Headers headers) {
+        this.key = key;
+        this.value = value;
+        if (timestamp < 0) {
+            throw new StreamsException(
+                "Malformed Record",
+                new IllegalArgumentException("Timestamp may not be negative. 
Got: " + timestamp)
+            );
+        }
+        this.timestamp = timestamp;
+        this.headers = new RecordHeaders(headers);

Review comment:
       Note: each time we create a new Record, we copy the headers. This is an 
improvement over the current situation where there's no mutability barriers 
across the whole subtopology, so changes to headers in one processor can have 
unexpected effects on other processors that are very far away in the dependency 
diagram.
   
   However, it doesn't completely solve the problem: changes in children can 
still be visible to parents and siblings. @mjsax and I discussed an alternative 
option of providing a completely immutable implementation (copy on write) of 
Headers as a complete solution. But it also seems to be a pretty severe 
performance penalty. Instead, perhaps we can just document a safe pattern. E.g.,
   ```
   record = new Record(...)
   context.forward(record, "childA")
   record.headers().add(new header)
   // or
   record.withHeaders(record.headers().add(new header))
   context.forward(record, "childB")
   ```
   is unsafe because childA may modify the headers, affecting both the parent 
and childB. Instead, you should do something like:
   
   ```
   record1 = new Record(...)
   record2 = new Record(...)
   record2.headers().add(new header)
   context.forward(record1, "childA")
   context.forward(record2, "childB")
   ```
   Now, the headers for both children are completely independent objects.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +138,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V 
value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);
 
     /**
-     * Forwards a key/value pair to the specified downstream processors.
-     * Can be used to set the timestamp of the output record.
+     * Forwards a record to the specified child processor.
      *
-     * @param key key
-     * @param value value
-     * @param to the options to use when forwarding
+     * @param record The record to forward
+     * @param childName The name of the child processor to receive the record
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V 
value, final To to);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record, 
final String childName);
 
     /**
      * Requests a commit.
      */
     void commit();
 
-    /**
-     * Returns the topic name of the current input record; could be null if it 
is not
-     * available (for example, if this method is invoked from the punctuate 
call).
-     *
-     * @return the topic name
-     */
-    String topic();
-
-    /**
-     * Returns the partition id of the current input record; could be -1 if it 
is not
-     * available (for example, if this method is invoked from the punctuate 
call).
-     *
-     * @return the partition id
-     */
-    int partition();
-
-    /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate 
call).
-     *
-     * @return the offset
-     */
-    long offset();
-
-    /**
-     * Returns the headers of the current input record; could be null if it is 
not
-     * available (for example, if this method is invoked from the punctuate 
call).
-     *
-     * @return the headers
-     */
-    Headers headers();
-
-    /**
-     * Returns the current timestamp.
-     *
-     * <p> If it is triggered while processing a record streamed from the 
source processor,
-     * timestamp is defined as the timestamp of the current input record; the 
timestamp is extracted from
-     * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} 
by {@link TimestampExtractor}.
-     *
-     * <p> If it is triggered while processing a record generated not from the 
source processor (for example,
-     * if this method is invoked from the punctuate call), timestamp is 
defined as the current
-     * task's stream time, which is defined as the largest timestamp of any 
record processed by the task.
-     *
-     * @return the timestamp
-     */
-    long timestamp();

Review comment:
       These are moved to Record.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##########
@@ -147,7 +148,7 @@ public void setGlobalProcessorContext(final 
InternalProcessorContext globalProce
             globalStoreNames.add(stateStore.name());
             final String sourceTopic = 
storeToChangelogTopic.get(stateStore.name());
             changelogTopics.add(sourceTopic);
-            stateStore.init(globalProcessorContext, stateStore);
+            stateStore.init((StateStoreContext) globalProcessorContext, 
stateStore);

Review comment:
       These are scattered throughout this PR. It's just selecting the init 
method we want to invoke. It's only necessary because the 
`globalProcessorContext` here actually implements both `ProcessorContext` and 
`StateStoreContext`. This is only true of our internal contexts, so users will 
not face a similar need to change code.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextReverseAdapter.java
##########
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextReverseAdapter implements 
InternalProcessorContext {

Review comment:
       Don't need this adapter anymore, either.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##########
@@ -140,76 +138,25 @@ Cancellable schedule(final Duration interval,
                          final Punctuator callback);
 
     /**
-     * Forwards a key/value pair to all downstream processors.
-     * Used the input record's timestamp as timestamp for the output record.
+     * Forwards a record to all child processors.
      *
-     * @param key key
-     * @param value value
+     * @param record The record to forward to all children
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V 
value);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record);
 
     /**
-     * Forwards a key/value pair to the specified downstream processors.
-     * Can be used to set the timestamp of the output record.
+     * Forwards a record to the specified child processor.
      *
-     * @param key key
-     * @param value value
-     * @param to the options to use when forwarding
+     * @param record The record to forward
+     * @param childName The name of the child processor to receive the record
      */
-    <K extends KForward, V extends VForward> void forward(final K key, final V 
value, final To to);
+    <K extends KForward, V extends VForward> void forward(Record<K, V> record, 
final String childName);
 
     /**
      * Requests a commit.
      */
     void commit();
 
-    /**
-     * Returns the topic name of the current input record; could be null if it 
is not
-     * available (for example, if this method is invoked from the punctuate 
call).
-     *
-     * @return the topic name
-     */
-    String topic();
-
-    /**
-     * Returns the partition id of the current input record; could be -1 if it 
is not
-     * available (for example, if this method is invoked from the punctuate 
call).
-     *
-     * @return the partition id
-     */
-    int partition();
-
-    /**
-     * Returns the offset of the current input record; could be -1 if it is not
-     * available (for example, if this method is invoked from the punctuate 
call).
-     *
-     * @return the offset
-     */
-    long offset();

Review comment:
       These are moved to RecordMetadata

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
##########
@@ -114,7 +117,7 @@ public void init(final InternalProcessorContext context) {
             maybeMeasureLatency(
                 () -> {
                     if (processor != null) {
-                        processor.init(ProcessorContextAdapter.adapt(context));
+                        processor.init((ProcessorContext<KOut, VOut>) context);

Review comment:
       Casting to add the generic params (the InternalProcessorContext is 
parameterized as `<Object, Object>`).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
##########
@@ -104,7 +105,13 @@ public void update(final ConsumerRecord<byte[], byte[]> 
record) {
                     deserialized.headers());
             processorContext.setRecordContext(recordContext);
             
processorContext.setCurrentNode(sourceNodeAndDeserializer.sourceNode());
-            ((SourceNode<Object, Object, Object, Object>) 
sourceNodeAndDeserializer.sourceNode()).process(deserialized.key(), 
deserialized.value());
+            final Record<Object, Object> toProcess = new Record<>(
+                deserialized.key(),
+                deserialized.value(),
+                processorContext.timestamp(),
+                processorContext.headers()
+            );
+            ((SourceNode<Object, Object, Object, Object>) 
sourceNodeAndDeserializer.sourceNode()).process(toProcess);

Review comment:
       This is a pretty common pattern where we need to bridge the new and old 
APIs. We construct the "record" by filling in the timestamp and headers from 
the context.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -866,9 +867,9 @@ public void init(final ProcessorContext<String, String> 
context) {
                     }
 
                     @Override
-                    public void process(final String key, final String value) {
-                        if (value.length() % 2 == 0) {
-                            context.forward(key, key + value);
+                    public void process(final Record<String, String> record) {
+                        if (record.value().length() % 2 == 0) {
+                            context.forward(record.withValue(record.key() + 
record.value()));

Review comment:
       A good example of updating just the value for the child.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
##########
@@ -33,7 +34,9 @@
  * {@link ProcessorNode} when we are forwarding items that have been evicted 
or flushed from
  * {@link ThreadCache}
  */
-public interface InternalProcessorContext extends ProcessorContext {
+public interface InternalProcessorContext
+    extends ProcessorContext, 
org.apache.kafka.streams.processor.api.ProcessorContext<Object, Object>, 
StateStoreContext {

Review comment:
       Thanks to this, all our internal Impls are suitable to pass in to any of 
the new APIs.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalApiProcessorContext.java
##########
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.RecordContext;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.Task.TaskType;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-import 
org.apache.kafka.streams.state.internals.ThreadCache.DirtyEntryFlushListener;
-
-/**
- * For internal use so we can update the {@link RecordContext} and current
- * {@link ProcessorNode} when we are forwarding items that have been evicted 
or flushed from
- * {@link ThreadCache}
- */
-public interface InternalApiProcessorContext<KForward, VForward> extends 
ProcessorContext<KForward, VForward> {

Review comment:
       Don't need this anymore, since the InternalProcessorContext can now 
implement all of the new and old Contexts.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -742,8 +760,7 @@ public void punctuate(final ProcessorNode<?, ?, ?, ?> node,
             throw new IllegalStateException(String.format("%sCurrent node is 
not null", logPrefix));
         }
 
-        updateProcessorContext(new StampedRecord(new 
ConsumerRecord<>(ProcessorContextImpl.NONEXIST_TOPIC, -1, -1L, null, null),
-            timestamp), node, time.milliseconds());
+        updateProcessorContext(node, time.milliseconds(), null);

Review comment:
       Instead of setting a dummy context, we're now just setting the context 
to `null` aka "undefined".

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link 
Processor}
+ * or a record to forward to downstream processors via {@link 
ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and 
value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of 
this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {

Review comment:
       The new Record class proposed in the KIP.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreToProcessorContextAdapter.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.To;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+public final class StoreToProcessorContextAdapter implements ProcessorContext {
+    private final StateStoreContext delegate;
+
+    public static ProcessorContext adapt(final StateStoreContext delegate) {
+        if (delegate instanceof ProcessorContext) {
+            return (ProcessorContext) delegate;
+        } else {
+            return new StoreToProcessorContextAdapter(delegate);
+        }
+    }

Review comment:
       This allows us to transparently delegate the new API to the old one for 
StateStore implementations. Our internal StateStoreContext implementations all 
implement both APIs, so they just get casted, while if you use a separate 
implementation of StateStoreContext (e.g., in unit tests), it'll get adapted, 
which works just as long as the underlying store implementation doesn't try to 
call forward or anything.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ForwardingDisabledProcessorContext.java
##########
@@ -86,7 +86,7 @@ public void register(final StateStore store,
     }
 
     @Override
-    public StateStore getStateStore(final String name) {
+    public <S extends StateStore> S getStateStore(final String name) {

Review comment:
       Just implementing the interface.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
##########
@@ -112,64 +114,69 @@ public void register(final StateStore store,
         stateManager().registerStore(store, stateRestoreCallback);
     }
 
-    /**
-     * @throws IllegalStateException if the task's record is null
-     */
     @Override
     public String topic() {
         if (recordContext == null) {
-            throw new IllegalStateException("This should not happen as topic() 
should only be called while a record is processed");
-        }
-
-        final String topic = recordContext.topic();
-
-        if (NONEXIST_TOPIC.equals(topic)) {
+            // This is only exposed via the deprecated ProcessorContext,
+            // in which case, we're preserving the pre-existing behavior
+            // of returning dummy values when the record context is undefined.
+            // For topic, the dummy value is `null`.

Review comment:
       The prior code was a bit misleading. I could not find _any_ code path 
where the context was actually null before, since we always initialized it with 
a "dummy context". This change simplifies the codebase by just moving the dummy 
values here and we now really do set the record context to `null` to 
(internally) signify when it is undefined.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextAdapter.java
##########
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.processor.internals;
-
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.processor.Cancellable;
-import org.apache.kafka.streams.processor.PunctuationType;
-import org.apache.kafka.streams.processor.Punctuator;
-import org.apache.kafka.streams.processor.StateRestoreCallback;
-import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.To;
-import org.apache.kafka.streams.processor.api.ProcessorContext;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.internals.ThreadCache;
-
-import java.io.File;
-import java.time.Duration;
-import java.util.Map;
-
-public final class ProcessorContextAdapter<KForward, VForward>
-    implements ProcessorContext<KForward, VForward>, 
InternalApiProcessorContext<KForward, VForward> {

Review comment:
       We also don't need these adapters anymore.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ToInternal.java
##########
@@ -23,6 +23,10 @@ public ToInternal() {
         super(To.all());
     }
 
+    public ToInternal(final To to) {
+        super(to);
+    }

Review comment:
       A copy constructor helped with the ProcessorContextImpl refactoring.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
##########
@@ -60,7 +61,7 @@ public void init(final ProcessorContext context) {
                     context.schedule(
                         Duration.ofMillis(1),
                         PunctuationType.WALL_CLOCK_TIME,
-                        timestamp -> context.forward(-1, (int) timestamp)
+                        timestamp -> context.forward(-1, (int) timestamp, 
To.all().withTimestamp(timestamp))

Review comment:
       The prior code here was actually relying on a strange effect in which we 
set the (undefined) processor context's timestamp to the punctuation time. I 
could preserve that behavior, but it looked like a bug to me.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/testutil/ConsumerRecordUtil.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.testutil;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.record.TimestampType;
+
+public final class ConsumerRecordUtil {
+    private ConsumerRecordUtil() {}
+
+    public static <K, V> ConsumerRecord<K, V> record(final String topic,
+                                                     final int partition,
+                                                     final long offset,
+                                                     final K key,
+                                                     final V value) {
+        // the no-time constructor in ConsumerRecord initializes the
+        // timestamp to -1, which is an invalid configuration. Here,
+        // we initialize it to 0.
+        return new ConsumerRecord<>(
+            topic,
+            partition,
+            offset,
+            0L,
+            TimestampType.CREATE_TIME,
+            0L,
+            0,
+            0,
+            key,
+            value
+        );
+    }

Review comment:
       This is the utility method I replaced the ConsumerRecord constructor 
with earlier in the PR.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -775,8 +775,8 @@ public void init(final ProcessorContext<String, String> 
context) {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value);
+        public void process(final Record<String, String> record) {
+            context.forward(record);

Review comment:
       just a simple passthrough

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/RecordMetadata.java
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+public interface RecordMetadata {

Review comment:
       The new metadata proposed in the KIP. Note that it's an interface 
because in reality, it's just going to be a view onto the 
ProcessorRecordContext.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java
##########
@@ -127,7 +128,7 @@ public void shouldInitializeProcessorTopology() {
     @Test
     public void shouldProcessRecordsForTopic() {
         globalStateTask.initialize();
-        globalStateTask.update(new ConsumerRecord<>(topic1, 1, 1, 
"foo".getBytes(), "bar".getBytes()));
+        globalStateTask.update(record(topic1, 1, 1, "foo".getBytes(), 
"bar".getBytes()));

Review comment:
       That constructor of ConsumerRecord set the timestamp to `-1`, which is 
now prohibited because we construct a Record before processing, and Record 
enforces no negative timestamps.
   
   This seems fine to me, since it would only happen in unit tests (as 
ConsumerRecords returned from the broker never have negative timestamps).

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
##########
@@ -792,8 +792,8 @@ public void init(final ProcessorContext<String, String> 
context) {
         }
 
         @Override
-        public void process(final String key, final String value) {
-            context.forward(key, value, 
To.all().withTimestamp(context.timestamp() + 10));
+        public void process(final Record<String, String> record) {
+            context.forward(record.withTimestamp(record.timestamp() + 10));

Review comment:
       An example of setting only the timestamp.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStoreTest.java
##########
@@ -55,7 +56,7 @@ public void before() {
             collector,
             new ThreadCache(new LogContext("testCache "), 0, new 
MockStreamsMetrics(new Metrics())));
         context.setTime(0);
-        store.init(context, store);
+        store.init((StateStoreContext) context, store);

Review comment:
       Words cannot express how tired of this change I was by this point.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -466,10 +467,11 @@ public void init(final InternalProcessorContext context) {
                 this.context = context;
                 super.init(context);
             }
+
             @Override
-            public void process(final Integer key, final Integer value) {
-                if (key % 2 == 0) {
-                    context.forward(key, value);
+            public void process(final Record<Integer, Integer> record) {
+                if (record.key() % 2 == 0) {
+                    context.forward(record);

Review comment:
       Example of filtering but otherwise passing the record through.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
##########
@@ -81,14 +84,9 @@ public void 
shouldThrowNullPointerOnRegisterIfStateStoreIsNull() {
     }
 
     @Test
-    public void shouldThrowIllegalStateExceptionOnTopicIfNoRecordContext() {
+    public void shouldReturnNullTopicIfNoRecordContext() {

Review comment:
       These tests were enforcing a behavior that would never have actually 
happened in practice. Since I changed these methods to return the dummy values 
when the context is undefined, these tests also have to change.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -670,9 +671,26 @@ public boolean process(final long wallClockTime) {
 
             log.trace("Start processing one record [{}]", record);
 
-            updateProcessorContext(record, currNode, wallClockTime);
+            updateProcessorContext(
+                currNode,
+                wallClockTime,
+                new ProcessorRecordContext(
+                    record.timestamp,
+                    record.offset(),
+                    record.partition(),
+                    record.topic(),
+                    record.headers()
+                )
+            );
+
             maybeRecordE2ELatency(record.timestamp, wallClockTime, 
currNode.name());
-            maybeMeasureLatency(() -> currNode.process(record.key(), 
record.value()), time, processLatencySensor);
+            final Record<Object, Object> toProcess = new Record<>(
+                record.key(),
+                record.value(),
+                processorContext.timestamp(),
+                processorContext.headers()

Review comment:
       This is pulling out the timestamp and headers that we just set a few 
lines earlier.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -967,11 +968,11 @@ public void init(final ProcessorContext<String, String> 
context) {
                 }
 
                 @Override
-                public void process(final String key, final String value) {
+                public void process(final Record<String, String> record) {
                     final KeyValueStore<String, Long> kvStore = 
context.getStateStore(storeName);
-                    kvStore.put(key, 5L);
+                    kvStore.put(record.key(), 5L);
 
-                    context.forward(key, "5");
+                    context.forward(record.withValue("5"));

Review comment:
       You're going to see a lot of these in the tests.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/Record.java
##########
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.streams.errors.StreamsException;
+
+/**
+ * A data class representing an incoming record for processing in a {@link 
Processor}
+ * or a record to forward to downstream processors via {@link 
ProcessorContext}.
+ *
+ * This class encapsulates all the data attributes of a record: the key and 
value, but
+ * also the timestamp of the record and any record headers.
+ *
+ * This class is immutable, though the objects referenced in the attributes of 
this class
+ * may themselves be mutable.
+ *
+ * @param <K> The type of the key
+ * @param <V> The type of the value
+ */
+public class Record<K, V> {
+    private final K key;
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    /**
+     * The full constructor, specifying all the attributes of the record.
+     *
+     * @param key The key of the record. May be null.
+     * @param value The value of the record. May be null.
+     * @param timestamp The timestamp of the record. May not be negative.
+     * @param headers The headers of the record. May be null, which will cause 
subsequent calls
+     *                to {@link this#headers()} to return a non-null, empty, 
{@link Headers} collection.
+     *
+     * @throws IllegalArgumentException if the timestamp is negative.
+     */
+    public Record(final K key, final V value, final long timestamp, final 
Headers headers) {
+        this.key = key;
+        this.value = value;
+        if (timestamp < 0) {
+            throw new StreamsException(
+                "Malformed Record",
+                new IllegalArgumentException("Timestamp may not be negative. 
Got: " + timestamp)
+            );
+        }

Review comment:
       We used to check this only in SinkNode, but it seems better to fail fast 
since we actually have the opportunity to do so now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to