I wanted to add that there is nothing special about these utility functions; they are built on top of a normal consumer.
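For what it's worth, the same idea works without any Kafka-specific helper: poll for the expected state until a timeout expires, instead of a fixed Thread.sleep. Below is a rough sketch of that pattern, not the actual IntegrationTestUtils code. The class name PollingWait and the method waitUntil are made up for illustration, and the condition in main is only a stand-in for a real check such as a Cassandra query or a consumer poll.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper for illustration; not part of Kafka or its test utilities.
final class PollingWait {

    private PollingWait() {}

    /**
     * Re-checks the condition every pollIntervalMs until it holds or
     * timeoutMs has elapsed. Returns true if the condition held in time,
     * false if the timeout expired first.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollIntervalMs)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollIntervalMs);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Stand-in condition: becomes true after roughly 200 ms.
        // In a real test this would query the output store or topic.
        boolean met = waitUntil(() -> System.currentTimeMillis() - start >= 200, 5_000, 50);
        System.out.println("condition met: " + met);
    }
}
```

In a test you would produce the input, call waitUntil with a check for the expected rows, and fail the test if it returns false. That way the test finishes as soon as the data shows up and only waits the full timeout when something is actually wrong.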
Eno

> On 19 Oct 2016, at 21:59, Eno Thereska <eno.there...@gmail.com> wrote:
>
> Hi Ali,
>
> Any chance you could recycle some of the code we have in
> streams/src/test/java/.../streams/integration/utils? (I know we don't have it
> easily accessible in Maven, for now perhaps you could copy to your directory?)
>
> For example there is a method there
> "IntegrationTestUtils.waitUntilMinValuesRecordsReceived" that could help.
> E.g., it is used in almost all our integration tests. One caveat: this method
> checks if values have been received in a topic, not Cassandra, so your
> streams test might have to write to a dummy output topic, as well as to
> Cassandra.
>
> Let me know what you think,
> Eno
>
>
>> On 19 Oct 2016, at 21:37, Ali Akhtar <ali.rac...@gmail.com> wrote:
>>
>> I'm using Kafka Streams, and I'm attempting to write integration tests for
>> a stream processor.
>>
>> The processor listens to a topic, processes incoming messages, and writes
>> some data to Cassandra tables.
>>
>> I'm attempting to write a test which produces some test data, and then
>> checks whether or not the expected data was written to Cassandra.
>>
>> It looks like this:
>>
>> - Step 1: Produce data in the test
>> - Step 2: Kafka stream gets triggered
>> - Step 3: Test checks whether cassandra got populated
>>
>> The problem is, Step 3 is occurring before Step 2, and as a result, the
>> test fails as it doesn't find the data in the table.
>>
>> I've resolved this by adding a Thread.sleep(2000) call after Step 1, which
>> ensures that Step 2 gets triggered before Step 3.
>>
>> However, I'm wondering if there's a more reliable way of blocking the test
>> until Kafka stream processor gets triggered?
>>
>> At the moment, I'm using 1 thread for the processor. If I increase that to
>> 2 threads, will that achieve what I want?
>