Hi, this helps, thanks!
Basically, after each read I'll check whether the key is still supposed to be on this host. Doing the check after the read is necessary to handle the case where a rebalance happens between the metadata lookup and the store get. With the check after the read, a valid read can occasionally be treated as invalid, but that doesn't affect correctness.

During a rebalance the service responds with either "not available" or a redirect. Once the rebalance has completed, an instance that no longer owns the key responds with a redirect. With a REST API this could mean either a 404 or a 303 (See Other) redirect to the current host.
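Roughly, the read path then looks like the sketch below. This is a simplification of what I have: the class, the Result type and names like thisHost and storeName are made up for the example, and it assumes the InvalidStateStoreException / metadataForKey behaviour described further down in the thread (including the StreamsMetadata.NOT_AVAILABLE placeholder during a rebalance).

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.StreamsMetadata;

public class CountReader {

    /** Minimal stand-in for the REST response: an HTTP status plus an optional body. */
    public static class Result {
        public final int status;          // 200, 303, 404 or 503
        public final Long value;          // the count, only set for 200
        public final HostInfo redirectTo; // the current owner, only set for 303

        Result(int status, Long value, HostInfo redirectTo) {
            this.status = status;
            this.value = value;
            this.redirectTo = redirectTo;
        }
    }

    private final KafkaStreams streams;
    private final HostInfo thisHost;   // host:port this instance advertises
    private final String storeName;    // e.g. "counts"

    public CountReader(KafkaStreams streams, HostInfo thisHost, String storeName) {
        this.streams = streams;
        this.thisHost = thisHost;
        this.storeName = storeName;
    }

    public Result read(String key) {
        final Long value;
        try {
            ReadOnlyKeyValueStore<String, Long> store =
                streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
            value = store.get(key);
        } catch (InvalidStateStoreException e) {
            return new Result(503, null, null);   // store is migrating / not ready yet
        }

        // The ownership check runs *after* the read, so a rebalance that sneaks in
        // between the metadata lookup and store.get() can only turn a valid read
        // into a redirect; it can never produce a stale "not found".
        StreamsMetadata owner =
            streams.metadataForKey(storeName, key, Serdes.String().serializer());
        if (owner == null || owner == StreamsMetadata.NOT_AVAILABLE) {
            return new Result(503, null, null);   // rebalance in progress
        }
        if (!owner.hostInfo().equals(thisHost)) {
            return new Result(303, null, owner.hostInfo());  // redirect to the current owner
        }
        return value == null
            ? new Result(404, null, null)
            : new Result(200, value, null);
    }
}
```

The worst case is that a read which was actually still valid gets answered with a redirect; the client just follows it and reads the value from the new owner, so the "always read the latest value" guarantee holds.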
Best,
Mikael

On Mon, Sep 12, 2016 at 5:42 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Mikael,
>
> Just adding to Damian's comment above: the IllegalStateStoreException here is thrown to indicate a "transient" state where the state store hosting this key is being migrated and hence not available. Users implementing REST APIs on top of it can choose how to handle it, for example by returning a sentinel value for "key not available" or some error code.
>
> Guozhang
>
> On Fri, Sep 9, 2016 at 9:40 AM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Mikael,
> >
> > During a rebalance both instances should throw IllegalStateStoreException until the rebalance has completed. Once the rebalance has completed, if the key is not found in the local store you would get a null value. You can always find the Kafka Streams instance that will have that key (assuming it exists) by using:
> >
> > StreamsMetadata KafkaStreams.metadataForKey(String storeName, K key, Serializer<K> keySerializer)
> >
> > The StreamsMetadata will tell you which instance, via HostInfo, has the given key.
> >
> > HTH,
> > Damian
> >
> > On Fri, 9 Sep 2016 at 16:56 Mikael Högqvist <hoegqv...@gmail.com> wrote:
> >
> > > Hi Damian,
> > >
> > > thanks for fixing this so quickly, I re-ran the test and it works fine.
> > >
> > > The next test I tried was to read from two service instances implementing the same string count topology. First, the client is started, sending two read requests, one per instance, every second. Next, I start the first instance and let it complete the store init before the next instance is started.
> > >
> > > Below is the initial part of the trace when going from 0 to 1 instance. The trace log has the following columns: request id, instance, response code and value.
> > >
> > > 3,localhost:2030,503,
> > > 3,localhost:2031,503,
> > > 4,localhost:2030,503,
> > > 4,localhost:2031,503,
> > > 5,localhost:2030,200,2
> > > 5,localhost:2031,503,
> > > 6,localhost:2030,200,2
> > > 6,localhost:2031,503,
> > >
> > > Before the instance is started, both return 503, the status returned by the client when it cannot connect to an instance. When the first instance has started, it returns the expected value 2 for request pairs 5, 6 and so on. The trace below is from when the second instance starts.
> > >
> > > 18,localhost:2030,200,2
> > > 18,localhost:2031,503,
> > > 19,localhost:2030,404,
> > > 19,localhost:2031,503,
> > > 20,localhost:2030,404,
> > > 20,localhost:2031,503,
> > > 21,localhost:2030,404,
> > > 21,localhost:2031,200,2
> > > 22,localhost:2030,404,
> > > 22,localhost:2031,200,2
> > >
> > > The new instance takes over responsibility for the partition containing the key "hello". During this period the new instance returns 503 as expected until the store is ready. The issue is that the first instance, which stored the value, starts returning 404 from request pair 19. A client doing requests for this key would then see the following sequence:
> > >
> > > 18 -> 2
> > > 19 -> Not found
> > > 20 -> Not found
> > > 21 -> 2
> > >
> > > From the client perspective, I think this violates the guarantee of always reading the latest value.
> > >
> > > Am I making the wrong assumptions, or is there some way to detect that the local store is not responsible for the key anymore?
> > >
> > > Best,
> > > Mikael
> > >
> > > On Thu, Sep 8, 2016 at 11:03 AM Damian Guy <damian....@gmail.com> wrote:
> > >
> > > > Hi Mikael,
> > > >
> > > > A fix for KAFKA-4123 <https://issues.apache.org/jira/browse/KAFKA-4123> (the issue you found with receiving null values) has now been committed to trunk. I've tried it with your github repo and it appears to be working. You will have to make a small change to your code as we now throw InvalidStateStoreException when the stores are unavailable (previously we returned null).
> > > >
> > > > We added a test here <https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java#L431> to make sure we only get a value once the store has been (re-)initialized. Please give it a go, and thanks for your help in finding this issue.
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Mon, 5 Sep 2016 at 22:07 Mikael Högqvist <hoegqv...@gmail.com> wrote:
> > > >
> > > > > Hi Damian,
> > > > >
> > > > > > > Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> > > > > > > Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> > > > > > > hello -> 10
> > > > > >
> > > > > > The case where you get KeyNotFound looks like a bug to me. This shouldn't happen. I can see why it might happen and we will create a JIRA and fix it right away.
> > > > >
> > > > > Great, thanks for looking into this. I'll try again once the PR is merged.
> > > > >
> > > > > > I'm not sure how you end up with (hello -> 10). It could indicate that the offsets for the topic you are consuming from weren't committed, so the data gets processed again on the restart.
> > > > >
> > > > > Yes, it didn't commit the offsets since streams.close() was not called on ctrl-c. Fixed by adding a shutdown hook.
> > > > >
> > > > > Thanks,
> > > > > Mikael
> > > > >
> > > > > > Thanks,
> > > > > > Damian
>
> --
> -- Guozhang
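PS: the shutdown hook mentioned at the bottom of the quoted thread is just the standard pattern, roughly the sketch below. The class and install() are made-up wrapper names; streams is the already running KafkaStreams instance.

```java
import org.apache.kafka.streams.KafkaStreams;

public final class StreamsShutdown {

    /**
     * Close the KafkaStreams instance when the JVM exits (e.g. on ctrl-c) so
     * that offsets are committed and the state stores are flushed, instead of
     * the input being reprocessed on the next start.
     */
    public static void install(final KafkaStreams streams) {
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                streams.close();
            }
        }));
    }
}
```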