guozhangwang commented on code in PR #12204:
URL: https://github.com/apache/kafka/pull/12204#discussion_r906425650


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -148,55 +173,184 @@ public void process(final Record<KIn, VIn> record) {
                 }
             }
 
-            if (mergedWindow.end() < closeTime) {
-                if (context().recordMetadata().isPresent()) {
-                    final RecordMetadata recordMetadata = context().recordMetadata().get();
-                    LOG.warn(
-                        "Skipping record for expired window. " +
-                            "topic=[{}] " +
-                            "partition=[{}] " +
-                            "offset=[{}] " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset(),
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                } else {
-                    LOG.warn(
-                        "Skipping record for expired window. Topic, partition, and offset not known. " +
-                            "timestamp=[{}] " +
-                            "window=[{},{}] " +
-                            "expiration=[{}] " +
-                            "streamTime=[{}]",
-                        timestamp,
-                        mergedWindow.start(), mergedWindow.end(),
-                        closeTime,
-                        observedStreamTime
-                    );
-                }
-                droppedRecordsSensor.record();
+            if (mergedWindow.end() < windowCloseTime) {
+                logSkippedRecordForExpiredWindow(timestamp, windowCloseTime, mergedWindow);
             } else {
                 if (!mergedWindow.equals(newSessionWindow)) {
                     for (final KeyValue<Windowed<KIn>, VAgg> session : merged) {
                         store.remove(session.key);
+
+                        maybeForwardUpdate(session.key, session.value, null, record.timestamp());
+                        /*
                         tupleForwarder.maybeForward(
                             record.withKey(session.key)
                                 .withValue(new Change<>(null, session.value)));
+                         */
                     }
                 }
 
                 agg = aggregator.apply(record.key(), record.value(), agg);
                 final Windowed<KIn> sessionKey = new Windowed<>(record.key(), mergedWindow);
                 store.put(sessionKey, agg);
+
+                maybeForwardUpdate(sessionKey, null, agg, record.timestamp());
+                /*
                 tupleForwarder.maybeForward(
                     record.withKey(sessionKey)
                         .withValue(new Change<>(agg, null)));
+                 */
             }
+
+            maybeForwardFinalResult(record, windowCloseTime);
+        }
+
+        private void maybeForwardUpdate(final Windowed<KIn> windowedkey,
+                                        final VAgg oldAgg,
+                                        final VAgg newAgg,
+                                        final long oldTimestamp) {
+            if (emitStrategy.type() == EmitStrategy.StrategyType.ON_WINDOW_CLOSE) {
+                return;
+            }
+
+            // Update the sent record timestamp to the window end time if possible
+            final long newTimestamp = windowedkey.key() != null ? windowedkey.window().end() : oldTimestamp;

Review Comment:
   This behavior was meant to be inherited from the deleted code: https://github.com/apache/kafka/pull/12204/files#diff-85c8c92d464af8eb3a60684bf929725f8fc5263353c38cacc20bee4cefe4fd9eL53, but after checking that logic I now realize it is no longer necessary (the original PR had to do this because we could not programmatically guarantee the key is never `null`; in this change we no longer have that concern).
   
   Will remove.
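   
   For illustration, a minimal sketch of the simplified logic once the `null` fallback is gone: with a guaranteed non-null windowed key, the forwarded timestamp is simply the session window's end time. The class, the `forwardTimestamp` helper, and the sample values below are hypothetical, not part of this PR:
   
   ```java
   import org.apache.kafka.common.utils.Bytes;
   import org.apache.kafka.streams.kstream.Windowed;
   import org.apache.kafka.streams.kstream.internals.SessionWindow;
   
   public final class ForwardTimestampSketch {
   
       // Old: windowedKey.key() != null ? windowedKey.window().end() : recordTimestamp
       // New: the key is guaranteed non-null here, so use the window end unconditionally.
       static long forwardTimestamp(final Windowed<Bytes> windowedKey, final long recordTimestamp) {
           return windowedKey.window().end();
       }
   
       public static void main(final String[] args) {
           final Windowed<Bytes> key =
               new Windowed<>(Bytes.wrap("user-1".getBytes()), new SessionWindow(10L, 42L));
           System.out.println(forwardTimestamp(key, 7L)); // prints 42, the window end time
       }
   }
   ```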
   
   



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java:
##########
@@ -83,50 +94,64 @@ private class KStreamSessionWindowAggregateProcessor extends
         ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {
 
         private SessionStore<KIn, VAgg> store;
-        private SessionTupleForwarder<KIn, VAgg> tupleForwarder;
+        private TimestampedTupleForwarder<Windowed<KIn>, VAgg> tupleForwarder;
         private Sensor droppedRecordsSensor;
+        private Sensor emittedRecordsSensor;
+        private Sensor emitFinalLatencySensor;
+        private long lastEmitWindowCloseTime = ConsumerRecord.NO_TIMESTAMP;
         private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+        private InternalProcessorContext<Windowed<KIn>, Change<VAgg>> internalProcessorContext;
+
+        private final Time time = Time.SYSTEM;

Review Comment:
   Makes sense. I will do this in a follow-up PR after merging this.
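   
   A minimal sketch of the injectable-clock approach, assuming that is what the follow-up refers to (the class and method names here are hypothetical, just to show the shape):
   
   ```java
   import org.apache.kafka.common.utils.MockTime;
   import org.apache.kafka.common.utils.Time;
   
   // Hypothetical sketch: take the clock as a constructor argument instead of
   // hard-coding Time.SYSTEM, so tests can substitute a controllable MockTime.
   final class EmitLatencyTracker {
       private final Time time;
   
       EmitLatencyTracker(final Time time) { // production code would pass Time.SYSTEM
           this.time = time;
       }
   
       long elapsedMsSince(final long startMs) {
           return time.milliseconds() - startMs;
       }
   
       public static void main(final String[] args) {
           final MockTime mockTime = new MockTime(); // from the clients test utilities
           final EmitLatencyTracker tracker = new EmitLatencyTracker(mockTime);
           final long start = mockTime.milliseconds();
           mockTime.sleep(25L); // advances the mock clock deterministically
           System.out.println(tracker.elapsedMsSince(start)); // prints 25
       }
   }
   ```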



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStore.java:
##########
@@ -61,6 +65,18 @@ public <R> QueryResult<R> query(final Query<R> query,
         );
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+                prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        final KeyValueIterator<Bytes, byte[]> bytesIterator = wrapped().fetchAll(earliestEndTime, latestEndTime);

Review Comment:
   Sounds good. I would just add special handling for both the lower/upper bounds, as well as for the `hasNext` function, for this specific purpose.
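   
   A minimal sketch of that `hasNext` handling, as a generic peeking wrapper (the class name and the predicate plumbing are hypothetical, just to show the shape):
   
   ```java
   import java.util.Iterator;
   import java.util.NoSuchElementException;
   import java.util.function.Predicate;
   
   // Hypothetical sketch: a peeking wrapper that buffers the next entry and
   // silently skips anything outside the requested session-end-time bounds,
   // so callers never observe out-of-range rows.
   final class BoundedIterator<T> implements Iterator<T> {
       private final Iterator<T> inner;
       private final Predicate<T> withinBounds;
       private T buffered; // next entry that already passed the bounds check
   
       BoundedIterator(final Iterator<T> inner, final Predicate<T> withinBounds) {
           this.inner = inner;
           this.withinBounds = withinBounds;
       }
   
       @Override
       public boolean hasNext() {
           while (buffered == null && inner.hasNext()) {
               final T candidate = inner.next();
               if (withinBounds.test(candidate)) {
                   buffered = candidate; // keep it for the next() call
               }
           }
           return buffered != null;
       }
   
       @Override
       public T next() {
           if (!hasNext()) {
               throw new NoSuchElementException();
           }
           final T result = buffered;
           buffered = null;
           return result;
       }
   }
   
   // e.g. wrapping List.of(5L, 15L, 25L, 35L).iterator() with the bound
   // predicate t -> t >= 10L && t <= 30L yields only 15 and 25.
   ```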



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java:
##########
@@ -202,25 +205,43 @@ public void remove(final Windowed<Bytes> sessionKey) {
 
     @Override
     public byte[] fetchSession(final Bytes key,
-                               final long earliestSessionEndTime,
-                               final long latestSessionStartTime) {
+                               final long sessionStartTime,
+                               final long sessionEndTime) {
         removeExpiredSegments();
 
         Objects.requireNonNull(key, "key cannot be null");
 
         // Only need to search if the record hasn't expired yet
-        if (latestSessionStartTime > observedStreamTime - retentionPeriod) {
-            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(latestSessionStartTime);
+        if (sessionEndTime > observedStreamTime - retentionPeriod) {
+            final ConcurrentNavigableMap<Bytes, ConcurrentNavigableMap<Long, byte[]>> keyMap = endTimeMap.get(sessionEndTime);
             if (keyMap != null) {
                 final ConcurrentNavigableMap<Long, byte[]> startTimeMap = keyMap.get(key);
                 if (startTimeMap != null) {
-                    return startTimeMap.get(earliestSessionEndTime);
+                    return startTimeMap.get(sessionStartTime);
                 }
             }
         }
         return null;
     }
 
+    @Override
+    public KeyValueIterator<Windowed<Bytes>, byte[]> findSessions(final Instant earliestSessionEndTime,
+                                                                  final Instant latestSessionEndTime) {
+        removeExpiredSegments();
+
+        final long earliestEndTime = ApiUtils.validateMillisecondInstant(earliestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(earliestSessionEndTime, "earliestSessionEndTime"));
+        final long latestEndTime = ApiUtils.validateMillisecondInstant(latestSessionEndTime,
+            prepareMillisCheckFailMsgPrefix(latestSessionEndTime, "latestSessionEndTime"));
+
+        // since subMap is exclusive on toKey, we need to add one
+        return registerNewIterator(null,
+                                   null,
+                                   Long.MAX_VALUE,
+                                   endTimeMap.subMap(earliestEndTime, latestEndTime + 1).entrySet().iterator(),

Review Comment:
   Ack.
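   
   For reference, a small standalone demonstration of the bound semantics behind the `+ 1` (the map contents are made up): the two-argument `subMap(from, to)` is inclusive on `from` but exclusive on `to`, while the four-argument overload can express an inclusive upper bound directly, which also avoids the overflow edge case at `Long.MAX_VALUE`:
   
   ```java
   import java.util.concurrent.ConcurrentNavigableMap;
   import java.util.concurrent.ConcurrentSkipListMap;
   
   public final class SubMapBounds {
       public static void main(final String[] args) {
           final ConcurrentNavigableMap<Long, String> endTimeMap = new ConcurrentSkipListMap<>();
           endTimeMap.put(10L, "a");
           endTimeMap.put(20L, "b");
           endTimeMap.put(30L, "c");
   
           // exclusive toKey: prints {10=a, 20=b} -- the entry at 30 is cut off
           System.out.println(endTimeMap.subMap(10L, 30L));
           // the +1 adjustment from the diff: prints {10=a, 20=b, 30=c}
           System.out.println(endTimeMap.subMap(10L, 30L + 1));
           // equivalent with explicit inclusive bounds: prints {10=a, 20=b, 30=c}
           System.out.println(endTimeMap.subMap(10L, true, 30L, true));
       }
   }
   ```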


