I have a few logs below.

Thank you.
Rainer

14:21:04,304 INFO  [StreamThread] stream-thread 
[t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] 
Creating restore consumer client
14:21:04,393 INFO  [StreamThread] stream-thread 
[t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] 
Creating shared producer client
14:21:04,429 INFO  [StreamThread] stream-thread 
[t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] 
Creating consumer client
14:21:04,620 INFO  [StreamThread] stream-thread 
[t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] 
Starting
14:21:04,622 INFO  [StreamThread] stream-thread 
[t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] 
State transition from CREATED to RUNNING
14:21:04,639 INFO  [StreamThread] stream-thread 
[t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] 
State transition from RUNNING to PARTITIONS_REVOKED
14:21:04,639 INFO  [StreamThread] stream-thread 
[t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] 
partition revocation took 0 ms.
 suspended active tasks: []
 suspended standby tasks: []
14:21:04,748 INFO  [RestApplication] Adding listener: http://0.0.0.0:8082
14:21:04,768 INFO  [StreamThread] stream-thread 
[t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] 
State transition from PARTITIONS_REVOKED to PARTITIONS_ASSIGNED
14:21:04,783 INFO  [StreamThread] stream-thread 
[t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] 
partition assignment took 15 ms.
 current active tasks: [0_0]
 current standby tasks: []
 previous active tasks: []

STATESTORE INIT
14:21:04,873 INFO  ... our stuff here
PROCESSOR INIT
14:21:05,637 INFO  ... our stuff here
RESTORE CALLED
...
RESTORE CALLED
14:21:05,680 INFO  [StreamThread] stream-thread 
[t10_nr3_metadatarecovery-17935bc8-6ec2-4fcc-b62f-15a63c9a051c-StreamThread-1] 
State transition from PARTITIONS_ASSIGNED to RUNNING

==================
Sent: Thursday, November 09, 2017 at 1:43 PM
From: "Bill Bejeck" <b...@confluent.io>
To: users@kafka.apache.org
Subject: Re: Kafka Streams - Custom processor "init" method called before state 
store has data restored into it
Hi Rainer,

Thanks for reporting this issue. Do you have any log data you can share?

In the meantime, I'll look into the issue.

Thanks,
Bill

On Thu, Nov 9, 2017 at 1:23 PM, Rainer Guessner <raguess...@gmx.com> wrote:

> I have a custom processor that extends AbstractProcessor and a custom
> store that implements StateStore.
>
> Before Kafka 1.0.0, the processor's "init" method was called after the
> state store was restored from the changelog, and that is good.
> With Kafka 1.0.0, the processor's "init" method is called BEFORE the state
> store is restored from the changelog, and that is bad.
>
> My processor can only initialize when it has access to the state. However,
> at the time KStreams calls "init" on the processor, the state store may not
> have any data. It is not an option for me to initialize the processor
> lazily when a record arrives, or to re-initialize it when "onRestoreEnd" is
> called (it's only called on restore; the state store "init" gets called
> before processor "init" regardless of restore or not).
>
> I think I need one of these:
> a) a way to know whether or not a state restore will take place,
> b) or a call to the state store regardless of whether a state restore
> took place or not,
> c) or a "ready" method on the processor that gets called when the
> state store has completed restoring and is actually usable.
>
> Please help, thank you in advance.
> Rainer
>
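
Option (c) in the quoted message could be sketched roughly as below. This is plain Java with no Kafka dependency, and every name in it (ReadyAware, SketchStore, SketchProcessor, markReady, simulate) is hypothetical and not part of the Kafka Streams API. It only illustrates the requested contract: the processor's state-dependent setup runs in a "ready" callback that fires exactly once after any restore, and also when no restore happens. It does not answer who would call markReady in a real topology.

```java
import java.util.ArrayList;
import java.util.List;

public class ReadyCallbackSketch {

    /** Hypothetical hook from option (c); NOT a Kafka Streams interface. */
    interface ReadyAware {
        void ready();
    }

    /** Stand-in for a custom store; it only counts restored values. */
    static final class SketchStore {
        private final List<String> data = new ArrayList<>();
        private final List<ReadyAware> waiting = new ArrayList<>();
        private boolean ready = false;

        /** Registers a listener; fires immediately if the store is already ready. */
        void onReady(ReadyAware listener) {
            if (ready) {
                listener.ready();
            } else {
                waiting.add(listener);
            }
        }

        /** Called once per restored record (the "RESTORE CALLED" lines in the log). */
        void restore(String value) {
            data.add(value);
        }

        /** Assumption: the driver calls this whether or not a restore took place. */
        void markReady() {
            if (ready) {
                return; // fire at most once
            }
            ready = true;
            for (ReadyAware listener : waiting) {
                listener.ready();
            }
            waiting.clear();
        }

        int size() {
            return data.size();
        }
    }

    /** Stand-in for the custom processor; state-dependent setup lives in ready(). */
    static final class SketchProcessor implements ReadyAware {
        private final SketchStore store;
        private final List<String> events;

        SketchProcessor(SketchStore store, List<String> events) {
            this.store = store;
            this.events = events;
        }

        /** Does nothing that needs store contents; only registers for the callback. */
        void init() {
            events.add("PROCESSOR INIT");
            store.onReady(this);
        }

        @Override
        public void ready() {
            events.add("PROCESSOR READY (store size " + store.size() + ")");
        }
    }

    /** Replays the ordering seen in the log, then signals readiness. */
    static List<String> simulate(int restoredRecords) {
        List<String> events = new ArrayList<>();
        SketchStore store = new SketchStore();
        events.add("STATESTORE INIT");
        SketchProcessor processor = new SketchProcessor(store, events);
        processor.init();
        for (int i = 0; i < restoredRecords; i++) {
            store.restore("value-" + i);
            events.add("RESTORE CALLED");
        }
        store.markReady();
        return events;
    }

    public static void main(String[] args) {
        for (String event : simulate(2)) {
            System.out.println(event);
        }
    }
}
```

Calling simulate(0) models a start with nothing to restore: the processor still receives its "ready" call, which is the case the quoted message says "onRestoreEnd" does not cover.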
