Thanks Guozhang I do not think its ever updated. I waited for a while. I did implement workaround. Can you also look at my embedded Kafka question
Boris Lublinsky FDP Architect boris.lublin...@lightbend.com https://www.lightbend.com/ > On Nov 16, 2017, at 10:29 PM, Guozhang Wang <wangg...@gmail.com> wrote: > > Hello Boris, > > The reason of this check is to make sure that the cluster metadata has been > updated at least once, meaning that the instance has gone through the > initialization phase of the rebalance and have received the assignment > information already. Before this phase, any metadata returned may be > incorrect. > > To get around this scenario, you may need to wait on the state of the > running instance to go from Rebalancing to Running, which means it has been > stabilized with the global metadata. This can be done by passing in your > customized state change listener: > > public void setStateListener(final KafkaStreams.StateListener listener) > > > Guozhang > > > On Mon, Nov 13, 2017 at 9:43 AM, Boris Lublinsky < > boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>> wrote: > >> I have updated my queryable state example, based on >> https://github.com/confluentinc/examples/tree/3.2.x/kafka- >> <https://github.com/confluentinc/examples/tree/3.2.x/kafka-> >> streams/src/main/java/io/confluent/examples/streams/interactivequeries < >> https://github.com/confluentinc/examples/tree/3.2.x/kafka- >> <https://github.com/confluentinc/examples/tree/3.2.x/kafka-> >> streams/src/main/java/io/confluent/examples/streams/interactivequeries> >> >> To 1.0.0 >> And now when I am trying trying to get instances for store when running on >> a local machine, I am getting an empty array, from >> >> public List<HostStoreInfo> streamsMetadataForStore(final String store) { >> // Get metadata for all of the instances of this Kafka Streams >> application hosting the store >> final Collection<StreamsMetadata> metadata = >> streams.allMetadataForStore(store); >> return mapInstancesToHostStoreInfo(metadata); >> } >> This is happening because in StreamsMetadata state, >> >> public synchronized Collection<StreamsMetadata> >> getAllMetadataForStore(final String storeName) { >> Objects.requireNonNull(storeName, "storeName cannot be null"); >> >> if (!isInitialized()) { >> return Collections.emptyList(); >> } >> IsInitialized method >> >> private boolean isInitialized() { >> return clusterMetadata != null && !clusterMetadata.topics().isEmpty(); >> } >> Check for the cluster, which is null >> >> Boris Lublinsky >> FDP Architect >> boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com> >> <mailto:boris.lublin...@lightbend.com <mailto:boris.lublin...@lightbend.com>> >> https://www.lightbend.com/ <https://www.lightbend.com/> >> >> > > > -- > -- Guozhang