ableegoldman commented on a change in pull request #9997:
URL: https://github.com/apache/kafka/pull/9997#discussion_r567173261



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -714,19 +720,34 @@ public boolean process(final long wallClockTime) {
             if (recordInfo.queue().size() == maxBufferedSize) {
                 mainConsumer.resume(singleton(partition));
             }
-        } catch (final StreamsException e) {
-            throw e;
+
+            record = null;
+        } catch (final TimeoutException timeoutException) {
+            if (!eosEnabled) {
+                throw timeoutException;
+            } else {
+                record = null;
+                throw new TaskCorruptedException(Collections.singletonMap(id, 
changelogPartitions()));

Review comment:
       I agree, we shouldn't trigger `task.timeout.ms` until it's back to 
RUNNING right?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -111,13 +112,16 @@ public void initialize() {
             final List<PartitionInfo> partitions;
             try {
                 partitions = streamsProducer.partitionsFor(topic);
-            } catch (final KafkaException e) {
-                // TODO: KIP-572 need to handle `TimeoutException`
-                // -> should we throw a `TaskCorruptedException` for this case 
to reset the task and retry (including triggering `task.timeout.ms`) ?
+            } catch (final TimeoutException timeoutException) {
+                log.debug("Could not get partitions for topic {}, will retry", 
topic);

Review comment:
       seems like it should at least be info, if not warn

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -1109,22 +1109,32 @@ int process(final int maxNumRecords, final Time time) {
 
         long now = time.milliseconds();
         for (final Task task : activeTaskIterable()) {
+            int processed = 0;
+            final long then = now;
             try {
-                int processed = 0;
-                final long then = now;
                 while (processed < maxNumRecords && task.process(now)) {
+                    task.clearTaskTimeout();
                     processed++;
                 }
-                now = time.milliseconds();
-                totalProcessed += processed;
-                task.recordProcessBatchTime(now - then);
+            } catch (final TimeoutException timeoutException) {

Review comment:
       When would we hit this TimeoutException vs the one about (in Task)?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to