ganesh-sadanala commented on PR #16162: URL: https://github.com/apache/kafka/pull/16162#issuecomment-2143705807
I have completed the implementation using the SlidingWindow approach with x=30 seconds for testing. Here are the changes: https://github.com/apache/kafka/pull/16162 I have followed these steps to test the changes, but I still see the puncutate-ratio as zero for all the instances of example Demo class. Start ZooKeeper, Kafka Broker. Created input and output topics with 3 partitions (for the sake of having active tasks distributed to multiple instances of WordCountProcessorDemo stream class) bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 3 --replication-factor 1 bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 3 --replication-factor 1 4. Run the 3 instances of Kafka Streams Demo Application in different terminals/processors: bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo 5. Produce and consume data bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning 6. Open the jconsole and watch the metrics I see that all the metrics are getting calculated. When I run the debugger, I see that in this code tasks.activeTasks() is an empty list. Because of that punctuated values is becoming zero, hence the punctuate ratio. TaskExecutor.java ``` int punctuate() { int punctuated = 0; for (final Task task : tasks.activeTasks()) { try { if (executionMetadata.canPunctuateTask(task)) { if (task.maybePunctuateStreamTime()) { punctuated++; } if (task.maybePunctuateSystemTime()) { punctuated++; } } } catch (final TaskMigratedException e) { log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " + "Will trigger a new rebalance and close all tasks as zombies together.", task.id()); throw e; } catch (final StreamsException e) { log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e); e.setTaskId(task.id()); throw e; } catch (final KafkaException e) { log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e); throw new StreamsException(e, task.id()); } } return punctuated; } } ``` Is there a way to make active tasks list non-empty, thus I can test the changes and write some unit tests? Is this behaviour normal in the local environment? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org