vvcephei commented on a change in pull request #9368:
URL: https://github.com/apache/kafka/pull/9368#discussion_r504792364



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -482,8 +486,10 @@ boolean tryToCompleteRestoration() {
                 if (restored.containsAll(task.changelogPartitions())) {
                     try {
                         task.completeRestoration();
-                    } catch (final TimeoutException e) {
-                        log.debug("Could not complete restoration for {} due to {}; will retry", task.id(), e);
+                        task.clearTaskTimeout();
+                    } catch (final TimeoutException timeoutException) {
+                        task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+                        log.debug("Could not complete restoration for {} due to {}; will retry", task.id(), timeoutException);

Review comment:
       ```suggestion
                           log.debug(String.format("Could not complete restoration for %s; will retry", task.id()), timeoutException);
   ```
   
   It might be a good idea to add tests for the log messages so we can tell if they're actually properly formatted or not. Hopefully, the log4j upgrade makes it easier to detect these logging bugs.
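
A minimal sketch of the kind of log-message test suggested above, assuming java.util.logging as a stand-in for the SLF4J/log4j setup Kafka Streams actually uses, so that it runs with only the JDK. `CapturingHandler` and `RetryLogging.logRetry` are hypothetical names; `logRetry` applies the suggested pattern of formatting the message with `String.format` and passing the exception as the separate `Throwable` argument.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical handler that keeps log records in memory so a test can assert on them.
class CapturingHandler extends Handler {
    final List<LogRecord> records = new ArrayList<>();

    @Override
    public void publish(final LogRecord record) {
        records.add(record);
    }

    @Override
    public void flush() {
        // nothing buffered
    }

    @Override
    public void close() {
        // nothing to release
    }
}

final class RetryLogging {
    // Mirrors the suggested change: the message text is fully formatted with String.format,
    // and the exception travels separately as the Throwable argument.
    static void logRetry(final Logger log, final String taskId, final Exception cause) {
        log.log(Level.FINE, String.format("Could not complete restoration for %s; will retry", taskId), cause);
    }
}
```

A test would attach a `CapturingHandler` to a logger, call `logRetry`, and assert on both `LogRecord.getMessage()` and `LogRecord.getThrown()`. Checking both catches a malformed message as well as an exception that ended up in the message text instead of being attached. Passing the exception separately after a fully formatted message also avoids depending on how a given backend matches `{}` placeholders against a trailing exception argument.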

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -147,9 +147,12 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
         topology.updateSourceTopics(nodeToSourceTopics);
     }
 
+    /**
+     * @throws TimeoutException if {@code currentWallClockMs > task-timeout-deadline}
+     */
     void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
-                                     final TimeoutException timeoutException,
-                                     final Logger log) throws StreamsException {
+                                     final Exception cause,
+                                     final Logger log) {

Review comment:
       It seems like we ought to just define `log` at the AbstractTask level and avoid having two almost identical `maybeInitTaskTimeoutOrThrow` method definitions.
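
A minimal sketch of the refactoring suggested above, assuming a simple deadline model consistent with the `@throws` javadoc in the diff (throw once `currentWallClockMs` is past the deadline) and with the `clearTaskTimeout()` calls in `TaskManager`. `AbstractTaskSketch`, `ActiveTaskSketch`, `StandbyTaskSketch`, `TaskTimeoutException`, `taskTimeoutMs`, and `deadlineMs` are hypothetical names, and java.util.logging stands in for the SLF4J `Logger`; this is not the code from the PR.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical unchecked stand-in for Kafka's TimeoutException.
class TaskTimeoutException extends RuntimeException {
    TaskTimeoutException(final String message, final Throwable cause) {
        super(message, cause);
    }
}

abstract class AbstractTaskSketch {
    // Wall-clock times are assumed non-negative, so -1 means "no timeout running".
    private static final long NO_DEADLINE = -1L;

    // One logger defined at the base-class level, so the method below needs no Logger parameter.
    protected final Logger log;
    protected final String id;
    private final long taskTimeoutMs; // assumed per-task timeout budget
    private long deadlineMs = NO_DEADLINE;

    AbstractTaskSketch(final String id, final long taskTimeoutMs) {
        this.id = id;
        this.taskTimeoutMs = taskTimeoutMs;
        this.log = Logger.getLogger(getClass().getName() + "." + id);
    }

    /**
     * The first failure starts the timeout; a later failure throws once
     * currentWallClockMs is past the deadline.
     *
     * @throws TaskTimeoutException if {@code currentWallClockMs > deadlineMs}
     */
    void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs, final Exception cause) {
        if (deadlineMs == NO_DEADLINE) {
            deadlineMs = currentWallClockMs + taskTimeoutMs;
        } else if (currentWallClockMs > deadlineMs) {
            throw new TaskTimeoutException(
                String.format("Task %s made no progress within %d ms", id, taskTimeoutMs), cause);
        }
        log.log(Level.FINE, String.format("Task %s failed; will retry until %d", id, deadlineMs), cause);
    }

    // Called after the task makes progress, so the next failure starts a fresh timeout.
    void clearTaskTimeout() {
        deadlineMs = NO_DEADLINE;
    }
}

// Both subclasses inherit the single implementation instead of redefining it.
final class ActiveTaskSketch extends AbstractTaskSketch {
    ActiveTaskSketch(final String id, final long taskTimeoutMs) {
        super(id, taskTimeoutMs);
    }
}

final class StandbyTaskSketch extends AbstractTaskSketch {
    StandbyTaskSketch(final String id, final long taskTimeoutMs) {
        super(id, taskTimeoutMs);
    }
}
```

Because `log` lives on the base class, every subclass shares one `maybeInitTaskTimeoutOrThrow`, and the `Logger` parameter drops out of its signature.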

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##########
@@ -148,7 +148,7 @@ public void update(final Set<TopicPartition> topicPartitions, final Map<String,
     }
 
     void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs,
-                                     final TimeoutException timeoutException,
+                                     final Exception cause,

Review comment:
       Sounds good!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -454,18 +454,22 @@ private void addNewTask(final Task task) {
      * @throws StreamsException if the store's change log does not contain the partition
      * @return {@code true} if all tasks are fully restored
      */
-    boolean tryToCompleteRestoration() {
+    boolean tryToCompleteRestoration(final long now) {
         boolean allRunning = true;
 
         final List<Task> activeTasks = new LinkedList<>();
         for (final Task task : tasks.values()) {
             try {
                 task.initializeIfNeeded();
-            } catch (final LockException | TimeoutException e) {
+                task.clearTaskTimeout();
+            } catch (final LockException retriableException) {
                 // it is possible that if there are multiple threads within the instance that one thread
                 // trying to grab the task from the other, while the other has not released the lock since
                 // it did not participate in the rebalance. In this case we can just retry in the next iteration
-                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e);
+                log.debug("Could not initialize {} due to the following exception; will retry", task.id(), retriableException);

Review comment:
       ```suggestion
                   log.debug(String.format("Could not initialize %s due to the following exception; will retry", task.id()), retriableException);
   ```
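
A minimal sketch of the per-task retry pattern in the `tryToCompleteRestoration(final long now)` hunk, under these assumptions: the caller reads the wall clock once and passes it as `now`; a `LockException` is retried without touching the task timeout (the hunk is cut off at the commented line, so that part is not visible); and a `TimeoutException` from initialization is handled like the `completeRestoration()` hunk, via `maybeInitTaskTimeoutOrThrow(now, ...)`. `LockExceptionSketch`, `TimeoutExceptionSketch`, `FakeRestoringTask`, and `RestorationLoopSketch` are hypothetical stand-ins, and the fake task only records calls instead of enforcing a deadline.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical unchecked stand-ins for Kafka's LockException and TimeoutException.
class LockExceptionSketch extends RuntimeException {
    LockExceptionSketch(final String message) {
        super(message);
    }
}

class TimeoutExceptionSketch extends RuntimeException {
    TimeoutExceptionSketch(final String message) {
        super(message);
    }
}

// Hypothetical fake task: throws the given exceptions in order, then initializes.
// It only records timeout calls; it does not enforce a deadline.
class FakeRestoringTask {
    private final List<RuntimeException> pendingFailures;
    boolean initialized = false;
    int timeoutCalls = 0;      // calls to maybeInitTaskTimeoutOrThrow
    int timeoutClears = 0;     // calls to clearTaskTimeout
    long lastTimeoutNow = -1L; // `now` passed on the most recent timeout call

    FakeRestoringTask(final List<? extends RuntimeException> failures) {
        this.pendingFailures = new ArrayList<>(failures);
    }

    void initializeIfNeeded() {
        if (!pendingFailures.isEmpty()) {
            throw pendingFailures.remove(0);
        }
        initialized = true;
    }

    void clearTaskTimeout() {
        timeoutClears++;
    }

    void maybeInitTaskTimeoutOrThrow(final long now, final Exception cause) {
        timeoutCalls++;
        lastTimeoutNow = now;
    }
}

final class RestorationLoopSketch {
    // `now` is read once by the caller and shared by every task in this pass.
    static boolean tryToInitializeAll(final List<FakeRestoringTask> tasks, final long now) {
        boolean allInitialized = true;
        for (final FakeRestoringTask task : tasks) {
            try {
                task.initializeIfNeeded();
                task.clearTaskTimeout(); // progress was made, so reset any running timeout
            } catch (final LockExceptionSketch retriableException) {
                // Another thread may still hold the lock: retry next pass, no timeout bookkeeping.
                allInitialized = false;
            } catch (final TimeoutExceptionSketch timeoutException) {
                // Assumed to mirror the completeRestoration() hunk: start or enforce the task timeout.
                task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
                allInitialized = false;
            }
        }
        return allInitialized;
    }
}
```

Taking `now` as a parameter means every task in one pass is checked against the same wall-clock value, and a test can drive the loop with fixed timestamps instead of a real clock.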




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

