ganesh-sadanala commented on PR #16162:
URL: https://github.com/apache/kafka/pull/16162#issuecomment-2143705807

    I have completed the implementation using the SlidingWindow approach with 
x=30 seconds for testing. Here are the changes: 
https://github.com/apache/kafka/pull/16162
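
As a rough illustration of the sliding-window idea (a sketch only, not the code in the PR): the ratio is taken as time spent punctuating divided by total run-loop time, summed over the iterations that ended within the last 30 seconds. The class name `SlidingWindowRatio` and its methods are hypothetical, and the per-iteration sample format is an assumption.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper for illustration; not a Kafka Streams class.
final class SlidingWindowRatio {
    // One run-loop iteration: end timestamp, time spent punctuating, total duration.
    private static final class Sample {
        final long endMs;
        final long punctuateMs;
        final long totalMs;

        Sample(final long endMs, final long punctuateMs, final long totalMs) {
            this.endMs = endMs;
            this.punctuateMs = punctuateMs;
            this.totalMs = totalMs;
        }
    }

    private final long windowMs;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private long punctuateSum = 0L;
    private long totalSum = 0L;

    SlidingWindowRatio(final long windowMs) {
        this.windowMs = windowMs;
    }

    // Record one finished iteration and drop samples that fell out of the window.
    void record(final long endMs, final long punctuateMs, final long totalMs) {
        samples.addLast(new Sample(endMs, punctuateMs, totalMs));
        punctuateSum += punctuateMs;
        totalSum += totalMs;
        evict(endMs);
    }

    // Ratio over the samples still inside the window; 0.0 if the window is empty.
    double ratio(final long nowMs) {
        evict(nowMs);
        return totalSum == 0L ? 0.0 : (double) punctuateSum / totalSum;
    }

    private void evict(final long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst().endMs <= nowMs - windowMs) {
            final Sample old = samples.removeFirst();
            punctuateSum -= old.punctuateMs;
            totalSum -= old.totalMs;
        }
    }

    public static void main(final String[] args) {
        final SlidingWindowRatio ratio = new SlidingWindowRatio(30_000L);
        ratio.record(1_000L, 0L, 100L);   // iteration with no punctuation
        ratio.record(2_000L, 25L, 100L);  // iteration that spent 25 ms punctuating
        System.out.println(ratio.ratio(2_000L));   // 25 / 200
        System.out.println(ratio.ratio(40_000L));  // both samples have expired
    }
}
```

With this shape, a single iteration that does no punctuation no longer forces the reported ratio to zero, as long as some earlier iteration in the window did punctuate.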
   
   
   
   
   I followed the steps below to test the changes, but the punctuate-ratio 
metric is still zero for every instance of the example demo class. 
   
   1. Start ZooKeeper.
   2. Start the Kafka broker.
   3. Create input and output topics with 3 partitions (so that active tasks 
are distributed across multiple instances of the WordCountProcessorDemo streams 
class):
   bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 3 --replication-factor 1
   
   bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 3 --replication-factor 1
   
   4. Run 3 instances of the Kafka Streams demo application in different 
terminals/processors:
   
   bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo
   
   5. Produce and consume data
   
   bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
   
   bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning
   
   6. Open jconsole and watch the metrics.
   
   
   
   
   All the metrics are being calculated. However, when I run the debugger, 
I see that tasks.activeTasks() is an empty list in the code below. Because of 
that, the punctuated count stays at zero, and so does the punctuate ratio.
   
   TaskExecutor.java
   
   ```
   int punctuate() {
       int punctuated = 0;

       for (final Task task : tasks.activeTasks()) {
           try {
               if (executionMetadata.canPunctuateTask(task)) {
                   if (task.maybePunctuateStreamTime()) {
                       punctuated++;
                   }
                   if (task.maybePunctuateSystemTime()) {
                       punctuated++;
                   }
               }
           } catch (final TaskMigratedException e) {
               log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
                   "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
               throw e;
           } catch (final StreamsException e) {
               log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
               e.setTaskId(task.id());
               throw e;
           } catch (final KafkaException e) {
               log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
               throw new StreamsException(e, task.id());
           }
       }

       return punctuated;
   }
   ```
   
   Is there a way to make the active tasks list non-empty, so that I can test 
the changes and write some unit tests?
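
For unit tests, one possible direction is to exercise the counting loop with stubbed tasks instead of a running application. The sketch below is a simplified stand-in, not Kafka's `TaskExecutor` or `Task`: `PunctuateLoopSketch` and `FakeTask` are hypothetical names, and the real method's `canPunctuateTask` check and exception handling are left out.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified stand-in for the punctuate loop; not Kafka's TaskExecutor.
final class PunctuateLoopSketch {
    // Hypothetical stub exposing only the two methods the loop calls.
    interface FakeTask {
        boolean maybePunctuateStreamTime();
        boolean maybePunctuateSystemTime();
    }

    // Counts how many punctuations fired across all active tasks.
    static int punctuate(final List<FakeTask> activeTasks) {
        int punctuated = 0;
        for (final FakeTask task : activeTasks) {
            if (task.maybePunctuateStreamTime()) {
                punctuated++;
            }
            if (task.maybePunctuateSystemTime()) {
                punctuated++;
            }
        }
        return punctuated;
    }

    // Builds a stub task with fixed return values.
    static FakeTask task(final boolean streamTime, final boolean systemTime) {
        return new FakeTask() {
            @Override
            public boolean maybePunctuateStreamTime() {
                return streamTime;
            }

            @Override
            public boolean maybePunctuateSystemTime() {
                return systemTime;
            }
        };
    }

    public static void main(final String[] args) {
        // An empty active-task list always yields zero, as seen in the debugger.
        System.out.println(punctuate(Collections.<FakeTask>emptyList()));
        // Two stubbed tasks: one stream-time punctuation, plus one of each kind.
        System.out.println(punctuate(Arrays.asList(task(true, false), task(true, true))));
    }
}
```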
   
   
   
   
   Is this behaviour normal in the local environment?

