guozhangwang commented on a change in pull request #9267:
URL: https://github.com/apache/kafka/pull/9267#discussion_r485132348



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -706,13 +662,17 @@ void runOnce() {
                     totalProcessed += processed;
                 }
 
+                log.debug("TaskManager#process handled {} records; invoking 
TaskManager#punctuate", processed);

Review comment:
       Nit: I'd suggest we do not expose internal class names in log entries; 
e.g., here we can say "Processed {} records with {} iterations, invoking 
punctuation now". Ditto below.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -689,6 +644,7 @@ void runOnce() {
              *  6. Otherwise, increment N.
              */
             do {
+                log.debug("Invoking TaskManager#process with {} iterations.", 
numIterations);

Review comment:
       What's the rationale for logging both the start and the end of a 
procedure? If it is for troubleshooting purposes only, maybe the starting log 
entry can be at trace level while the ending entry is at debug level?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -752,6 +712,77 @@ void runOnce() {
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
     }
 
+    private void initializeAndRestorePhase() {
+        {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITION_ASSIGNED after the poll call
+            final State stateSnapshot = state;
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {
+
+                log.debug("State is {}; initializing and restoring", 
stateSnapshot);
+
+                // transit to restore active is idempotent so we can call it 
multiple times
+                changelogReader.enforceRestoreActive();
+
+                if (taskManager.tryToCompleteRestoration()) {
+                    changelogReader.transitToUpdateStandby();
+
+                    setState(State.RUNNING);
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization and restore call done. State is 
{}", state);
+                }
+            }
+        }
+
+        log.debug("Invoking ChangeLogReader#restore");
+        // we can always let changelog reader try restoring in order to 
initialize the changelogs;
+        // if there's no active restoring or standby updating it would not try 
to fetch any data
+        changelogReader.restore();
+    }
+
+    private long pollPhase() {
+        final ConsumerRecords<byte[], byte[]> records;
+        log.debug("Invoking Consumer#poll");
+
+        if (state == State.PARTITIONS_ASSIGNED) {
+            // try to fetch some records with zero poll millis
+            // to unblock the restoration as soon as possible
+            records = pollRequests(Duration.ZERO);
+        } else if (state == State.PARTITIONS_REVOKED) {
+            // try to fetch som records with zero poll millis to unblock
+            // other useful work while waiting for the join response
+            records = pollRequests(Duration.ZERO);
+        } else if (state == State.RUNNING || state == State.STARTING) {
+            // try to fetch some records with normal poll time
+            // in order to get long polling
+            records = pollRequests(pollTime);
+        } else if (state == State.PENDING_SHUTDOWN) {
+            // we are only here because there's rebalance in progress,
+            // just poll with zero to complete it
+            records = pollRequests(Duration.ZERO);
+        } else {
+            // any other state should not happen
+            log.error("Unexpected state {} during normal iteration", state);
+            throw new StreamsException(logPrefix + "Unexpected state " + state 
+ " during normal iteration");
+        }
+
+        final long pollLatency = advanceNowAndComputeLatency();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Consumer#poll completed in {} ms and fetched {} 
records", pollLatency, records.count());
+        }
+        pollSensor.record(pollLatency, now);
+
+        if (!records.isEmpty()) {

Review comment:
       Sounds good.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -752,6 +712,77 @@ void runOnce() {
         commitRatioSensor.record((double) totalCommitLatency / runOnceLatency, 
now);
     }
 
+    private void initializeAndRestorePhase() {
+        {
+            // only try to initialize the assigned tasks
+            // if the state is still in PARTITION_ASSIGNED after the poll call
+            final State stateSnapshot = state;
+            if (stateSnapshot == State.PARTITIONS_ASSIGNED
+                || stateSnapshot == State.RUNNING && 
taskManager.needsInitializationOrRestoration()) {
+
+                log.debug("State is {}; initializing and restoring", 
stateSnapshot);
+
+                // transit to restore active is idempotent so we can call it 
multiple times
+                changelogReader.enforceRestoreActive();
+
+                if (taskManager.tryToCompleteRestoration()) {
+                    changelogReader.transitToUpdateStandby();
+
+                    setState(State.RUNNING);
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Initialization and restore call done. State is 
{}", state);
+                }
+            }
+        }
+
+        log.debug("Invoking ChangeLogReader#restore");

Review comment:
       I'm not sure what the purpose of this log entry is.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to