Hi Guozhang,

Thanks for your reply. I can confirm that the init method is quite basic:

    public void init(ProcessorContext context) {
        context.schedule(TIMEOUT.getMillis());
        this.context = context;
        this.store = (KeyValueStore) this.context.getStateStore(storeName);
    }

Omitting try catch and logging.

In case it's relevant, this is how the state store is created:

    StateStoreSupplier storeSupplier = Stores.create(storeName)
            .withKeys(keyserde)
            .withValues(valueserde)
            .persistent()
            .build();
    builder.addStateStore(storeSupplier);

I can confirm though that all stack traces we have clearly originate from `put()` being called from `transform()`.

Thanks
Adrian

-----Original Message-----
From: Guozhang Wang [mailto:wangg...@gmail.com]
Sent: 14 June 2017 21:18
To: users@kafka.apache.org
Subject: Re: IllegalStateException when putting to state store in Transformer implementation

Hello Adrian,

When you call "put()" on the windowed state store that does not specify a timestamp, then the `timestamp()` is retrieved to use as the default timestamp.

----------------------

public synchronized void put(final K key, final V value) {
    put(key, value, context.timestamp());
}

----------------------

The question is when were you calling `put()` in the Transformer, did you ever call it in `init()` function?

Guozhang

On Wed, Jun 14, 2017 at 11:08 AM, Adrian McCague <adrian.mcca...@zopa.com> wrote:

> Hi All
>
> We have a transformer implementation in our Kafka Streams application
> that raises this exception, sometimes, when starting.
>
> "java.lang.IllegalStateException: This should not happen as
> timestamp() should only be called while a record is processed"
>
> This happens when 'put' is called on a state store within the
> `transform` method of a custom `Transformer`.
>
> The full trace can be seen here, apologies for the JSON formatting:
> https://pastebin.com/QYKE7bSH
>
>
> * We did not see this when the input and output topic of the topology
> had only a single partition.
> * We do not see this when the streams thread is handling only a single
> partition of data.
> (ie 4 partitions, 4 consumers in the consumer group)
> * We see this when deploying the consumer group and the first
> consumers to connect are handling multiple partitions (assumed).
> Once all have started and each consumer is processing a single
> partition each, the issue appears to go away.
>
> We are using Kafka Streams Client: 0.10.2.1
>
> Any suggestions would be welcome as for now I am assuming a
> programming error.
> I can confirm that within our code, we never call `timestamp()` on the
> context.
>
> Thanks
> Adrian
>

--
-- Guozhang