Hi Mikael,

During a rebalance, both instances should throw InvalidStateStoreException
until the rebalance has completed. Once the rebalance has completed, if the
key is not found in the local store, you will get a null value. A null
therefore only tells you that this instance does not have the key; it may
be held by another instance. You can always find the Kafka Streams instance
that will have that key (assuming it exists) by using:

StreamsMetadata KafkaStreams.metadataForKey(String storeName, K key, Serializer<K> keySerializer)

The returned StreamsMetadata tells you, via its HostInfo, which instance
has the given key.
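A minimal sketch of how a request handler might combine the two cases above: answer 503 while ownership or the store is unavailable, redirect when another instance owns the key, and only report "not found" from the owning instance. To keep it runnable without Kafka on the classpath, `StoreUnavailableException` is a hypothetical stand-in for `InvalidStateStoreException`, `ownerOf` models `KafkaStreams.metadataForKey(...).hostInfo()`, and `localGet` models a local store lookup. `KeyRouter`, `Response`, and the 307 redirect are hypothetical choices (forwarding the request to the owner would work just as well).

```java
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-in for Kafka's InvalidStateStoreException, thrown while a
// store is unavailable (for example during a rebalance).
class StoreUnavailableException extends RuntimeException {
    StoreUnavailableException(String message) { super(message); }
}

// Hypothetical HTTP-style response: status code plus body.
record Response(int status, String body) {}

class KeyRouter {
    private final String localHost;                           // e.g. "localhost:2030"
    private final Function<String, Optional<String>> ownerOf; // models metadataForKey(...).hostInfo()
    private final Function<String, Long> localGet;            // models store.get(key); may throw

    KeyRouter(String localHost,
              Function<String, Optional<String>> ownerOf,
              Function<String, Long> localGet) {
        this.localHost = localHost;
        this.ownerOf = ownerOf;
        this.localGet = localGet;
    }

    Response read(String key) {
        Optional<String> owner = ownerOf.apply(key);
        if (owner.isEmpty()) {
            // Ownership not known yet (e.g. mid-rebalance): ask the client to retry.
            return new Response(503, "");
        }
        if (!owner.get().equals(localHost)) {
            // Another instance owns the key, so a local null would be misleading:
            // point the client at the owner instead of answering 404.
            return new Response(307, owner.get());
        }
        try {
            Long value = localGet.apply(key);
            // Only the owning instance may conclude that the key does not exist.
            return value == null ? new Response(404, "") : new Response(200, value.toString());
        } catch (StoreUnavailableException e) {
            return new Response(503, "");
        }
    }
}
```

With this shape, the old owner in the trace discussed below would answer with a redirect rather than a 404 once the partition has moved.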

HTH,
Damian

On Fri, 9 Sep 2016 at 16:56 Mikael Högqvist <hoegqv...@gmail.com> wrote:

> Hi Damian,
>
> thanks for fixing this so quickly. I re-ran the test and it works fine.
>
> The next test I tried was to read from two service instances implementing
> the same string count topology. First, I start the client, which sends two
> read requests every second, one per instance. Next, I start the first
> instance and let it complete the store initialization before starting the
> second instance.
>
> Below is the initial part of the trace when going from 0 to 1 instance. The
> trace log has the following columns: request id, instance, response code
> and value.
>
> 3,localhost:2030,503,
> 3,localhost:2031,503,
> 4,localhost:2030,503,
> 4,localhost:2031,503,
> 5,localhost:2030,200,2
> 5,localhost:2031,503,
> 6,localhost:2030,200,2
> 6,localhost:2031,503,
>
> Before the instance is started, both return 503, the status the client
> reports when it cannot connect to an instance. Once the first instance has
> started, it returns the expected value 2 from request pair 5 onward. The
> trace below is from when the second instance starts.
>
> 18,localhost:2030,200,2
> 18,localhost:2031,503,
> 19,localhost:2030,404,
> 19,localhost:2031,503,
> 20,localhost:2030,404,
> 20,localhost:2031,503,
> 21,localhost:2030,404,
> 21,localhost:2031,200,2
> 22,localhost:2030,404,
> 22,localhost:2031,200,2
>
> The new instance takes over responsibility for the partition containing the
> key "hello". During this period, the new instance returns 503, as expected,
> until its store is ready. The issue is that the first instance, which stored
> the value, starts returning 404 from request pair 19 onward. A client
> requesting this key would then see the following sequence:
>
> 18 -> 2
> 19 -> Not found
> 20 -> Not found
> 21 -> 2
>
> From the client's perspective, I think this violates the guarantee of
> always reading the latest value.
>
> Am I making the wrong assumptions, or is there some way to detect that the
> local store is no longer responsible for the key?
>
> Best,
> Mikael
>
> On Thu, Sep 8, 2016 at 11:03 AM Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Mikael,
> >
> > A fix for KAFKA-4123 <https://issues.apache.org/jira/browse/KAFKA-4123>
> > (the issue you found with receiving null values) has now been committed to
> > trunk. I've tried it with your GitHub repo and it appears to be working.
> > You will have to make a small change to your code, as we now throw
> > InvalidStateStoreException when the stores are unavailable (previously we
> > returned null).
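On the caller side, the change amounts to catching the exception instead of checking for null. A minimal sketch that retries with a linear backoff, assuming a hypothetical `StoreUnavailableException` in place of Kafka's `InvalidStateStoreException` so it runs without Kafka on the classpath; `RetryingReader`, the attempt count, and the delay are illustrative, and returning 503 immediately is an equally reasonable choice.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for Kafka's InvalidStateStoreException.
class StoreUnavailableException extends RuntimeException {
    StoreUnavailableException(String message) { super(message); }
}

class RetryingReader {
    // Calls lookup until it stops throwing StoreUnavailableException, sleeping
    // backoffMillis * attempt between tries (linear backoff). After maxAttempts
    // failures, the last StoreUnavailableException is rethrown.
    static <T> T readWithRetry(Supplier<T> lookup, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        for (int attempt = 1; ; attempt++) {
            try {
                return lookup.get();
            } catch (StoreUnavailableException e) {
                if (attempt >= maxAttempts) throw e;
                try {
                    Thread.sleep(backoffMillis * attempt);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                    throw e;
                }
            }
        }
    }
}
```

In a real service the `Supplier` would wrap the local store lookup, e.g. a call to `get(key)` on the queried store.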
> >
> > We added a test here
> > <https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java#L431>
> > to make sure we only get a value once the store has been (re-)initialized.
> > Please give it a go and thanks for your help in finding this issue.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 5 Sep 2016 at 22:07 Mikael Högqvist <hoegqv...@gmail.com> wrote:
> >
> > > Hi Damian,
> > >
> > > > > > Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> > > > > > Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> > > > > > hello -> 10
> > > > >
> > > > >
> > > > The case where you get KeyNotFound looks like a bug to me. This
> > > > shouldn't happen. I can see why it might happen and we will create a
> > > > JIRA and fix it right away.
> > > >
> > >
> > > Great, thanks for looking into this. I'll try again once the PR is merged.
> > >
> > >
> > > >
> > > > I'm not sure how you end up with (hello -> 10). It could indicate that
> > > > the offsets for the topic you are consuming from weren't committed so
> > > > the data gets processed again on the restart.
> > > >
> > >
> > > Yes, it didn't commit the offsets since streams.close() was not called
> > > on ctrl-c. Fixed by adding a shutdown hook.
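The shutdown-hook fix can be sketched with the standard `Runtime.addShutdownHook` API. A minimal sketch, assuming a `Runnable` close action stands in for the `KafkaStreams` instance (in a real service it would wrap `streams.close()`), so the example runs without Kafka; `GracefulShutdown` is a hypothetical name.

```java
class GracefulShutdown {
    // Registers a JVM shutdown hook (run on Ctrl-C / SIGTERM) that invokes
    // closeAction, e.g. () -> streams.close(), so Streams can commit offsets
    // before the process exits. Returns the hook so callers can deregister it.
    static Thread register(Runnable closeAction) {
        Thread hook = new Thread(closeAction, "streams-shutdown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }
}
```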
> > >
> > > Thanks,
> > > Mikael
> > >
> > >
> > > > Thanks,
> > > > Damian
> > > >
> > >
> >
>
