vpapavas commented on a change in pull request #11406:
URL: https://github.com/apache/kafka/pull/11406#discussion_r734548879



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -1715,4 +1723,93 @@ protected void processStreamThread(final 
Consumer<StreamThread> consumer) {
 
         return Collections.unmodifiableMap(localStorePartitionLags);
     }
+
+    public <K, V> StateSerdes<K, V> serdesForStore(final String storeName) {
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such 
store is registered in the topology."
+            );
+        }
+
+        // TODO this is a hack. We ought to be able to create the serdes 
independent of the
+        // TODO stores and cache them in the topology.
+        final Map<String, StateStore> globalStateStores = 
topologyMetadata.globalStateStores();
+        if (globalStateStores.containsKey(storeName)) {
+            final StateStore store = globalStateStores.get(storeName);
+            return getSerdes(store);
+        } else {
+            for (final StreamThread thread : threads) {
+                final Map<TaskId, Task> tasks = thread.allTasks();
+                for (final Entry<TaskId, Task> entry : tasks.entrySet()) {
+                    final StateStore store = 
entry.getValue().getStore(storeName);
+                    if (store != null) {
+                        return getSerdes(store);
+                    }
+                }
+            }
+        }
+        // there may be no local copy of this store.
+        // This is the main reason I want to decouble serde
+        // creation from the store itself.
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private <V, K> StateSerdes<K, V> getSerdes(final StateStore store) {
+        if (store instanceof MeteredKeyValueStore) {
+            return ((MeteredKeyValueStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredSessionStore) {
+            return ((MeteredSessionStore<K, V>) store).serdes();
+        } else if (store instanceof MeteredWindowStore) {
+            return ((MeteredWindowStore<K, V>) store).serdes();
+        } else {
+            throw new IllegalArgumentException("Unknown store type: " + store);
+        }
+    }
+
+    public <R> InteractiveQueryResult<R> query(final 
InteractiveQueryRequest<R> request) {
+        final String storeName = request.getStoreName();
+        if (!topologyMetadata.hasStore(storeName)) {
+            throw new UnknownStateStoreException(
+                "Cannot get state store " + storeName + " because no such 
store is registered in the topology."
+            );
+        }
+        final InteractiveQueryResult<R> result = new 
InteractiveQueryResult<>();
+
+        final Map<String, StateStore> globalStateStores = 
topologyMetadata.globalStateStores();

Review comment:
       What is the difference between the global state stores and the state 
stores associated with a task?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to