vvcephei commented on a change in pull request #11557: URL: https://github.com/apache/kafka/pull/11557#discussion_r762146251
########## File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ########## @@ -1716,4 +1725,110 @@ protected void processStreamThread(final Consumer<StreamThread> consumer) { return Collections.unmodifiableMap(localStorePartitionLags); } + + /** + * Run an interactive query against a state store. + * <p> + * This method allows callers outside of the Streams runtime to access the internal state of + * stateful processors. See https://kafka.apache.org/documentation/streams/developer-guide/interactive-queries.html + * for more information. + * + * @param <R> The result type specified by the query. + * @throws StreamsNotStartedException If Streams has not yet been started. Just call {@link + * KafkaStreams#start()} and then retry this call. + * @throws StreamsStoppedException If Streams is in a terminal state like PENDING_SHUTDOWN, + * NOT_RUNNING, PENDING_ERROR, or ERROR. The caller should + * discover a new instance to query. + * @throws UnknownStateStoreException If the specified store name does not exist in the + * topology. + */ + @Evolving + public <R> StateQueryResult<R> query(final StateQueryRequest<R> request) { + final String storeName = request.getStoreName(); + if (!topologyMetadata.hasStore(storeName)) { + throw new UnknownStateStoreException( + "Cannot get state store " + + storeName + + " because no such store is registered in the topology." + ); + } + if (state().hasNotStarted()) { + throw new StreamsNotStartedException( + "KafkaStreams has not been started, you can retry after calling start()." + ); + } + if (state().isShuttingDown() || state.hasCompletedShutdown()) { + throw new StreamsStoppedException( + "KafkaStreams has been stopped (" + state + ")." + + " This instance can no longer serve queries." 
+ ); + } + final StateQueryResult<R> result = new StateQueryResult<>(); + + final Map<String, StateStore> globalStateStores = topologyMetadata.globalStateStores(); + if (globalStateStores.containsKey(storeName)) { + final StateStore store = globalStateStores.get(storeName); + final QueryResult<R> r = + store.query( + request.getQuery(), + request.getPositionBound(), + request.executionInfoEnabled() + ); + result.setGlobalResult(r); + } else { + final Set<Integer> handledPartitions = new HashSet<>(); + + for (final StreamThread thread : threads) { + final Map<TaskId, Task> tasks = thread.allTasks(); + for (final Entry<TaskId, Task> entry : tasks.entrySet()) { + + final TaskId taskId = entry.getKey(); + final int partition = taskId.partition(); + if (request.isAllPartitions() + || request.getPartitions().contains(partition)) { + final Task task = entry.getValue(); + final StateStore store = task.getStore(storeName); + if (store != null) { + final StreamThread.State state = thread.state(); + final boolean active = task.isActive(); + if (request.isRequireActive() + && (state != StreamThread.State.RUNNING + || !active)) { + result.addResult( + partition, + QueryResult.forFailure( + FailureReason.NOT_ACTIVE, + "Query requires a running active task," + + " but partition was in state " + + state + " and was " + + (active ? "active" : "not active") + "." + ) + ); + } else { + final QueryResult<R> r = store.query( + request.getQuery(), + request.isRequireActive() + ? PositionBound.unbounded() + : request.getPositionBound(), + request.executionInfoEnabled() + ); + result.addResult(partition, r); + } + } + + // optimization: if we have handled all the requested partitions, + // we can return right away. + handledPartitions.add(partition); + if (!request.isAllPartitions() + && handledPartitions.containsAll(request.getPartitions())) { + return result; + } + } + } + } + } + + return result; + } Review comment: For sure! 
I was actually hoping we'd get a chance to make this logic more efficient, so I didn't bother making it too pretty, but you're totally right. There's no guarantee of when, or whether, we'll refactor it, so we should make it nice now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org