Hi Mikael,

Just adding to Damian's comment above: the InvalidStateStoreException here
is thrown to indicate a "transient" state in which the state store hosting
this key is being migrated and hence not available. Users implementing REST
APIs on top of it can choose how to handle it, for example by returning a
sentinel value meaning "key not available" or by returning an error code.

Guozhang


On Fri, Sep 9, 2016 at 9:40 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Mikael,
>
> During a rebalance both instances should throw InvalidStateStoreException
> until the rebalance has completed. Once the rebalance has completed, if the
> key is not found in the local store, you will get a null value. You
> can always find the Kafka Streams instance that hosts a given key
> (assuming it exists) by using:
>
> StreamsMetadata KafkaStreams.metadataForKey(String storeName, K key,
> Serializer<K> keySerializer)
>
> The StreamsMetadata will tell you which instance, via HostInfo, has the
> given key.
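>
> For example (just a sketch; the store name "word-counts", the key "hello"
> and the streams variable holding the running KafkaStreams instance are
> placeholders):
>
> import org.apache.kafka.common.serialization.Serdes;
> import org.apache.kafka.streams.state.HostInfo;
> import org.apache.kafka.streams.state.StreamsMetadata;
>
> StreamsMetadata metadata =
>     streams.metadataForKey("word-counts", "hello", Serdes.String().serializer());
> HostInfo host = metadata.hostInfo();
> // if host is not this instance, forward the HTTP request to
> // host.host() + ":" + host.port()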
>
> HTH,
> Damian
>
>
>
>
> On Fri, 9 Sep 2016 at 16:56 Mikael Högqvist <hoegqv...@gmail.com> wrote:
>
> > Hi Damian,
> >
> > thanks for fixing this so quickly, I re-ran the test and it works fine.
> >
> > The next test I tried was to read from two service instances implementing
> > the same string count topology. First, the client is started; it sends two
> > read requests, one per instance, every second. Next, I start the first
> > instance and let it complete the store initialization before the second
> > instance is started.
> >
> > Below is the initial part of the trace when going from 0 to 1 instance. The
> > trace log has the following columns: request id, instance, response code,
> > and value.
> >
> > 3,localhost:2030,503,
> > 3,localhost:2031,503,
> > 4,localhost:2030,503,
> > 4,localhost:2031,503,
> > 5,localhost:2030,200,2
> > 5,localhost:2031,503,
> > 6,localhost:2030,200,2
> > 6,localhost:2031,503,
> >
> > Before the instance is started, both return 503, the status returned by
> > the client when it cannot connect to an instance. When the first instance
> > has started, it returns the expected value 2 for request pairs 5, 6, and
> > so on. The trace below is from when the second instance starts.
> >
> > 18,localhost:2030,200,2
> > 18,localhost:2031,503,
> > 19,localhost:2030,404,
> > 19,localhost:2031,503,
> > 20,localhost:2030,404,
> > 20,localhost:2031,503,
> > 21,localhost:2030,404,
> > 21,localhost:2031,200,2
> > 22,localhost:2030,404,
> > 22,localhost:2031,200,2
> >
> > The new instance takes over responsibility for the partition containing
> > the key "hello". During this period the new instance returns 503, as
> > expected, until the store is ready. The issue is that the first instance,
> > which stored the value, starts returning 404 from request pair 19. A
> > client making requests for this key would then see the following sequence:
> >
> > 18 -> 2
> > 19 -> Not found
> > 20 -> Not found
> > 21 -> 2
> >
> > From the client's perspective, I think this violates the guarantee of
> > always reading the latest value.
> >
> > Am I making the wrong assumptions, or is there some way to detect that the
> > local store is no longer responsible for the key?
> >
> > Best,
> > Mikael
> >
> > On Thu, Sep 8, 2016 at 11:03 AM Damian Guy <damian....@gmail.com> wrote:
> >
> > > Hi Mikael,
> > >
> > > A fix for KAFKA-4123 <https://issues.apache.org/jira/browse/KAFKA-4123>
> > > (the issue you found with receiving null values) has now been committed
> > > to trunk. I've tried it with your github repo and it appears to be
> > > working. You will have to make a small change to your code, as we now
> > > throw InvalidStateStoreException when the stores are unavailable
> > > (previously we returned null).
> > >
> > > We added a test here
> > > <https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java#L431>
> > > to make sure we only get a value once the store has been
> > > (re-)initialized. Please give it a go, and thanks for your help in
> > > finding this issue.
> > >
> > > Thanks,
> > > Damian
> > >
> > > On Mon, 5 Sep 2016 at 22:07 Mikael Högqvist <hoegqv...@gmail.com> wrote:
> > >
> > > > Hi Damian,
> > > >
> > > > > > > Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> > > > > > > Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> > > > > > > hello -> 10
> > > > > >
> > > > > >
> > > > > The case where you get KeyNotFound looks like a bug to me. This
> > > > > shouldn't happen. I can see why it might happen, and we will create
> > > > > a JIRA and fix it right away.
> > > > >
> > > >
> > > > Great, thanks for looking into this. I'll try again once the PR is
> > > > merged.
> > > >
> > > >
> > > > >
> > > > > I'm not sure how you end up with (hello -> 10). It could indicate
> > > > > that the offsets for the topic you are consuming from weren't
> > > > > committed, so the data gets processed again on the restart.
> > > > >
> > > >
> > > > Yes, it didn't commit the offsets since streams.close() was not called
> > > > on ctrl-c. Fixed by adding a shutdown hook.
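> > > >
> > > > Roughly (just a sketch; streams is the running KafkaStreams instance):
> > > >
> > > > Runtime.getRuntime().addShutdownHook(new Thread(streams::close));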
> > > >
> > > > Thanks,
> > > > Mikael
> > > >
> > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > >
> > >
> >
>



-- 
-- Guozhang
