Hello Florian,

Thanks for the KIP and your detailed explanation of your use case. I think
there are two dimensions to discuss on how to improve Streams'
debuggability (or more specifically state exposure for visualization).

The first question is "what information should we expose to the user".
From your KIP I see the following categories:

1. The state of the thread within a process. As you mentioned, we
currently expose only the state of the process, not the finer-grained
per-thread state.
2. The state of the task. Currently the closest API to this is
StreamsMetadata; however, it aggregates the tasks across all threads and
only presents the aggregated set of assigned partitions, state stores,
etc. We can consider extending this class with a new
StreamsMetadata#tasks() method which returns TaskMetadata objects with
similar fields. StreamsMetadata.stateStoreNames etc. would still return
the aggregated results, but users could "drill down" if they want.
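
To make the "drill down" idea concrete, here is a rough sketch of the
shape I have in mind. It is not the real Streams code: the classes below
are stand-ins written with plain JDK types, and the field names (taskId,
stateStoreNames, topicPartitions) as well as the example values are
assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch only: these are NOT the real Kafka Streams classes.
final class TaskMetadataSketch {

    // Per-task view; the fields are assumed to mirror the aggregated ones.
    static final class TaskMetadata {
        final String taskId;
        final Set<String> stateStoreNames;
        final Set<String> topicPartitions;

        TaskMetadata(String taskId, Set<String> stateStoreNames, Set<String> topicPartitions) {
            this.taskId = taskId;
            this.stateStoreNames = Collections.unmodifiableSet(new TreeSet<>(stateStoreNames));
            this.topicPartitions = Collections.unmodifiableSet(new TreeSet<>(topicPartitions));
        }
    }

    // Instance-level view: keeps the aggregated getters and adds tasks().
    static final class StreamsMetadata {
        private final List<TaskMetadata> tasks;

        StreamsMetadata(List<TaskMetadata> tasks) {
            this.tasks = Collections.unmodifiableList(new ArrayList<>(tasks));
        }

        // The proposed drill-down accessor.
        List<TaskMetadata> tasks() {
            return tasks;
        }

        // Aggregated view, computed as the union over all tasks.
        Set<String> stateStoreNames() {
            Set<String> all = new TreeSet<>();
            for (TaskMetadata task : tasks) {
                all.addAll(task.stateStoreNames);
            }
            return all;
        }

        // Aggregated view, computed as the union over all tasks.
        Set<String> topicPartitions() {
            Set<String> all = new TreeSet<>();
            for (TaskMetadata task : tasks) {
                all.addAll(task.topicPartitions);
            }
            return all;
        }
    }

    static Set<String> setOf(String... values) {
        return new TreeSet<>(Arrays.asList(values));
    }

    // Made-up example data: two tasks hosted by one instance.
    static StreamsMetadata example() {
        return new StreamsMetadata(Arrays.asList(
            new TaskMetadata("0_0", setOf("counts"), setOf("input-0")),
            new TaskMetadata("0_1", setOf("counts", "joins"), setOf("input-1"))));
    }

    public static void main(String[] args) {
        StreamsMetadata metadata = example();
        System.out.println("aggregated stores: " + metadata.stateStoreNames());
        for (TaskMetadata task : metadata.tasks()) {
            System.out.println(task.taskId + " -> " + task.topicPartitions);
        }
    }
}
```

Existing callers would keep using the aggregated getters unchanged; only
callers that need per-task detail would touch tasks().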

The second question is "how should we expose them to the user". For
example, you mentioned consumedOffsetsByPartition in the activeTasks.
We could add this as a JMX metric based on fetch positions inside the
consumer layer (note that Streams is just embedding consumers), or we
could consider adding it to TaskMetadata. In either case it can be
visualized for monitoring. The reason we expose StreamsMetadata as well
as State is that they are expected to be "polled" programmatically for
interactive queries and also for control flows (e.g. I would like to
start running my other topology ONLY after the first topology is up and
running), while in your case the main purpose seems to be querying them
continuously for monitoring etc. Personally, for such purposes I'd
prefer to expose them through JMX only, to keep the API simpler.

So given your current motivations, I'd suggest exposing the following
information as newly added metrics in Streams:

1. Thread-level state metric.
2. Task-level hosted client identifier metric (e.g. host:port).
3. Consumer-level per-topic/partition position metric (
https://kafka.apache.org/documentation/#topic_fetch_monitoring).
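
As a rough illustration of how a monitoring tool could read such a
thread-level state over JMX, here is a small sketch that uses only the
JDK's javax.management API. Everything Streams-specific in it is an
assumption: the "example.streams" domain, the "stream-thread-state" type,
the "thread-id" key and the ThreadState bean are made up for this example
and are not existing Kafka metric names.

```java
import java.lang.management.ManagementFactory;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical sketch: a per-thread state exposed as a standard MBean.
final class ThreadStateJmxSketch {

    // Standard MBean contract: the interface is named <ClassName>MBean.
    public interface ThreadStateMBean {
        String getState();
    }

    public static final class ThreadState implements ThreadStateMBean {
        private volatile String state = "CREATED";

        public void transitionTo(String newState) {
            state = newState;
        }

        @Override
        public String getState() {
            return state;
        }
    }

    // Assumed naming scheme; not an existing Kafka metric name.
    static ObjectName nameFor(String threadId) throws Exception {
        return new ObjectName(
            "example.streams:type=stream-thread-state,thread-id=" + threadId);
    }

    // Registers a fresh bean for the thread on the platform MBean server.
    static ThreadState register(String threadId) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name = nameFor(threadId);
        if (server.isRegistered(name)) {
            server.unregisterMBean(name);
        }
        ThreadState bean = new ThreadState();
        server.registerMBean(bean, name);
        return bean;
    }

    // What a monitoring client would do: read the "State" attribute by name.
    static String readState(String threadId) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        return (String) server.getAttribute(nameFor(threadId), "State");
    }

    public static void main(String[] args) throws Exception {
        ThreadState bean = register("StreamThread-1");
        bean.transitionTo("RUNNING");
        System.out.println("state via JMX: " + readState("StreamThread-1"));
    }
}
```

The point is only that a UI can poll such attributes without any new
public method on KafkaStreams.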

Note that the task-level assignment information is static, i.e. it will
not change at runtime and can already be accessed from the `toString()`
function even before the instance starts running, so I think this piece
of information does not need to be exposed through JMX.

WDYT?

Guozhang


On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:

> Hi Florian,
>
> Thanks for the KIP.
>
> It seems there is some overlap here with what we already have in
> KafkaStreams.allMetadata(). This currently returns a
> Collection<StreamsMetadata> where each StreamsMetadata instance holds the
> state stores and partition assignment for every instance of the
> KafkaStreams application. I'm wondering if that is good enough for what you
> are trying to achieve? If not could it be modified to include the per
> Thread assignment?
>
> Thanks,
> Damian
>
>
>
>
>
>
> On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com>
> wrote:
>
> > Hi Matthias,
> >
> > First, I will answer your last question.
> >
> > The main reason to have both TaskState#assignment and
> > TaskState#consumedOffsetsByPartition is that tasks have no consumed
> > offsets until at least one message is consumed for each partition,
> > even if previous offsets exist for the consumer group.
> > So yes, these methods are redundant, as they only diverge at
> > application startup.
> >
> > About the use case: we are currently developing for a customer a small
> > framework based on KafkaStreams which transforms/denormalizes data
> > before ingesting it into Hadoop.
> >
> > We have a cluster of workers (SpringBoot) which instantiate KStreams
> > topologies dynamically based on dataflow configurations.
> > Each configuration describes a topic to consume and how to process
> > messages (this looks like the NiFi processors API).
> >
> > Our architecture is inspired by KafkaConnect. We have topics for
> > configs and states which are consumed by each worker (actually we have
> > reused some internal classes of the Connect API).
> >
> > Now, we would like to develop UIs to visualize topics and partitions
> > consumed by our worker applications.
> >
> > Also, I think it would be nice to be able, in the future, to develop web
> > UIs similar to Spark's but for KafkaStreams, to visualize DAGs... so
> > maybe this KIP is just a first step.
> >
> > Thanks,
> >
> > 2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
> >
> > > Thanks for the KIP.
> > >
> > > I am wondering a little bit, why you need to expose this information.
> > > Can you describe some use cases?
> > >
> > > Would it be worth unifying this new API with KafkaStreams#state() to
> > > get the overall state of an application without the need to call two
> > > different methods? Not sure what this unified API might look like,
> > > though.
> > >
> > >
> > > One minor comment about the API: TaskState#assignment seems to be
> > > redundant. It should be the same as
> > > TaskState#consumedOffsetsByPartition.keySet()
> > >
> > > Or do I miss something?
> > >
> > >
> > > -Matthias
> > >
> > > On 3/1/17 5:19 AM, Florian Hussonnois wrote:
> > > > Hi Eno,
> > > >
> > > > Yes, but the state() method only returns the global state of the
> > > > KafkaStreams application (i.e. CREATED, RUNNING, REBALANCING,
> > > > PENDING_SHUTDOWN, NOT_RUNNING).
> > > >
> > > > An alternative to this KIP would be to change this method to return
> > > > more information instead of adding a new method.
> > > >
> > > > 2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
> > > >
> > > >> Thanks Florian,
> > > >>
> > > >> Have you had a chance to look at the new state methods in 0.10.2,
> > > >> e.g., KafkaStreams.state()?
> > > >>
> > > >> Thanks
> > > >> Eno
> > > >>> On 1 Mar 2017, at 11:54, Florian Hussonnois <fhussonn...@gmail.com
> >
> > > >> wrote:
> > > >>>
> > > >>> Hi all,
> > > >>>
> > > >>> I have just created KIP-130 to add a new method to the KafkaStreams
> > > >>> API in order to expose the states of threads and active tasks.
> > > >>>
> > > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> > > >>>
> > > >>>
> > > >>> Thanks,
> > > >>>
> > > >>> --
> > > >>> Florian HUSSONNOIS
> > > >>
> > > >>
> > > >
> > > >
> > >
> > >
> >
> >
> > --
> > Florian HUSSONNOIS
> >
>



-- 
-- Guozhang
