Guozhang, Matthias Thanks for confirming, it was quite clear with current understanding that there was a mistake in the supplier implementation when seeing it.
The Javadoc is indeed clear what to do, this was ultimately a failing to read that over the documentation provided on confluent / kafka.apache.org If I have any suggestion for improvement it would be a one liner in the non-javadoc documentation explaining the purpose of a new instance of ProcessorSupplier / TransformerSupplier (there is no reference there at all as it stands) - but it is clear on the Javadoc. Thanks for the assistance Adrian -----Original Message----- From: Guozhang Wang [mailto:wangg...@gmail.com] Sent: 16 June 2017 20:35 To: users@kafka.apache.org Subject: Re: IllegalStateException when putting to state store in Transformer implementation Adrian, I see. That would explain what you see, i.e. all tasks with their own "processor context" are accessing the same state store instance; hence for some task its processor context may not be updated yet while another task is accessing that state store, hence causing the issue. If you are with Java8 already, it is recommended to code with .transform(() -> new PhaseTransformer<>(..)) Guozhang On Fri, Jun 16, 2017 at 2:29 AM, Adrian McCague <adrian.mcca...@zopa.com> wrote: > Hi Guozhang > > It's just occurred to me that the transformer is added to the topology > like this: > > PhaseTransformer<S, E> transformer = new PhaseTransformer<>(evaluator, > storeName); ... > .transform(() -> transformer, transformer.getStoreName()) > > Thus meaning that the same transformer is used whenever the supplier > is invoked, I wonder if this could cause some odd interaction in this case. > Now that I think about it perhaps one instance of transformer is used > per partition handled? > > Thanks > Adrian > > -----Original Message----- > From: Guozhang Wang [mailto:wangg...@gmail.com] > Sent: 16 June 2017 02:54 > To: users@kafka.apache.org > Subject: Re: IllegalStateException when putting to state store in > Transformer implementation > > Adrian, > > I looked though the 0.10.2.1 code but I cannot nail down to any > obvious places where the processor context is set to null, which could > trigger your exception. Also from your stack trace there is no direct clues > available. > > Would you mind creating a JIRA and attach the link to your JSON and your " > PhaseTransformer" implementation sketch, since what you observed may > be a real issue? > > > Guozhang > > > On Thu, Jun 15, 2017 at 1:42 AM, Adrian McCague > <adrian.mcca...@zopa.com> > wrote: > > > Hi Guozhang, thanks for your reply > > > > I can confirm that the init method is quite basic: > > > > public void init(ProcessorContext context) { > > context.schedule(TIMEOUT.getMillis()); > > this.context = context; > > > > this.store = > > (KeyValueStore)this.context.getStateStore(storeName); > > } > > > > Omitting try catch and logging. > > > > In case it's relevant, this is how the state store is created: > > > > StateStoreSupplier storeSupplier = Stores.create(storeName) > > .withKeys(keyserde) > > .withValues(valueserde) > > .persistent() > > .build(); > > builder.addStateStore(storeSupplier); > > > > I can confirm though that all stack traces we have clearly originate > > from `put()` being called from `transform()` > > > > Thanks > > Adrian > > > > -----Original Message----- > > From: Guozhang Wang [mailto:wangg...@gmail.com] > > Sent: 14 June 2017 21:18 > > To: users@kafka.apache.org > > Subject: Re: IllegalStateException when putting to state store in > > Transformer implementation > > > > Hello Adrian, > > > > When you call "put()" on the windowed state store that does not > > specify a timestamp, then the `timestamp()` is retrieved to use as > > the default timestamp. > > > > ---------------------- > > > > public synchronized void put(final K key, final V value) { > > put(key, value, context.timestamp()); } > > > > ---------------------- > > > > The question is when were you calling `put()` in the Transformer, > > did you ever call it in `init()` function? > > > > > > Guozhang > > > > > > On Wed, Jun 14, 2017 at 11:08 AM, Adrian McCague > > <adrian.mcca...@zopa.com> > > wrote: > > > > > Hi All > > > > > > We have a transformer implementation in our Kafka Streams > > > application that raises this exception, sometimes, when starting. > > > > > > "java.lang.IllegalStateException: This should not happen as > > > timestamp() should only be called while a record is processed" > > > > > > This happens when 'put' is called on a state store within the > > > `transform` method of a custom `Transformer`. > > > > > > The full trace can be seen here, apologies for the JSON formatting: > > > https://pastebin.com/QYKE7bSH > > > > > > > > > * We did not see this when the input and output topic of the > topology > > > had only a single partition. > > > * We do not see this when the streams thread is handling only a > > single > > > partition of data. (ie 4 partitions, 4 consumers in the consumer group) > > > * We see this when deploying the consumer group and the first > > > consumers to connect are handling multiple partitions (assumed). > > > Once all have started and each consumer is processing a single > > > partition each, the issue appears to go away. > > > > > > We are using Kafka Streams Client: 0.10.2.1 > > > > > > Any suggestions would be welcome as for now I am assuming a > > > programming error. > > > I can confirm that within our code, we never call `timestamp()` on > > > the context. > > > > > > Thanks > > > Adrian > > > > > > > > > > > -- > > -- Guozhang > > > > > > -- > -- Guozhang > -- -- Guozhang