Hi Daan,

First of all, it does sound like that is a correct
implementation of QueryableStoreProvider. Kudos for taking
that on; the complexity of that API was one of my top
motivations for replacing it with IQv2!
(https://cwiki.apache.org/confluence/display/KAFKA/KIP-796%3A+Interactive+Query+v2)

To answer your question directly: no. "activeHost" just means
the host that currently has the "activeTask" for the desired
store.

I suspect that this is either a subtle and rare edge case in
how the metadata gets updated, or it's just a simple race
condition between the query and a rebalance in the cluster,
which is a fact of life in any distributed database.
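To make the race concrete, here is a small, self-contained Java model. All class and method names in it are hypothetical and are not Kafka Streams APIs. The metadata lookup happens while hostA owns the active task. A "rebalance" then moves the active task to hostB and leaves hostA with only a standby. A query on hostA with stale stores disabled therefore finds no active task, which has the same shape as steps 8 to 10 in the quoted flow. The client-side remedy sketched here is to refresh the metadata and retry a bounded number of times. This is an assumption about how a caller might cope, not something Kafka Streams does for you.

```java
import java.util.HashMap;
import java.util.Map;

public class RebalanceRaceSketch {

    // Hypothetical stand-in for Kafka Streams' InvalidStateStoreException.
    static class StoreMigratedException extends RuntimeException {
        StoreMigratedException(String store) {
            super("The state store, " + store + ", may have migrated to another instance.");
        }
    }

    // Hypothetical model of one store partition: one host owns the ACTIVE task,
    // the others hold standby copies. A "rebalance" simply reassigns activeHost.
    static class Cluster {
        String activeHost;
        final Map<String, Map<String, String>> copies = new HashMap<>(); // host -> key/value copy

        Cluster(String activeHost) {
            this.activeHost = activeHost;
        }

        // Plays the role of queryMetadataForKey: a snapshot that may be stale a moment later.
        String lookupActiveHost() {
            return activeHost;
        }

        // Plays the role of store(...).get(key) with stale stores disabled:
        // only the host that holds the active task right now may answer.
        String queryOn(String host, String store, String key) {
            if (!host.equals(activeHost)) {
                throw new StoreMigratedException(store); // host now has only a standby
            }
            return copies.get(host).get(key);
        }
    }

    // Look up the active host and query it. On a "migrated" error, refresh the
    // metadata and try again, up to maxAttempts times.
    static String getWithRetry(Cluster cluster, String store, String key,
                               int maxAttempts, Runnable afterLookup) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        StoreMigratedException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String host = cluster.lookupActiveHost(); // step 1 of the quoted flow
            afterLookup.run();                        // a rebalance may complete right here
            try {
                return cluster.queryOn(host, store, key); // steps 2-5; fails like step 10 if the task moved
            } catch (StoreMigratedException e) {
                last = e; // metadata was stale; loop around and look it up again
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        Cluster cluster = new Cluster("hostA");
        cluster.copies.put("hostA", Map.of("k", "v"));
        cluster.copies.put("hostB", Map.of("k", "v"));

        // Simulate a single rebalance that lands between the first lookup and the
        // first query, moving the active task from hostA to hostB.
        boolean[] rebalanced = {false};
        Runnable rebalanceOnce = () -> {
            if (!rebalanced[0]) {
                cluster.activeHost = "hostB";
                rebalanced[0] = true;
            }
        };
        System.out.println(getWithRetry(cluster, "my-store", "k", 3, rebalanceOnce)); // prints "v"
    }
}
```

In a real client you would probably also back off briefly between attempts, because a rebalance can take a while to settle.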

If you are able to reproduce it and send us the logs, we
should be able to tell which is which.

In particular, we'd need to see three things in the logs:
1. The logs for the rebalances and assignments (which are on
by default)
2. The log of when you check the metadata and what the
result is
3. The log of when the query tries to run on the
"activeHost" and what it finds there (that the task is only
a standby)
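For items 2 and 3, a timestamped log line per checkpoint makes it easier to line them up against the rebalance/assignment logs from item 1. The sketch below uses plain values so it runs on its own. In an application, the inputs would presumably come from the `KeyQueryMetadata` returned by `queryMetadataForKey` (its `activeHost()` and `partition()` accessors). The local task sets would come from `KafkaStreams.metadataForLocalThreads()` (each thread's `activeTasks()` and `standbyTasks()`), in Kafka Streams versions that provide those methods. The class name, the checkpoint labels, and the use of partition numbers as task identifiers are all hypothetical simplifications.

```java
import java.time.Instant;
import java.util.Set;
import java.util.logging.Logger;

public class QueryDiagnostics {

    private static final Logger LOG = Logger.getLogger(QueryDiagnostics.class.getName());

    // What this instance actually holds for the key's partition.
    enum Verdict { LOCAL_ACTIVE, LOCAL_STANDBY_ONLY, NOT_LOCAL }

    static Verdict classify(int partition, Set<Integer> localActive, Set<Integer> localStandby) {
        if (localActive.contains(partition)) {
            return Verdict.LOCAL_ACTIVE;
        }
        if (localStandby.contains(partition)) {
            return Verdict.LOCAL_STANDBY_ONLY;
        }
        return Verdict.NOT_LOCAL;
    }

    // Emits one timestamped line per checkpoint. Logs at WARNING when the metadata
    // names this instance as the active host but no local active task covers the partition.
    static String logCheckpoint(String checkpoint, String self, String key, int partition,
                                String metadataActiveHost,
                                Set<Integer> localActive, Set<Integer> localStandby) {
        Verdict verdict = classify(partition, localActive, localStandby);
        String line = String.format(
                "%s checkpoint=%s self=%s key=%s partition=%d metadataActiveHost=%s "
                        + "localActive=%s localStandby=%s verdict=%s",
                Instant.now(), checkpoint, self, key, partition, metadataActiveHost,
                localActive, localStandby, verdict);
        if (self.equals(metadataActiveHost) && verdict != Verdict.LOCAL_ACTIVE) {
            LOG.warning(line);
        } else {
            LOG.info(line);
        }
        return line;
    }

    public static void main(String[] args) {
        // Hypothetical values shaped like the situation in the question:
        // metadata says localhost:8562 is active, but it only has a standby for partition 3.
        logCheckpoint("before-get", "localhost:8562", "some-key", 3, "localhost:8562",
                Set.of(), Set.of(3));
    }
}
```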

One other possibility worth considering is whether
queryMetadataForKey is producing the correct partition. What
it does is run the provided key through the provided
serializer and then run the serialized key through the
default partitioner. If your actual data isn't partitioned
the same way, then queryMetadataForKey might effectively be
selecting a random host, which sometimes happens to host the
active task and other times does not. Kind of a long shot,
but I just wanted to put it out there.
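One way to check the partitioning theory is to compute, outside Kafka, which partition a given key's bytes hash to. You can then compare that with where the records actually land (for example, `RecordMetadata.partition()` on the producer side). The sketch below is a plain-Java copy of the murmur2 hash that Kafka's default partitioning uses for keyed records. If in doubt, call `org.apache.kafka.common.utils.Utils.murmur2` directly instead. The key `42` and the partition count of 6 are hypothetical. The point it illustrates is that the "same" logical key serialized two different ways yields different bytes, and so can map to different partitions.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class KeyPartitionCheck {

    // Plain-Java copy of the murmur2 variant Kafka uses for keyed records
    // (same algorithm as org.apache.kafka.common.utils.Utils.murmur2),
    // so this sketch runs without Kafka on the classpath.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;

        int h = seed ^ length;
        int length4 = length / 4;

        // Mix the input four bytes at a time (little-endian words).
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }

        // Mix in the remaining 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }

        // Final avalanche.
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition chosen for a record with these serialized key bytes:
    // non-negative murmur2 hash modulo the partition count.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 6; // hypothetical partition count of the input topic

        // The "same" logical key 42, serialized two different ways.
        byte[] asString = "42".getBytes(StandardCharsets.UTF_8);   // what StringSerializer produces
        byte[] asInt = ByteBuffer.allocate(4).putInt(42).array(); // what IntegerSerializer produces (big-endian)

        System.out.println("String-serialized key -> partition " + partitionFor(asString, numPartitions));
        System.out.println("Integer-serialized key -> partition " + partitionFor(asInt, numPartitions));
    }
}
```

If the input topic is written with a custom partitioner, `KafkaStreams` also has a `queryMetadataForKey(storeName, key, partitioner)` overload that takes a `StreamPartitioner`. That lets the lookup use the same partitioning logic as the writes.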

Thanks,
-John


On Mon, 2022-03-28 at 13:48 +0000, Daan Gertis wrote:
> Hi All,
> 
> We are experiencing some weird behaviour with our interactive query service 
> implementation.
> This is the flow we’ve implemented:
> 
> 
>   1.  kafkaStreams.queryMetadataForKey(store, key, serializer) returns for 
> activeHost HostInfo{host='localhost', port=8562}, and standbyHosts [] for the 
> store and partition where the key would reside. We are not interested in 
> standby hosts. Luckily, we have an active host which we can call.
>   2.  We make an HTTP call to host localhost:8562, asking for the key there.
>   3.  Inside the 8562 host, we retrieve the store by calling 
> kafkaStreams.store(parameters), using parameters with staleStores set to 
> false.
>   4.  We call kafkaStreams.state().equals(RUNNING) to make sure we’re in the 
> RUNNING state.
>   5.  Now we call store.get(key) in order to retrieve the key from the store, 
> if it has been stored there.
>   6.  The get method on our store implementation calls the 
> storeProvider.stores(storeName, storeType) method to iterate over all the 
> stores available on the host.
>   7.  The storeProvider is a WrappingStoreProvider, which calls 
> storeProvider.stores(storeQueryParameters) for each 
> StreamThreadStateStoreProvider it wraps (just one in our case).
>   8.  As the logic inside that stores method finds that the StreamThread is 
> in the RUNNING state, it retrieves the tasks based on 
> storeQueryParams.staleStoresEnabled() ? streamThread.allTasks().values() : 
> streamThread.activeTasks(), which evaluates to false since we set staleStores 
> to false in the params.
>   9.  To our surprise, the streamThread.activeTasks() method returns an empty 
> ArrayList, while the streamThread.allTasks().values() returns one StandbyTask 
> for the store we’re looking for.
>   10. As there appear to be no active tasks on this host for this store, we 
> return the fabled “The state store, " + storeName + ", may have migrated to 
> another instance.” InvalidStateStoreException.
> 
> This flow is quite tricky as the queryMetadataForKey returns an active host, 
> which turns out to only have a standby task once queried.
> I have executed the queryMetadataForKey method on the active host as well, 
> once before calling kafkaStreams.store in step 3, and another time between 
> steps 4 and 5. Each time, the metadata returns the same result: the host 
> we're on at that moment is the active host.
> 
> Could it be there is a difference between activeHost and activeTask?
> 
> Those of you also on the Confluent community Slack might recognize this 
> message, as it has been posted there by our CTO as well.
> 
> Cheers,
> D.
