Hi Mikael,

A fix for KAFKA-4123 <https://issues.apache.org/jira/browse/KAFKA-4123> (the
issue you found with receiving null values) has now been committed to
trunk. I've tried it with your GitHub repo and it appears to be working.
You will have to make a small change to your code, as we now throw
InvalidStateStoreException when the stores are unavailable (previously we
returned null).
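
In case it helps, here is a rough sketch of one way the calling code could
cope with that change: retry the read a few times until the store is
available again. The StoreRetry class, the readWithRetry helper and its
parameters are made up for illustration. The nested exception is only a
stand-in so the sketch compiles without Kafka on the classpath; real code
would catch org.apache.kafka.streams.errors.InvalidStateStoreException
instead.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka Streams.
final class StoreRetry {

    // Stand-in for org.apache.kafka.streams.errors.InvalidStateStoreException,
    // declared here only so the sketch is self-contained.
    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(String message) {
            super(message);
        }
    }

    // Runs the read, retrying while the store reports itself unavailable.
    // Rethrows the last exception once maxAttempts reads have failed.
    static <T> T readWithRetry(Supplier<T> read, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        InvalidStateStoreException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return read.get();
            } catch (InvalidStateStoreException e) {
                last = e; // store not (re-)initialized yet
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMs); // wait before the next attempt
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // keep the interrupt flag
                    break;
                }
            }
        }
        throw last;
    }
}
```

With something like this, the read in your code would be wrapped in the
supplier, e.g. readWithRetry(() -> store.get("hello"), 10, 100), where
store is whatever store handle you already have. A null result would then
only mean that the key is not in the store, never that the store is
unavailable.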

We added a test here
<https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java#L431>
to make sure we only get a value once the store has been (re-)initialized.
Please give it a go and thanks for your help in finding this issue.

Thanks,
Damian

On Mon, 5 Sep 2016 at 22:07 Mikael Högqvist <hoegqv...@gmail.com> wrote:

> Hi Damian,
>
> > > Failed to read key hello, org.mkhq.kafka.Topology$StoreUnavailable
> > > Failed to read key hello, org.mkhq.kafka.Topology$KeyNotFound
> > > hello -> 10
> > >
> > >
> > The case where you get KeyNotFound looks like a bug to me. This shouldn't
> > happen. I can see why it might happen and we will create a JIRA and fix it
> > right away.
> >
>
> Great, thanks for looking into this. I'll try again once the PR is merged.
>
>
> >
> > I'm not sure how you end up with (hello -> 10). It could indicate that the
> > offsets for the topic you are consuming from weren't committed so the data
> > gets processed again on the restart.
> >
>
> Yes, it didn't commit the offsets since streams.close() was not called on
> ctrl-c. Fixed by adding a shutdown hook.
>
> Thanks,
> Mikael
>
>
> > Thanks,
> > Damian
> >
>
