Searching the codebase I found only one usage of the
`ProcessorTopologyTestDriver` with `getKeyValueStore()` (or
`getStateStore()`) [0], and that usage only gets from store. Perhaps the
JavaDoc suggests something that cannot actually be done?

The obvious workaround is to get data into the store via the topology using
`#process()`, but requires using the topology and is not ideal for testing.

[0]
https://github.com/apache/kafka/blob/5d798511b12c5ef7555e4234fdd99a360176e435/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java#L199

On Mon, Aug 21, 2017 at 12:59 PM, Dmitry Minkovsky <dminkov...@gmail.com>
wrote:

> I am trying to `put()` to a KeyValueStore that I got from
> ProcessorTopologyTestDriver#getKeyValueStore() as part of setup for a
> test. The JavaDoc endorses this use-case:
>
>      * This is often useful in test cases to pre-populate the store before
> the test case instructs the topology to
>      * {@link #process(String, byte[], byte[]) process an input message},
> and/or to check the store afterward.
>
> However, the `put()` results in the following error:
>
> java.lang.IllegalStateException: This should not happen as offset()
> should only be called while a record is processed
>
> at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.
> offset(AbstractProcessorContext.java:139)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(
> CachingKeyValueStore.java:193)
> at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(
> CachingKeyValueStore.java:188)
> at pony.UserEntityTopologySupplierTest.confirm-settings-requests(
> UserEntityTopologySupplierTest.groovy:81)
>
> This error seems straightforward: I am not doing the `put` within the
> context of stream processing. How do I reconcile this with the fact that I
> am trying to populate the store for a test, which the JavaDoc endorses?
>
> Thank you,
> Dmitry
>

Reply via email to