Hi Guozhang, thanks for your reply

I can confirm that the init method is quite basic:

public void init(ProcessorContext context) {
    context.schedule(TIMEOUT.getMillis());
    this.context = context;

    this.store = (KeyValueStore)this.context.getStateStore(storeName);
}

(Try/catch and logging omitted.)

In case it's relevant, this is how the state store is created:

StateStoreSupplier storeSupplier = Stores.create(storeName)
                .withKeys(keyserde)
                .withValues(valueserde)
                .persistent()
                .build();
builder.addStateStore(storeSupplier);

I can confirm, though, that all the stack traces we have clearly originate from 
`put()` being called from `transform()`.

Thanks
Adrian

-----Original Message-----
From: Guozhang Wang [mailto:wangg...@gmail.com] 
Sent: 14 June 2017 21:18
To: users@kafka.apache.org
Subject: Re: IllegalStateException when putting to state store in Transformer 
implementation

Hello Adrian,

When you call `put()` on the windowed state store without specifying a 
timestamp, `context.timestamp()` is retrieved and used as the default timestamp:

----------------------

public synchronized void put(final K key, final V value) {
    put(key, value, context.timestamp());
}

----------------------
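To make the failure mode concrete, here is a minimal, self-contained sketch. The `MockContext` and `MockStore` classes below are stand-ins I made up for illustration, not the real Kafka Streams API; they model the relevant behaviour: `context.timestamp()` is only valid while a record is being processed, so a two-arg `put()` invoked at any other time fails with exactly this `IllegalStateException`.

```java
// Stand-in for ProcessorContext: timestamp() is only valid while a record
// is "active", mirroring the guard that produces the reported exception.
class MockContext {
    private Long currentTimestamp = null; // set only while a record is processed

    void startRecord(long ts) { currentTimestamp = ts; }
    void endRecord() { currentTimestamp = null; }

    long timestamp() {
        if (currentTimestamp == null) {
            throw new IllegalStateException(
                "This should not happen as timestamp() should only be called "
                + "while a record is processed");
        }
        return currentTimestamp;
    }
}

// Stand-in for the state store: the two-arg put() falls back to
// context.timestamp(), just like the snippet quoted above.
class MockStore {
    private final MockContext context;
    MockStore(MockContext context) { this.context = context; }

    void put(String key, String value) {
        put(key, value, context.timestamp());
    }
    void put(String key, String value, long ts) {
        // store the entry (omitted)
    }
}

public class TimestampDemo {
    public static void main(String[] args) {
        MockContext ctx = new MockContext();
        MockStore store = new MockStore(ctx);

        ctx.startRecord(1000L);
        store.put("k", "v"); // fine: a record is being processed
        ctx.endRecord();

        try {
            store.put("k", "v"); // no active record -> timestamp() throws
        } catch (IllegalStateException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

In the real client the record context is tracked per stream thread, which may be why the problem only surfaces when a single thread is handling multiple partitions: `put()` can be reached at a moment when the thread's record context has not been set for the current task.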

The question is: when you call `put()` in the Transformer, did you ever call it 
from the `init()` function?


Guozhang


On Wed, Jun 14, 2017 at 11:08 AM, Adrian McCague <adrian.mcca...@zopa.com>
wrote:

> Hi All
>
> We have a transformer implementation in our Kafka Streams application 
> that raises this exception, sometimes, when starting.
>
> "java.lang.IllegalStateException: This should not happen as 
> timestamp() should only be called while a record is processed"
>
> This happens when 'put' is called on a state store within the 
> `transform` method of a custom `Transformer`.
>
> The full trace can be seen here, apologies for the JSON formatting:
> https://pastebin.com/QYKE7bSH
>
>
>   *   We did not see this when the input and output topic of the topology
> had only a single partition.
>   *   We do not see this when the streams thread is handling only a single
> partition of data. (ie 4 partitions, 4 consumers in the consumer group)
>   *   We see this when deploying the consumer group and the first
> consumers to connect are handling multiple partitions (assumed).
> Once all have started and each consumer is processing a single 
> partition each, the issue appears to go away.
>
> We are using Kafka Streams Client: 0.10.2.1
>
> Any suggestions would be welcome as for now I am assuming a 
> programming error.
> I can confirm that within our code, we never call `timestamp()` on the 
> context.
>
> Thanks
> Adrian
>



--
-- Guozhang
