Hi Daan,

First of all, it does sound like that is a correct implementation of QueryableStoreProvider. Kudos for taking that on; the complexity of that API was one of my top motivations for replacing it with IQv2! (https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2)

To answer your question directly: no, "activeHost" just means the host that currently has the "activeTask" for the desired store.

I suspect that this is either a subtle and rare edge case in how the metadata gets updated, or it's just a simple race condition between the query and a rebalance in the cluster, which is a fact of life in any distributed database. If you are able to reproduce it and send us the logs, we should be able to tell which is which. In particular, we'd need to see three things in the logs:

1. The logs for the rebalances and assignments (which are on by default)
2. The log of when you check the metadata and what the result is
3. The log of when the query tries to run on the "activeHost" and what it finds there (that the task is only a standby)

One other possibility worth considering is whether queryMetadataForKey is producing the correct partition. What it does is run the provided key through the provided serializer and then run the serialized key through the default partitioner. If your actual data isn't partitioned the same way, then queryMetadataForKey might be effectively selecting a random host, which sometimes happens to host the active task and other times does not. Kind of a long shot, but I just wanted to put it out there.

Thanks,
-John

On Mon, 2022-03-28 at 13:48 +0000, Daan Gertis wrote:
> Hi All,
>
> We are experiencing some weird behaviour with our interactive query service
> implementation.
> This is the flow we’ve implemented:
>
>
> 1. kafkaStreams.queryMetadataForKey(store, key, serializer) returns for
> activeHost HostInfo{host='localhost', port=8562}, and standbyHosts [] for the
> store and partition where the key would reside. We are not interested in
> standby hosts. Luckily, we have an active host which we can call.
> 2. We make an HTTP call to host localhost:8562, asking for the key there.
> 3. Inside the 8562 host, we retrieve the store by calling
> kafkaStreams.store(parameters), using parameters with staleStores set to
> false.
> 4. We call kafkaStreams.state().equals(RUNNING) to make sure we’re in the
> RUNNING state.
> 5. Now we call store.get(key) in order to retrieve the key from the store,
> if it has been stored there.
> 6. The get method on our store implementation calls the
> storeProvider.stores(storeName, storeType) method to iterate over all the
> stores available on the host.
> 7. The storeProvider is a WrappingStoreProvider, which calls
> storeProvider.stores(storeQueryParameters) for each
> StreamThreadStateStoreProvider it wraps (just one in our case).
> 8. As the logic inside that stores method finds that the StreamThread is
> in the RUNNING state, it retrieves the tasks based on
> storeQueryParams.staleStoresEnabled() ? streamThread.allTasks().values() :
> streamThread.activeTasks(), which evaluates to false since we set staleStores
> to false in the params.
> 9. To our surprise, the streamThread.activeTasks() method returns an empty
> ArrayList, while the streamThread.allTasks().values() returns one StandbyTask
> for the store we’re looking for.
> 10. As there appear to be no active tasks on this host for this store, we
> return the fabled “The state store, " + storeName + ", may have migrated to
> another instance.” InvalidStateStoreException.
>
> This flow is quite tricky as the queryMetadataForKey returns an active host,
> which turns out to only have a standby task once queried.
> I have executed the queryMetadataForKey method on the active host as well,
> once before calling kafkaStreams.store in step 3, and another time between
> step 4 and 5. Each time the metadata returns the same, the host we’re on at
> that moment is the active host.
>
> Could it be there is a difference between activeHost and activeTask?
>
> Those of you who are also on the Confluent community Slack might recognize
> this message, as it has been posted there by our CTO as well.
>
> Cheers,
> D.
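
P.S. To make the partitioning long shot from my reply a bit more concrete, here is a rough, self-contained sketch of the check I have in mind. It assumes the default partitioner boils down to "murmur2 of the serialized key, made non-negative, modulo the partition count", which is my understanding of it; the murmur2 below is a hand-written port, not a call into Kafka. The class name PartitionCheck, the sample keys, the partition count of 6, and the hashCode-based customPartition scheme are all made up for illustration.

```java
import java.nio.charset.StandardCharsets;

// Illustrative only: compares the partition the default partitioner is
// assumed to pick with the partition a hypothetical custom scheme would pick.
class PartitionCheck {

    // Hand-written port of the 32-bit murmur2 hash (assumed to match the
    // one used by the default partitioner).
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;

        // Mix four bytes at a time into the hash.
        for (int i = 0; i < length / 4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff)
                    + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16)
                    + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Handle the trailing 1 to 3 bytes; the fall-through is intentional.
        int tail = length & ~3;
        switch (length % 4) {
            case 3:
                h ^= (data[tail + 2] & 0xff) << 16;
            case 2:
                h ^= (data[tail + 1] & 0xff) << 8;
            case 1:
                h ^= data[tail] & 0xff;
                h *= m;
        }

        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition the default partitioner is assumed to choose for a serialized key.
    static int defaultPartition(byte[] serializedKey, int numPartitions) {
        return (murmur2(serializedKey) & 0x7fffffff) % numPartitions;
    }

    // Hypothetical custom scheme a producer might be using instead.
    static int customPartition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 6; // made-up partition count
        for (String key : new String[] {"order-1", "order-2", "order-3", "order-4"}) {
            byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
            int viaDefault = defaultPartition(bytes, numPartitions);
            int viaCustom = customPartition(key, numPartitions);
            System.out.printf("%s default=%d custom=%d match=%b%n",
                    key, viaDefault, viaCustom, viaDefault == viaCustom);
        }
    }
}
```

If the two columns disagree for keys that you know are in the store, the metadata lookup would be pointing at the wrong partition, and therefore possibly at a host that only has a standby for the partition that actually holds the key. If they always agree, this theory is out and we are back to the rebalance race.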