Hi Guozhang

Thank you for your feedback. I've started to look more deeply into the
code. As you mentioned, it would be smarter to use the existing
StreamsMetadata API to expose this information.

I think exposing metrics through JMX is great for building monitoring
dashboards with tools like jmxtrans and Grafana.
But for our use case we would like to expose the states directly from the
application embedding the Kafka Streams topologies,
so we need to be able to retrieve the states programmatically.

For instance, we could imagine producing those states to a dedicated
topic. That way, a third application could automatically discover all the
Kafka Streams applications that can be monitored.
In a production environment, this could clearly be a way to get a
complete overview of a microservices architecture based on Kafka Streams.
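
To make the idea a bit more concrete, here is a rough sketch of what such a
state record could look like. Everything in it is hypothetical and only for
illustration: the StateSnapshot class, its fields, the key layout and the
hand-built JSON are my assumptions, not an existing Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch: none of these names exist in Kafka Streams.
final class StateSnapshot {
    final String applicationId;
    final String instanceId;
    final String state;                    // e.g. "RUNNING", "REBALANCING"
    final List<String> assignedPartitions; // e.g. "input-0"

    StateSnapshot(String applicationId, String instanceId, String state,
                  List<String> assignedPartitions) {
        this.applicationId = applicationId;
        this.instanceId = instanceId;
        this.state = state;
        // Defensive copy so the snapshot cannot change after creation.
        this.assignedPartitions =
            Collections.unmodifiableList(new ArrayList<>(assignedPartitions));
    }

    // Record key: one key per application instance (assumed layout).
    String recordKey() {
        return applicationId + "/" + instanceId;
    }

    // Record value as a small JSON document.
    // Assumes no field contains characters that need JSON escaping.
    String recordValue() {
        String partitions = assignedPartitions.stream()
            .map(p -> "\"" + p + "\"")
            .collect(Collectors.joining(","));
        return "{\"applicationId\":\"" + applicationId + "\","
            + "\"state\":\"" + state + "\","
            + "\"partitions\":[" + partitions + "]}";
    }
}
```

The key and value would then be sent to the dedicated topic with a regular
producer; keying by instance is just one possible choice that lets a
monitoring application keep the latest snapshot per instance.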

The toString() method gives a lot of information, but it can only be used
for debugging, not to build a topology visualization tool. Could we
expose the same details about the stream topology through the
StreamsMetadata API? The TaskMetadata class you suggested could then
contain information similar to what the toString() method of the
AbstractTask class returns.
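
As a starting point for the discussion, a TaskMetadata could be a simple
immutable holder like the sketch below. The field names (taskId,
topicPartitions, stateStoreNames) are my assumptions, loosely modelled on
what StreamsMetadata already aggregates. This is not an existing class, and
the final shape would be defined in the KIP.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical sketch: TaskMetadata does not exist in Kafka Streams today.
final class TaskMetadata {
    private final String taskId;               // e.g. "0_1" (assumed format)
    private final Set<String> topicPartitions; // e.g. "input-0"
    private final Set<String> stateStoreNames;

    TaskMetadata(String taskId, Set<String> topicPartitions,
                 Set<String> stateStoreNames) {
        this.taskId = taskId;
        // Defensive copies so the metadata stays immutable once created.
        this.topicPartitions =
            Collections.unmodifiableSet(new LinkedHashSet<>(topicPartitions));
        this.stateStoreNames =
            Collections.unmodifiableSet(new LinkedHashSet<>(stateStoreNames));
    }

    String taskId() { return taskId; }

    Set<String> topicPartitions() { return topicPartitions; }

    Set<String> stateStoreNames() { return stateStoreNames; }

    @Override
    public String toString() {
        return "TaskMetadata{taskId=" + taskId
            + ", topicPartitions=" + topicPartitions
            + ", stateStoreNames=" + stateStoreNames + "}";
    }
}
```

A StreamsMetadata#tasks() method, as you proposed, could then return a
collection of these objects, while the existing aggregated accessors stay
unchanged.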

I can update the KIP accordingly.

Finally, I'm not sure I understand your last point: "Note that the
task-level assignment information is static, i.e. it will not change during
the runtime"

Does that mean that when a rebalance occurs, new tasks are created for the
new assignments and the old ones just switch to a standby state?

Thanks,

2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:

> Hello Florian,
>
> Thanks for the KIP and your detailed explanation of your use case. I think
> there are two dimensions to discuss on how to improve Streams'
> debuggability (or more specifically state exposure for visualization).
>
> First question is "what information should we expose to the user". From
> your KIP I saw generally three categories:
>
> 1. The state of the thread within a process, as you mentioned currently we
> only expose the state of the process but not the finer grained per-thread
> state.
> 2. The state of the task. Currently the closest API to this is
> StreamsMetadata; however, it aggregates the tasks across all threads and
> only presents the aggregated set of the assigned partitions / state
> stores etc. We can consider extending it to have a new
> StreamsMetadata#tasks() which returns a TaskMetadata with similar
> fields, and the StreamsMetadata.stateStoreNames / etc would still return
> the aggregated results, but users could "drill down" if they want.
>
> The second question is "how should we expose them to the user". For
> example, you mentioned consumedOffsetsByPartition in the activeTasks.
> We could add this as a JMX metric based on fetch positions inside the
> consumer layer (note that Streams is just embedding consumers), or we
> could consider adding it to TaskMetadata. In either case it can be
> visualized for monitoring. The reason we expose StreamsMetadata as well as
> State is that they are expected to be "polled" programmatically for
> interactive queries and also for control flows (e.g. I would like to start
> running my other topology ONLY after the first topology is up and
> running), while in your case the main purpose seems to be to query them
> continuously for monitoring etc. Personally, for such purposes I'd prefer
> to expose them through JMX only, to keep the API simpler.
>
> So given your current motivations I'd suggest exposing the following
> information as newly added metrics in Streams:
>
> 1. Thread-level state metric.
> 2. Task-level hosted client identifier metric (e.g. host:port).
> 3. Consumer-level per-topic/partition position metric (
> https://kafka.apache.org/documentation/#topic_fetch_monitoring).
>
> Note that the task-level assignment information is static, i.e. it will not
> change during the runtime at all and can be accessed from the `toString()`
> function already, even before the instance starts running, so I think this
> piece of information does not need to be exposed through JMX anymore.
>
> WDYT?
>
> Guozhang
>
>
> On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:
>
> > Hi Florian,
> >
> > Thanks for the KIP.
> >
> > It seems there is some overlap here with what we already have in
> > KafkaStreams.allMetadata(). This currently returns a
> > Collection<StreamsMetadata> where each StreamsMetadata instance holds the
> > state stores and partition assignment for every instance of the
> > KafkaStreams application. I'm wondering if that is good enough for what
> > you are trying to achieve? If not, could it be modified to include the
> > per-thread assignment?
> >
> > Thanks,
> > Damian
> >
> >
> >
> >
> >
> >
> > On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com>
> > wrote:
> >
> > > Hi Matthias,
> > >
> > > First, I will answer your last question.
> > >
> > > The main reason to have both TaskState#assignment and
> > > TaskState#consumedOffsetsByPartition is that tasks have no consumed
> > > offsets until at least one message is consumed for each partition, even
> > > if previous offsets exist for the consumer group.
> > > So yes, these methods are redundant, as they only diverge at application
> > > startup.
> > >
> > > About the use case: we are currently developing, for a customer, a
> > > small framework based on KafkaStreams which transforms/denormalizes
> > > data before ingesting it into Hadoop.
> > >
> > > We have a cluster of workers (Spring Boot) which instantiate KStreams
> > > topologies dynamically based on dataflow configurations.
> > > Each configuration describes a topic to consume and how to process
> > > messages (this looks like the NiFi processors API).
> > >
> > > Our architecture is inspired by Kafka Connect. We have topics for
> > > configs and states which are consumed by each worker (actually we have
> > > reused some internal classes from the Connect API).
> > >
> > > Now, we would like to develop UIs to visualize topics and partitions
> > > consumed by our worker applications.
> > >
> > > Also, I think it would be nice to be able, in the future, to develop
> > > web UIs similar to Spark's, but for KafkaStreams, to visualize DAGs... so
> > > maybe this KIP is just a first step.
> > >
> > > Thanks,
> > >
> > > 2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
> > >
> > > > Thanks for the KIP.
> > > >
> > > > I am wondering a little bit why you need to expose this information.
> > > > Can you describe some use cases?
> > > >
> > > > Would it be worth unifying this new API with KafkaStreams#state() to
> > > > get the overall state of an application without the need to call two
> > > > different methods? Not sure what this unified API might look like,
> > > > though.
> > > >
> > > >
> > > > One minor comment about the API: TaskState#assignment seems to be
> > > > redundant. It should be the same as
> > > > TaskState#consumedOffsetsByPartition.keySet()
> > > >
> > > > Or do I miss something?
> > > >
> > > >
> > > > -Matthias
> > > >
> > > > On 3/1/17 5:19 AM, Florian Hussonnois wrote:
> > > > > Hi Eno,
> > > > >
> > > > > Yes, but the state() method only returns the global state of the
> > > > > KafkaStreams application (i.e. CREATED, RUNNING, REBALANCING,
> > > > > PENDING_SHUTDOWN, NOT_RUNNING).
> > > > >
> > > > > An alternative to this KIP would be to change this method to return
> > > > > more information instead of adding a new method.
> > > > >
> > > > > 2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
> > > > >
> > > > >> Thanks Florian,
> > > > >>
> > > > > >> Have you had a chance to look at the new state methods in 0.10.2,
> > > > > >> e.g., KafkaStreams.state()?
> > > > >>
> > > > >> Thanks
> > > > >> Eno
> > > > > >>> On 1 Mar 2017, at 11:54, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> > > > >>>
> > > > >>> Hi all,
> > > > >>>
> > > > > >>> I have just created KIP-130 to add a new method to the
> > > > > >>> KafkaStreams API in order to expose the states of threads and
> > > > > >>> active tasks.
> > > > > >>>
> > > > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> > > > >>>
> > > > >>>
> > > > >>> Thanks,
> > > > >>>
> > > > >>> --
> > > > >>> Florian HUSSONNOIS
> > > > >>
> > > > >>
> > > > >
> > > > >
> > > >
> > > >
> > >
> > >
> > > --
> > > Florian HUSSONNOIS
> > >
> >
>
>
>
> --
> -- Guozhang
>



-- 
Florian HUSSONNOIS
