Thanks, Guozhang, for pointing me to KIP-120. I've made some modifications to the KIP. I have also opened a new PR (there are still some tests to write).
https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
Exposing consumed offsets through JMX is sufficient for debugging purposes, but I think this could be part of another JIRA, as it has no impact on the public API.

Thanks

2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:

> Hello Florian,
>
> As for programmatically discovering monitoring data by piping metrics into a dedicated topic: I think you can actually use a KafkaMetricsReporter that pipes the polled metric values into a predefined topic (note that in Kafka the MetricsReporter is simply an interface, and users can build their own implementation in addition to the JmxReporter), for example:
>
> https://github.com/krux/kafka-metrics-reporter
>
> As for the "static task-level assignment", what I meant is that the mapping from source topic-partitions to tasks is static (it is computed by the PartitionGrouper), and a task won't switch from active to standby. Rather, an active task can be migrated as a whole, along with all its assigned partitions, to another thread or process, and a new standby task will be created on the host that the active task is migrating from. So for the SAME task, taskMetadata.assignedPartitions() will always return the same partitions.
>
> As for the `toString` function that we have today, I feel it has some correlation with KIP-120, so I'm trying to coordinate the discussion here (cc'ing Matthias as the owner of KIP-120). My understanding is that:
>
> 1. In KIP-120, the `toString` function of `KafkaStreams` will be removed, and instead the `Topology#describe` function will be introduced for users to debug the topology BEFORE they start running their instance with it. Hence the description won't contain any task information, as tasks are not formed yet.
> 2. In KIP-130, we want to add task-level information for monitoring purposes, which is not static and can only be captured AFTER the instance has started running.
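The static mapping Guozhang describes can be illustrated with a small plain-Java sketch (Java 16+ for records). It assumes grouping modeled loosely on Kafka Streams' default partition grouper, where task i of a topic group takes partition i of every source topic in that group; the class, record, and method names are hypothetical and this is not the Streams implementation. Because the result depends only on the topic groups and partition counts, the same task id always maps to the same partitions, whichever thread or instance happens to host it.

```java
import java.util.*;

// Hypothetical model of a static "partition grouper": the mapping from source
// topic-partitions to task ids depends only on the topology's topic groups and the
// partition counts, never on which thread or instance currently runs the task.
public class StaticGroupingSketch {

    // A task is identified by its topic group and partition number, e.g. "0_1".
    record TaskId(int topicGroupId, int partition) {
        @Override public String toString() { return topicGroupId + "_" + partition; }
    }

    record TopicPartition(String topic, int partition) {
        @Override public String toString() { return topic + "-" + partition; }
    }

    // topicGroups: topic group id -> source topics in that group
    // partitionCounts: topic -> number of partitions
    static Map<TaskId, Set<TopicPartition>> group(Map<Integer, List<String>> topicGroups,
                                                  Map<String, Integer> partitionCounts) {
        Map<TaskId, Set<TopicPartition>> tasks = new TreeMap<>(
            Comparator.comparingInt(TaskId::topicGroupId).thenComparingInt(TaskId::partition));
        for (Map.Entry<Integer, List<String>> e : topicGroups.entrySet()) {
            // One task per partition number, up to the largest topic in the group.
            int maxPartitions = 0;
            for (String topic : e.getValue()) {
                maxPartitions = Math.max(maxPartitions, partitionCounts.getOrDefault(topic, 0));
            }
            for (int p = 0; p < maxPartitions; p++) {
                // Task p takes partition p of every topic in the group that has one.
                Set<TopicPartition> assigned = new LinkedHashSet<>();
                for (String topic : e.getValue()) {
                    if (p < partitionCounts.getOrDefault(topic, 0)) {
                        assigned.add(new TopicPartition(topic, p));
                    }
                }
                tasks.put(new TaskId(e.getKey(), p), assigned);
            }
        }
        return tasks;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> topicGroups = Map.of(0, List.of("orders", "payments"));
        Map<String, Integer> partitionCounts = Map.of("orders", 2, "payments", 2);
        // Prints {0_0=[orders-0, payments-0], 0_1=[orders-1, payments-1]}
        System.out.println(group(topicGroups, partitionCounts));
    }
}
```

Migrating task 0_1 to another thread would not change its entry in this map; only the host running it changes.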
>
> Again, I'm wondering whether, for KIP-130 alone, adding the metrics mentioned in my previous email would suffice even for the use case you have described.
>
>
> Guozhang
>
> On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
>
> > Hi Guozhang,
> >
> > Thank you for your feedback. I've started to look more deeply into the code. As you mention, it would be smarter to use the current StreamsMetadata API to expose this information.
> >
> > I think exposing metrics through JMX is great for building monitoring dashboards with tools like jmxtrans and Grafana. But for our use case, we would like to expose the states directly from the application embedding the Kafka Streams topologies, so we expect to be able to retrieve states programmatically.
> >
> > For instance, we could imagine producing those states into a dedicated topic. That way, a third application could automatically discover all the Kafka Streams applications that can be monitored. In a production environment, this would clearly give a complete overview of a microservices architecture based on Kafka Streams.
> >
> > The toString() method gives a lot of information, but it can only be used for debugging purposes, not to build a topology visualization tool. Could we actually expose the same details about the stream topology through the StreamsMetadata API? That is, could the TaskMetadata class you suggested contain information similar to what the toString() method of the AbstractTask class returns?
> >
> > I can update the KIP accordingly.
> >
> > Finally, I'm not sure I understand your last point: *"Note that the task-level assignment information is static, i.e. it will not change during the runtime"*
> >
> > Does that mean that when a rebalance occurs, new tasks are created for the new assignments and the old ones just switch to a standby state?
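A minimal sketch of Florian's idea of publishing states to a dedicated topic, written in plain Java (Java 16+) with no Kafka dependency. It assumes each application publishes a snapshot keyed by its application id to a compacted topic, so a monitoring application only needs the latest record per key. The `StateSnapshot` record, the `Registry` class, the state labels, and the `key|value` string encoding are all hypothetical stand-ins for a real producer, consumer, and serializer.

```java
import java.util.*;

// Hypothetical sketch: each Kafka Streams application periodically publishes a
// snapshot of its state to a dedicated (assumed compacted) topic, keyed by its
// application id; a separate monitoring application keeps the latest snapshot per key.
public class StateTopicSketch {

    // Hypothetical snapshot of one application's state; labels are illustrative.
    record StateSnapshot(String applicationId, String state, Map<String, String> threadStates) {

        // Tiny key/value encoding standing in for a real serializer (e.g. JSON).
        String[] toRecord() {
            StringJoiner value = new StringJoiner(",");
            new TreeMap<>(threadStates).forEach((t, s) -> value.add(t + "=" + s));
            return new String[] {applicationId, state + ";" + value};
        }
    }

    // Discovery side: keeps only the latest value per key, mirroring how a reader of a
    // compacted topic ends up with one record per key.
    static final class Registry {
        private final Map<String, String> latest = new TreeMap<>();

        void onRecord(String key, String value) {
            latest.put(key, value);
        }

        Set<String> discoveredApplications() {
            return latest.keySet();
        }

        String latestState(String applicationId) {
            return latest.get(applicationId);
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        List<StateSnapshot> published = List.of(
            new StateSnapshot("orders-app", "REBALANCING", Map.of("thread-1", "CREATED")),
            new StateSnapshot("billing-app", "RUNNING", Map.of("thread-1", "RUNNING")),
            new StateSnapshot("orders-app", "RUNNING", Map.of("thread-1", "RUNNING", "thread-2", "RUNNING")));
        for (StateSnapshot snapshot : published) {
            String[] rec = snapshot.toRecord(); // a real application would send this with a producer
            registry.onRecord(rec[0], rec[1]);
        }
        System.out.println(registry.discoveredApplications()); // [billing-app, orders-app]
        System.out.println(registry.latestState("orders-app")); // RUNNING;thread-1=RUNNING,thread-2=RUNNING
    }
}
```

Keying by application id is a design assumption: it lets log compaction bound the topic's size while still letting a new monitoring application discover every publisher.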
> >
> > Thanks,
> >
> > 2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
> >
> > > Hello Florian,
> > >
> > > Thanks for the KIP and your detailed explanation of your use case. I think there are two dimensions to discuss on how to improve Streams' debuggability (or, more specifically, state exposure for visualization).
> > >
> > > The first question is "what information should we expose to the user?" From your KIP I saw three general categories:
> > >
> > > 1. The state of the thread within a process: as you mentioned, we currently only expose the state of the process, not the finer-grained per-thread state.
> > > 2. The state of the task. Currently the closest API to this is StreamsMetadata; however, it aggregates the tasks across all threads and only presents the aggregated set of assigned partitions / state stores etc. We could consider extending it with a new StreamsMetadata#tasks() method that returns TaskMetadata with similar fields; StreamsMetadata.stateStoreNames etc. would still return the aggregated results, but users could "drill down" if they want.
> > >
> > > The second question is "how should we expose them to the user?" For example, you mentioned consumedOffsetsByPartition in the activeTasks. We could add this as a JMX metric based on fetch positions inside the consumer layer (note that Streams simply embeds consumers), or we could consider adding it to TaskMetadata. Either way, it can be visualized for monitoring. The reason we expose StreamsMetadata as well as State is that they are expected to be "polled" programmatically for interactive queries and also for control flows (e.g.
> > > I would like to start running my other topology ONLY after the first topology is up and running), whereas in your case the main purpose seems to be continuously querying them for monitoring etc. Personally, I'd prefer to expose them only as JMX metrics for such purposes, to keep the API simpler.
> > >
> > > So given your current motivations, I'd suggest exposing the following information as newly added metrics in Streams:
> > >
> > > 1. A thread-level state metric.
> > > 2. A task-level hosting-client identifier metric (e.g. host:port).
> > > 3. A consumer-level per-topic/partition position metric (https://kafka.apache.org/documentation/#topic_fetch_monitoring).
> > >
> > > Note that the task-level assignment information is static, i.e. it will not change during the runtime at all, and it can already be accessed from the `toString()` function even before the instance starts running, so I think this piece of information does not need to be exposed through JMX.
> > >
> > > WDYT?
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:
> > >
> > > > Hi Florian,
> > > >
> > > > Thanks for the KIP.
> > > >
> > > > It seems there is some overlap here with what we already have in KafkaStreams.allMetadata(). This currently returns a Collection<StreamsMetadata>, where each StreamsMetadata instance holds the state stores and partition assignment for every instance of the KafkaStreams application. I'm wondering if that is good enough for what you are trying to achieve? If not, could it be modified to include the per-thread assignment?
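Guozhang's StreamsMetadata#tasks() proposal (an aggregated view plus a per-task drill-down) and Damian's question about per-thread assignment can be sketched together in plain Java (Java 16+). The `TaskMetadata` and `InstanceMetadata` records below are hypothetical, not the Kafka Streams API; they only show how the aggregated partitions and store names could be derived from per-task data, which also makes a per-thread view available for free.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical sketch of the "drill down" idea: per-task metadata is kept as-is,
// while the instance-level view derives its aggregated partitions and store names
// from the tasks. Names are illustrative, not the Kafka Streams API.
public class DrillDownSketch {

    record TaskMetadata(String taskId, String threadName,
                        Set<String> topicPartitions, Set<String> stateStoreNames) {}

    record InstanceMetadata(String hostInfo, List<TaskMetadata> tasks) {

        // Aggregated view across all tasks, comparable to what StreamsMetadata exposes.
        Set<String> topicPartitions() {
            return tasks.stream().flatMap(t -> t.topicPartitions().stream())
                        .collect(Collectors.toCollection(TreeSet::new));
        }

        Set<String> stateStoreNames() {
            return tasks.stream().flatMap(t -> t.stateStoreNames().stream())
                        .collect(Collectors.toCollection(TreeSet::new));
        }

        // Drill-down view: which task ids each stream thread is running.
        Map<String, List<String>> taskIdsByThread() {
            return tasks.stream().collect(Collectors.groupingBy(
                TaskMetadata::threadName, TreeMap::new,
                Collectors.mapping(TaskMetadata::taskId, Collectors.toList())));
        }
    }

    public static void main(String[] args) {
        InstanceMetadata instance = new InstanceMetadata("host-a:7070", List.of(
            new TaskMetadata("0_0", "thread-1", Set.of("orders-0"), Set.of("counts")),
            new TaskMetadata("0_1", "thread-2", Set.of("orders-1"), Set.of("counts"))));
        System.out.println(instance.topicPartitions()); // [orders-0, orders-1]
        System.out.println(instance.stateStoreNames()); // [counts]
        System.out.println(instance.taskIdsByThread()); // {thread-1=[0_0], thread-2=[0_1]}
    }
}
```

Deriving the aggregates from the tasks (rather than storing both) keeps the two views consistent by construction.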
> > > >
> > > > Thanks,
> > > > Damian
> > > >
> > > > On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > > >
> > > > > Hi Matthias,
> > > > >
> > > > > First, I will answer your last question.
> > > > >
> > > > > The main reason to have both TaskState#assignment and TaskState#consumedOffsetsByPartition is that tasks have no consumed offsets until at least one message has been consumed from each partition, even if previous offsets exist for the consumer group. So yes, these methods are redundant, since they only diverge at application startup.
> > > > >
> > > > > About the use case: we are currently developing, for a customer, a small framework based on KafkaStreams that transforms/denormalizes data before ingesting it into Hadoop.
> > > > >
> > > > > We have a cluster of workers (Spring Boot) that instantiate KStreams topologies dynamically based on dataflow configurations. Each configuration describes a topic to consume and how to process its messages (this looks like the NiFi processors API).
> > > > >
> > > > > Our architecture is inspired by Kafka Connect. We have topics for configs and states that are consumed by each worker (we have actually reused some internal classes of the Connect API).
> > > > >
> > > > > Now, we would like to develop UIs to visualize the topics and partitions consumed by our worker applications.
> > > > >
> > > > > Also, I think it would be nice to be able, in the future, to develop web UIs similar to Spark's, but for KafkaStreams, to visualize DAGs... so maybe this KIP is just a first step.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > 2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
> > > > >
> > > > > > Thanks for the KIP.
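Florian's explanation of why the two proposed views only diverge at startup can be modeled in a few lines. This is a hypothetical plain-Java sketch of the proposed `TaskState` (method names taken from the KIP discussion, behavior assumed, partitions represented as plain strings): the assignment is known as soon as partitions are assigned, while an entry in `consumedOffsetsByPartition` appears only after a record from that partition has been consumed.

```java
import java.util.*;

// Hypothetical model of the two proposed TaskState views: the assignment is known as
// soon as partitions are assigned, while consumed offsets only appear for a partition
// after the task has consumed at least one record from it.
public class TaskStateSketch {

    static final class TaskState {
        private final Set<String> assignment;
        private final Map<String, Long> consumedOffsetsByPartition = new TreeMap<>();

        TaskState(Set<String> assignment) {
            this.assignment = new TreeSet<>(assignment);
        }

        // Records the offset of the latest record consumed from an assigned partition.
        void onRecordConsumed(String partition, long offset) {
            if (!assignment.contains(partition)) {
                throw new IllegalArgumentException("not assigned: " + partition);
            }
            consumedOffsetsByPartition.put(partition, offset);
        }

        Set<String> assignment() { return Collections.unmodifiableSet(assignment); }

        Map<String, Long> consumedOffsetsByPartition() {
            return Collections.unmodifiableMap(consumedOffsetsByPartition);
        }

        // The two views agree only once every assigned partition has delivered a record.
        boolean viewsAgree() {
            return assignment.equals(consumedOffsetsByPartition.keySet());
        }
    }

    public static void main(String[] args) {
        TaskState task = new TaskState(Set.of("orders-0", "orders-1"));
        System.out.println(task.viewsAgree()); // false: nothing consumed yet
        task.onRecordConsumed("orders-0", 42L);
        System.out.println(task.viewsAgree()); // false: orders-1 still has no consumed offset
        task.onRecordConsumed("orders-1", 7L);
        System.out.println(task.viewsAgree()); // true
    }
}
```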
> > > > > >
> > > > > > I am wondering a little bit why you need to expose this information. Can you describe some use cases?
> > > > > >
> > > > > > Would it be worth unifying this new API with KafkaStreams#state() to get the overall state of an application without the need to call two different methods? I'm not sure what this unified API might look like, though.
> > > > > >
> > > > > >
> > > > > > One minor comment about the API: TaskState#assignment seems to be redundant. It should be the same as TaskState#consumedOffsetsByPartition.keySet()
> > > > > >
> > > > > > Or am I missing something?
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 3/1/17 5:19 AM, Florian Hussonnois wrote:
> > > > > > > Hi Eno,
> > > > > > >
> > > > > > > Yes, but the state() method only returns the global state of the KafkaStreams application (i.e. CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING).
> > > > > > >
> > > > > > > An alternative to this KIP would be to change this method to return more information instead of adding a new method.
> > > > > > >
> > > > > > > 2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
> > > > > > >
> > > > > > > > Thanks Florian,
> > > > > > > >
> > > > > > > > Have you had a chance to look at the new state methods in 0.10.2, e.g., KafkaStreams.state()?
> > > > > > > >
> > > > > > > > Thanks
> > > > > > > > Eno
> > > > > > > > > On 1 Mar 2017, at 11:54, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > I have just created KIP-130 to add a new method to the KafkaStreams API in order to expose the states of threads and active tasks.
> > > > > > > > >
> > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > --
> > > > > > > > > Florian HUSSONNOIS
> > > > >
> > > > > --
> > > > > Florian HUSSONNOIS
> > >
> > > --
> > > -- Guozhang
> >
> > --
> > Florian HUSSONNOIS
>
> --
> -- Guozhang

--
Florian HUSSONNOIS
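For the control-flow use case Guozhang mentions (start a second topology only after the first one is up and running), polling the global state is enough. This plain-Java sketch assumes a `Supplier` that, in a real application, would be something like `streams::state` on a `KafkaStreams` instance; the enum below copies the states Florian lists rather than importing `KafkaStreams.State`, and the helper name `waitForState` is hypothetical.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical helper for the "start topology B only once topology A is RUNNING"
// control flow. The enum copies the states listed in this thread; in a real
// application the supplier would be something like kafkaStreamsA::state.
public class WaitForStateSketch {

    enum State { CREATED, RUNNING, REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING }

    // Polls the supplier until it reports the target state or the timeout expires.
    static boolean waitForState(Supplier<State> current, State target,
                                Duration timeout, Duration pollInterval) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (current.get() == target) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) { // overflow-safe deadline check
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake state source: reports CREATED, then REBALANCING twice, then RUNNING.
        State[] sequence = {State.CREATED, State.REBALANCING, State.REBALANCING, State.RUNNING};
        int[] calls = {0};
        Supplier<State> fakeState = () -> sequence[Math.min(calls[0]++, sequence.length - 1)];

        boolean ready = waitForState(fakeState, State.RUNNING, Duration.ofSeconds(1), Duration.ofMillis(10));
        System.out.println(ready ? "start second topology" : "gave up waiting");
    }
}
```

Registering a listener with `KafkaStreams#setStateListener` would avoid polling altogether; the loop is shown only because it needs no callback wiring.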