It loops on:

    for (messageAndMetadata <- stream if messagesRead < config.numMessages) { // count message }
The guard only filters elements; it does not end the loop. That means it iterates over *all* messages in the stream (eventually blocking once the messages are exhausted) but only counts the first N, as determined by the config. As a result, this command seems to always hang forever. Seems like someone "improved" this without ever trying to run it :-( -Jay
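
A minimal sketch of the behaviour described above, assuming nothing about the real tool beyond the quoted loop. `CountingIterator`, `guarded`, and `bounded` are hypothetical names for illustration. A finite iterator stands in for the blocking consumer stream, so the example terminates and simply records how many elements were pulled. The bounded version is one possible fix, not necessarily the one the project adopted.

```scala
object GuardVsBoundedLoop {
  // Hypothetical stand-in for the consumer stream: a finite iterator that
  // records how many elements were pulled from it. The real stream would
  // block in hasNext once no more messages are available.
  final class CountingIterator(n: Int) extends Iterator[Int] {
    var pulled = 0
    private var i = 0
    def hasNext: Boolean = i < n
    def next(): Int = { i += 1; pulled += 1; i }
  }

  // The pattern from the report: the guard desugars to withFilter, so the
  // loop keeps pulling elements after the limit and merely skips the body.
  def guarded(stream: CountingIterator, numMessages: Int): Int = {
    var messagesRead = 0
    for (m <- stream if messagesRead < numMessages) {
      messagesRead += 1
    }
    messagesRead
  }

  // One possible fix: check the limit before asking the stream for more,
  // so hasNext is never called once numMessages have been counted.
  def bounded(stream: CountingIterator, numMessages: Int): Int = {
    var messagesRead = 0
    while (messagesRead < numMessages && stream.hasNext) {
      stream.next()
      messagesRead += 1
    }
    messagesRead
  }

  def main(args: Array[String]): Unit = {
    val a = new CountingIterator(10)
    println(s"guarded: counted=${guarded(a, 3)} pulled=${a.pulled}")
    val b = new CountingIterator(10)
    println(s"bounded: counted=${bounded(b, 3)} pulled=${b.pulled}")
  }
}
```

With a limit of 3 over 10 elements, both versions count 3, but the guarded loop pulls all 10 elements while the bounded loop pulls only 3. Against a blocking stream, the extra pulls are where the hang occurs.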