Hello Florian,

Thanks for the KIP and your detailed explanation of your use case. I think there are two dimensions to discuss on how to improve Streams' debuggability (or, more specifically, state exposure for visualization).

The first question is "what information should we expose to the user". From your KIP I saw generally three categories:

1. The state of the thread within a process. As you mentioned, currently we only expose the state of the process but not the finer-grained per-thread state.
2. The state of the task. Currently the closest API to this is StreamsMetadata; however, it aggregates the tasks across all threads and only presents the aggregated set of the assigned partitions, state stores, etc. We can consider extending it with a new StreamsMetadata#tasks() which returns a TaskMetadata with similar fields; StreamsMetadata.stateStoreNames etc. would still return the aggregated results, but users could "drill down" if they want.

The second question is "how should we expose them to the user". For example, you mentioned consumedOffsetsByPartition in the activeTasks. We could add this as a JMX metric based on fetch positions inside the consumer layer (note that Streams is just embedding consumers), or we could consider adding it to TaskMetadata. In either case it can be visualized for monitoring. The reason we expose StreamsMetadata as well as State is that they are expected to be "polled" programmatically for interactive queries and also for control flow (e.g. I would like to start running my other topology ONLY after the first topology is up and running), while in your case the main purpose seems to be to query them continuously for monitoring. Personally, I'd prefer to expose them only through JMX for such purposes, to keep the API simpler.

So given your current motivations, I'd suggest exposing the following information as newly added metrics in Streams:

1. Thread-level state metric.
2. Task-level hosted client identifier metric (e.g. host:port).
3. Consumer-level per-topic/partition position metric (https://kafka.apache.org/documentation/#topic_fetch_monitoring).

Note that the task-level assignment information is static, i.e.
it will not change during runtime at all and can already be accessed from the `toString()` function even before the instance starts running, so I think this piece of information does not need to be exposed through JMX.

WDYT?

Guozhang

On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Florian,
>
> Thanks for the KIP.
>
> It seems there is some overlap here with what we already have in
> KafkaStreams.allMetadata(). This currently returns a
> Collection<StreamsMetadata> where each StreamsMetadata instance holds the
> state stores and partition assignment for every instance of the
> KafkaStreams application. I'm wondering if that is good enough for what you
> are trying to achieve? If not, could it be modified to include the
> per-thread assignment?
>
> Thanks,
> Damian
>
> On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com>
> wrote:
>
> > Hi Matthias,
> >
> > First, I will answer your last question.
> >
> > The main reason to have both TaskState#assignment and
> > TaskState#consumedOffsetsByPartition is that tasks have no consumed offsets
> > until at least one message is consumed for each partition, even if previous
> > offsets exist for the consumer group.
> > So yes, these methods are redundant, as they only diverge at application
> > startup.
> >
> > About the use case: we are currently developing a small framework for a
> > customer, based on KafkaStreams, which transforms/denormalizes data before
> > ingesting it into Hadoop.
> >
> > We have a cluster of workers (SpringBoot) which instantiate KStreams
> > topologies dynamically based on dataflow configurations.
> > Each configuration describes a topic to consume and how to process messages
> > (this looks like the NiFi processors API).
> >
> > Our architecture is inspired by KafkaConnect. We have topics for configs
> > and states which are consumed by each worker (actually we have reused some
> > internal classes of the Connect API).
> >
> > Now, we would like to develop UIs to visualize topics and partitions
> > consumed by our worker applications.
> >
> > Also, I think it would be nice to be able, in the future, to develop web
> > UIs similar to Spark but for KafkaStreams to visualize DAGs... so maybe this
> > KIP is just a first step.
> >
> > Thanks,
> >
> > 2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
> >
> > > Thanks for the KIP.
> > >
> > > I am wondering a little bit why you need to expose this information.
> > > Can you describe some use cases?
> > >
> > > Would it be worth unifying this new API with KafkaStreams#state() to get
> > > the overall state of an application without the need to call two
> > > different methods? Not sure what this unified API might look like though.
> > >
> > > One minor comment about the API: TaskState#assignment seems to be
> > > redundant. It should be the same as
> > > TaskState#consumedOffsetsByPartition.keySet()
> > >
> > > Or do I miss something?
> > >
> > > -Matthias
> > >
> > > On 3/1/17 5:19 AM, Florian Hussonnois wrote:
> > > > Hi Eno,
> > > >
> > > > Yes, but the state() method only returns the global state of the
> > > > KafkaStreams application (i.e. CREATED, RUNNING, REBALANCING,
> > > > PENDING_SHUTDOWN, NOT_RUNNING).
> > > >
> > > > An alternative to this KIP would be to change this method to return more
> > > > information instead of adding a new method.
> > > >
> > > > 2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
> > > >
> > > >> Thanks Florian,
> > > >>
> > > >> Have you had a chance to look at the new state methods in 0.10.2, e.g.,
> > > >> KafkaStreams.state()?
> > > >>
> > > >> Thanks
> > > >> Eno
> > > >>> On 1 Mar 2017, at 11:54, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > > >>>
> > > >>> Hi all,
> > > >>>
> > > >>> I have just created KIP-130 to add a new method to the KafkaStreams API in
> > > >>> order to expose the states of threads and active tasks.
> > > >>>
> > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> > > >>>
> > > >>> Thanks,
> > > >>>
> > > >>> --
> > > >>> Florian HUSSONNOIS
> >
> > --
> > Florian HUSSONNOIS

--
-- Guozhang
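
For readers of this thread, the StreamsMetadata#tasks() "drill down" idea from the reply at the top could look roughly like the sketch below. It is only an illustration under stated assumptions: TaskMetadataSketch and StreamsMetadataSketch are hypothetical stand-ins written for this note, not the real Kafka Streams classes, and the field names and the string form of partitions are made up. The point it shows is that the aggregated accessors can be derived from the per-task view, so both can coexist.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical per-task view (assumption; not an existing Kafka Streams class). */
final class TaskMetadataSketch {
    final String taskId;
    final Set<String> topicPartitions; // illustrative encoding, e.g. "orders-0"
    final Set<String> stateStoreNames;

    TaskMetadataSketch(String taskId, Set<String> topicPartitions, Set<String> stateStoreNames) {
        this.taskId = taskId;
        this.topicPartitions = topicPartitions;
        this.stateStoreNames = stateStoreNames;
    }
}

/** Hypothetical stand-in for StreamsMetadata extended with a tasks() accessor. */
final class StreamsMetadataSketch {
    private final List<TaskMetadataSketch> tasks;

    StreamsMetadataSketch(List<TaskMetadataSketch> tasks) {
        this.tasks = tasks;
    }

    /** Drill-down view: one entry per task. */
    List<TaskMetadataSketch> tasks() {
        return Collections.unmodifiableList(tasks);
    }

    /** Aggregated view: union of the state store names of all tasks. */
    Set<String> stateStoreNames() {
        Set<String> all = new LinkedHashSet<>();
        for (TaskMetadataSketch task : tasks) {
            all.addAll(task.stateStoreNames);
        }
        return all;
    }

    /** Aggregated view: union of the partitions assigned to all tasks. */
    Set<String> topicPartitions() {
        Set<String> all = new LinkedHashSet<>();
        for (TaskMetadataSketch task : tasks) {
            all.addAll(task.topicPartitions);
        }
        return all;
    }
}

/** Small demo: prints the aggregated view, then the per-task view. */
final class TaskDrillDownDemo {
    public static void main(String[] args) {
        StreamsMetadataSketch metadata = new StreamsMetadataSketch(Arrays.asList(
            new TaskMetadataSketch("0_0",
                new HashSet<>(Arrays.asList("orders-0")),
                new HashSet<>(Arrays.asList("counts"))),
            new TaskMetadataSketch("0_1",
                new HashSet<>(Arrays.asList("orders-1")),
                new HashSet<>(Arrays.asList("counts")))));

        System.out.println("stores: " + metadata.stateStoreNames());
        for (TaskMetadataSketch task : metadata.tasks()) {
            System.out.println(task.taskId + " -> " + task.topicPartitions);
        }
    }
}
```

Keeping the aggregated accessors as unions over tasks() means existing callers see the same results as before, while monitoring code can iterate per task when it needs the finer granularity.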