Thanks Guozhang

I do not think it's ever updated.
I waited for a while.
I did implement a workaround.
Can you also look at my embedded Kafka question?

Boris Lublinsky
FDP Architect
boris.lublin...@lightbend.com
https://www.lightbend.com/

> On Nov 16, 2017, at 10:29 PM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
> Hello Boris,
> 
> The reason for this check is to make sure that the cluster metadata has been
> updated at least once, meaning that the instance has gone through the
> initialization phase of the rebalance and has received the assignment
> information already. Before this phase, any metadata returned may be
> incorrect.
> 
> To get around this scenario, you may need to wait on the state of the
> running instance to go from Rebalancing to Running, which means it has been
> stabilized with the global metadata. This can be done by passing in your
> customized state change listener:
> 
> public void setStateListener(final KafkaStreams.StateListener listener)
> 
> 
> Guozhang
> 
> 
> On Mon, Nov 13, 2017 at 9:43 AM, Boris Lublinsky <boris.lublin...@lightbend.com> wrote:
> 
>> I have updated my queryable state example, based on
>> https://github.com/confluentinc/examples/tree/3.2.x/kafka-streams/src/main/java/io/confluent/examples/streams/interactivequeries
>> to 1.0.0.
>> Now, when I try to get the instances for a store while running on
>> a local machine, I get an empty array from
>> 
>> public List<HostStoreInfo> streamsMetadataForStore(final String store) {
>>    // Get metadata for all of the instances of this Kafka Streams application hosting the store
>>    final Collection<StreamsMetadata> metadata = streams.allMetadataForStore(store);
>>    return mapInstancesToHostStoreInfo(metadata);
>> }
>> This is happening because in StreamsMetadataState,
>> 
>> public synchronized Collection<StreamsMetadata> getAllMetadataForStore(final String storeName) {
>>    Objects.requireNonNull(storeName, "storeName cannot be null");
>> 
>>    if (!isInitialized()) {
>>        return Collections.emptyList();
>>    }
>> The isInitialized method
>> 
>> private boolean isInitialized() {
>>    return clusterMetadata != null && !clusterMetadata.topics().isEmpty();
>> }
>> checks the cluster metadata, which is null.
>> 
>> Boris Lublinsky
>> FDP Architect
>> boris.lublin...@lightbend.com
>> https://www.lightbend.com/
>> 
>> 
> 
> 
> -- 
> -- Guozhang
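
The waiting approach suggested above (register a state listener and hold off on metadata queries until the instance moves from Rebalancing to Running) might be sketched roughly as follows. This is only an illustration under stated assumptions: RunningGate is a hypothetical helper, and its nested State enum and StateListener interface are local stand-ins that mirror KafkaStreams.State and KafkaStreams.StateListener#onChange(newState, oldState) so the snippet compiles without the Kafka Streams dependency. In a real application the lambda would presumably be passed to streams.setStateListener(...) before streams.start().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: blocks callers until the REBALANCING -> RUNNING transition is seen.
class RunningGate {
    // Stand-in for KafkaStreams.State (assumption: only the states needed here).
    enum State { CREATED, REBALANCING, RUNNING }

    // Stand-in mirroring KafkaStreams.StateListener#onChange(newState, oldState).
    interface StateListener {
        void onChange(State newState, State oldState);
    }

    private final CountDownLatch running = new CountDownLatch(1);

    // Listener that opens the gate once the instance has stabilized after a rebalance.
    StateListener listener() {
        return (newState, oldState) -> {
            if (oldState == State.REBALANCING && newState == State.RUNNING) {
                running.countDown();
            }
        };
    }

    // Waits up to the given timeout; returns true only if the transition was observed.
    boolean awaitRunning(long timeout, TimeUnit unit) throws InterruptedException {
        return running.await(timeout, unit);
    }
}
```

With something like this in place, a call such as streams.allMetadataForStore(store) would only be made after awaitRunning(...) returns true. The timeout keeps the caller from blocking forever if the transition never arrives.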
