Hi there,

Can't toString() just stay as is?

Thanks
Eno

> On 20 Apr 2017, at 17:26, Matthias J. Sax <matth...@confluent.io> wrote:
> 
> Florian,
> 
> I am just wondering: if we keep .toString(), what should the
> implementation look like?
> 
> 
> -Matthias
> 
> On 4/19/17 2:42 PM, Florian Hussonnois wrote:
>> Hi Matthias,
>> 
>> So sorry for the delay in replying to you. For now, I think we can keep
>> KafkaStreams#toString() as it is.
>> It's always preferable to have an implementation for toString method.
>> 
>> 2017-04-14 4:08 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
>> 
>>> Florian,
>>> 
>>>>>> What about KafkaStreams#toString() method?
>>>>>> 
>>>>>> I think, we want to deprecate it as with KIP-120 and the changes of
>>> this
>>>>>> KIP, is gets obsolete.
>>> 
>>> Any thoughts about this? For me, this is the last open point to discuss
>>> (or what should be reflected in the KIP in case you agree) before I can
>>> put my vote on the VOTE thread do did start already.
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 4/11/17 12:18 AM, Damian Guy wrote:
>>>> Hi Florian,
>>>> 
>>>> Thanks for the updates. The KIP is looking good.
>>>> 
>>>> Cheers,
>>>> Damian
>>>> 
>>>> On Fri, 7 Apr 2017 at 22:41 Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>> 
>>>>> What about KafkaStreams#toString() method?
>>>>> 
>>>>> I think, we want to deprecate it as with KIP-120 and the changes of this
>>>>> KIP, is gets obsolete.
>>>>> 
>>>>> If we do so, please update the KIP accordingly.
>>>>> 
>>>>> 
>>>>> -Matthias
>>>>> 
>>>>> On 3/28/17 7:00 PM, Matthias J. Sax wrote:
>>>>>> Thanks for updating the KIP!
>>>>>> 
>>>>>> I think it's good as is -- I would not add anything more to
>>> TaskMetadata.
>>>>>> 
>>>>>> About subtopologies and tasks. We do have the concept of subtopologies
>>>>>> already in KIP-120. It's only missing and ID that allow to link a
>>>>>> subtopology to a task.
>>>>>> 
>>>>>> IMHO, adding a simple variable to `Subtopoloy` that provide the id
>>>>>> should be sufficient. We can simply document in the JavaDocs how
>>>>>> Subtopology and TaskMetadata can be linked to each other.
>>>>>> 
>>>>>> I did update KIP-120 accordingly.
>>>>>> 
>>>>>> 
>>>>>> -Matthias
>>>>>> 
>>>>>> On 3/28/17 3:45 PM, Florian Hussonnois wrote:
>>>>>>> Hi all,
>>>>>>> 
>>>>>>> I've updated the KIP and the PR to reflect your suggestions.
>>>>>>> 
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>>> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>> https://github.com/apache/kafka/pull/2612
>>>>>>> 
>>>>>>> Also, I've exposed property StreamThread#state as a string through the
>>>>>>> new class ThreadMetadata.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> 2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com
>>>>>>> <mailto:fhussonn...@gmail.com>>:
>>>>>>> 
>>>>>>>    Hi Guozhang, Matthias,
>>>>>>> 
>>>>>>>    It's a great idea to add sub topologies descriptions. This would
>>>>>>>    help developers to better understand topology concept.
>>>>>>> 
>>>>>>>    I agree that is not really user-friendly to check if
>>>>>>>    `StreamsMetadata#streamThreads` is not returning null.
>>>>>>> 
>>>>>>>    The method name localThreadsMetadata looks good. In addition, it's
>>>>>>>    more simple to build ThreadMetadata instances from the
>>> `StreamTask`
>>>>>>>    class than from `StreamPartitionAssignor` class.
>>>>>>> 
>>>>>>>    I will work on modifications. As I understand, I have to add the
>>>>>>>    property subTopologyId property to the TaskMetadata class - Am I
>>>>> right ?
>>>>>>> 
>>>>>>>    Thanks,
>>>>>>> 
>>>>>>>    2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com
>>>>>>>    <mailto:wangg...@gmail.com>>:
>>>>>>> 
>>>>>>>        Re 1): this is a good point. May be we can move
>>>>>>>        `StreamsMetadata#streamThreads` as
>>>>>>>        `KafkaStreams#localThreadsMetadata`?
>>>>>>> 
>>>>>>>        3): this is a minor suggestion about function name of
>>>>>>>        `assignedPartitions`, to `topicPartitions` to be consistent
>>> with
>>>>>>>        `StreamsMetadata`?
>>>>>>> 
>>>>>>> 
>>>>>>>        Guozhang
>>>>>>> 
>>>>>>>        On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax
>>>>>>>        <matth...@confluent.io <mailto:matth...@confluent.io>> wrote:
>>>>>>> 
>>>>>>>            Thanks for the progress on this KIP. I think we are on the
>>>>>>>            right path!
>>>>>>> 
>>>>>>>            Couple of comments/questions:
>>>>>>> 
>>>>>>>            (1) Why do we not consider the "rejected alternative" to
>>> add
>>>>>>>            the method
>>>>>>>            to KafkaStreams? The comment on #streamThreads() says:
>>>>>>> 
>>>>>>>            "Note this method will return <code>null</code> if called
>>> on
>>>>>>>            {@link
>>>>>>>            StreamsMetadata} which represent a remote application."
>>>>>>> 
>>>>>>>            Thus, if we cannot get any remote metadata, it seems not
>>>>>>>            straight
>>>>>>>            forward to not add it to KafkaStreams directly -- this
>>> would
>>>>>>>            avoid
>>>>>>>            invalid calls and `null` return value in the first place.
>>>>>>> 
>>>>>>>            I like the idea about exposing sub-topologies.:
>>>>>>> 
>>>>>>>            (2a) I would recommend to rename `topicsGroupId` to
>>>>>>>            `subTopologyId` :)
>>>>>>> 
>>>>>>>            (2b) We could add this to KIP-120 already. However, I
>>> would
>>>>>>>            not just
>>>>>>>            link both via name, but leverage KIP-120 directly, and
>>> add a
>>>>>>>            "Subtopology" member to the TaskMetadata class.
>>>>>>> 
>>>>>>> 
>>>>>>>            Overall, I like the distinction of KIP-120 only exposing
>>>>>>>            "static"
>>>>>>>            information that can be determined before the topology
>>> get's
>>>>>>>            started,
>>>>>>>            while this KIP allow to access runtime information.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>            -Matthias
>>>>>>> 
>>>>>>> 
>>>>>>>            On 3/22/17 12:42 PM, Guozhang Wang wrote:
>>>>>>>> Thanks for the updated KIP, and sorry for the late
>>>>> replies!
>>>>>>>> 
>>>>>>>> I think a little bit more about KIP-130, and I feel that
>>>>>>>            if we are going
>>>>>>>> to deprecate the `toString` function (it is not
>>> explicitly
>>>>>>>            said in the
>>>>>>>> KIP, so I'm not sure if you plan to still keep the
>>>>>>>> `KafkaStreams#toString` as is or are going to replace it
>>>>>>>            with the
>>>>>>>> proposed APIs) with the proposed ones, it may be okay.
>>>>> More
>>>>>>>> specifically, after both KIP-120 and KIP-130:
>>>>>>>> 
>>>>>>>> 1. users can use `#describe` function to check the
>>>>>>>            generated topology
>>>>>>>> before calling `KafkaStreams#start`, which is static
>>>>>>>            information.
>>>>>>>> 2. users can use the `StreamsMetadata -> ThreadMetadata
>>> ->
>>>>>>>            TaskMetadata`
>>>>>>>> programmatically after called `KafkaStreams#start` to
>>> get
>>>>> the
>>>>>>>> dynamically changeable information.
>>>>>>>> 
>>>>>>>> One thing I'm still not sure though, is that in
>>>>>>>            `TaskMetadata` we only
>>>>>>>> have the TaskId and assigned partitions, whereas in
>>>>>>>> "TopologyDescription" introduced in KIP-120, it will
>>>>>>>            simply describe the
>>>>>>>> whole topology possibly composed of multiple
>>>>>>>            sub-topologies. So it is
>>>>>>>> hard for users to tell which sub-topology is executed
>>>>>>>            under which task
>>>>>>>> on-the-fly.
>>>>>>>> 
>>>>>>>> Hence I'm thinking if we can expose the
>>> "sub-topology-id"
>>>>>>>            (named as
>>>>>>>> topicsGroupId internally) in
>>>>>>>            TopologyDescription#Subtopology, and then
>>>>>>>> from the task id which is essentially "sub-topology-id
>>>>> DASH
>>>>>>>> partition-group-id" users can make the link, though it
>>> is
>>>>>>>            still not that
>>>>>>>> straight-forward.
>>>>>>>> 
>>>>>>>> Thoughts?
>>>>>>>> 
>>>>>>>> Guozhang
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois
>>>>>>>> <fhussonn...@gmail.com <mailto:fhussonn...@gmail.com>
>>>>>>>            <mailto:fhussonn...@gmail.com
>>>>>>>            <mailto:fhussonn...@gmail.com>>> wrote:
>>>>>>>> 
>>>>>>>>    Thanks Guozhang for pointing me to the KIP-120.
>>>>>>>> 
>>>>>>>>    I've made some modifications to the KIP. I also
>>>>> proposed a new PR
>>>>>>>>    (there is
>>>>>>>>    still some tests to make).
>>>>>>>> 
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>>> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>>            <
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>>> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>> 
>>>>>>>>    <
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>>> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>>            <
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>>> 130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>> 
>>>>>>>> 
>>>>>>>>    Exposing consumed offsets through JMX is sufficient
>>>>> for debugging
>>>>>>>>    purpose.
>>>>>>>>    But I think this could be part to another JIRA as
>>>>> there is no impact to
>>>>>>>>    public API.
>>>>>>>> 
>>>>>>>>    Thanks
>>>>>>>> 
>>>>>>>>    2017-03-10 22:35 GMT+01:00 Guozhang Wang <
>>>>> wangg...@gmail.com <mailto:wangg...@gmail.com>
>>>>>>>>    <mailto:wangg...@gmail.com <mailto:
>>> wangg...@gmail.com
>>>>>>>> :
>>>>>>> 
>>>>>>>> 
>>>>>>>>> Hello Florian,
>>>>>>>>> 
>>>>>>>>> As for programmatically discover monitoring data
>>> by
>>>>>>>            piping metrics
>>>>>>>>    into a
>>>>>>>>> dedicated topic. I think you can actually use a
>>>>>>>>    KafkaMetricsReporter which
>>>>>>>>> pipes the polled metric values into a pre-defined
>>>>>>>            topic (note that
>>>>>>>>    in Kafka
>>>>>>>>> the MetricsReporter is simply an interface and
>>> users
>>>>>>>            can build
>>>>>>>>    their own
>>>>>>>>> impl in addition to the JMXReporter), for example
>>> :
>>>>>>>>> 
>>>>>>>>> https://github.com/krux/kafka-metrics-reporter
>>>>>>>            <https://github.com/krux/kafka-metrics-reporter>
>>>>>>>>    <https://github.com/krux/kafka-metrics-reporter
>>>>>>>            <https://github.com/krux/kafka-metrics-reporter>>
>>>>>>>>> 
>>>>>>>>> As for the "static task-level assignment", what I
>>>>>>>            meant is that
>>>>>>>>    the mapping
>>>>>>>>> from source-topic-partitions -> tasks are static,
>>>>>>>            via the
>>>>>>>>> "PartitionGrouper", and a task won't switch from
>>> an
>>>>>>>            active task to a
>>>>>>>>> standby task, it is actually that an active task
>>>>>>>            could be
>>>>>>>>    migrated, as a
>>>>>>>>> whole along with all its assigned partitions, to
>>>>>>>            another thread /
>>>>>>>>    process
>>>>>>>>> and a new standby task will be created on the host
>>>>>>>            that this
>>>>>>>>    active task is
>>>>>>>>> migrating from. So for the SAME task, its
>>>>> taskMetadata.
>>>>>>>>> assignedPartitions()
>>>>>>>>> will always return you the same partitions.
>>>>>>>>> 
>>>>>>>>> As for the `toString` function that what we have
>>>>>>>            today, I feel it
>>>>>>>>    has some
>>>>>>>>> correlations with KIP-120 so I'm trying to
>>>>>>>            coordinate some
>>>>>>>>    discussions here
>>>>>>>>> (cc'ing Matthias as the owner of KIP-120). My
>>>>>>>            understand is that:
>>>>>>>>> 
>>>>>>>>> 1. In KIP-120, the `toString` function of
>>>>>>>            `KafkaStreams` will be
>>>>>>>>    removed
>>>>>>>>> and instead the `Topology#describe` function will
>>> be
>>>>>>>            introduced
>>>>>>>>    for users
>>>>>>>>> to debug the topology BEFORE start running their
>>>>>>>            instance with the
>>>>>>>>> topology. And hence the description won't contain
>>>>>>>            any task
>>>>>>>>    information as
>>>>>>>>> they are not formed yet.
>>>>>>>>> 2. In KIP-130, we want to add the task-level
>>>>>>>            information for
>>>>>>>>    monitoring
>>>>>>>>> purposes, which is not static and can only be
>>>>>>>            captured AFTER the
>>>>>>>>    instance
>>>>>>>>> has started running. Again I'm wondering for
>>> KIP-130
>>>>>>>            alone if
>>>>>>>>    adding those
>>>>>>>>> metrics mentioned in my previous email would
>>> suffice
>>>>>>>            even for the
>>>>>>>>    use case
>>>>>>>>> that you have mentioned.
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Guozhang
>>>>>>>>> 
>>>>>>>>> On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois
>>>>>>>>    <fhussonn...@gmail.com <mailto:
>>> fhussonn...@gmail.com>
>>>>>>>            <mailto:fhussonn...@gmail.com <mailto:
>>> fhussonn...@gmail.com
>>>>>>>> 
>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>> 
>>>>>>>>>> Hi Guozhang
>>>>>>>>>> 
>>>>>>>>>> Thank you for your feedback. I've started to
>>> look
>>>>>>>            more deeply
>>>>>>>>    into the
>>>>>>>>>> code. As you mention, it would be more clever to
>>>>>>>            use the current
>>>>>>>>>> StreamMetadata API to expose these information.
>>>>>>>>>> 
>>>>>>>>>> I think exposing metrics through JMX is great
>>> for
>>>>>>>            building
>>>>>>>>    monitoring
>>>>>>>>>> dashboards using some tools like jmxtrans and
>>>>> grafana.
>>>>>>>>>> But for our use case we would like to expose the
>>>>>>>            states
>>>>>>>>    directely from
>>>>>>>>> the
>>>>>>>>>> application embedding the kstreams topologies.
>>>>>>>>>> So we expect to be able to retrieve states in a
>>>>>>>            programmatic way.
>>>>>>>>>> 
>>>>>>>>>> For instance, we could imagin to produce those
>>>>>>>            states into a
>>>>>>>>    dedicated
>>>>>>>>>> topic. In that way a third application could
>>>>>>>            automatically
>>>>>>>>    discover all
>>>>>>>>>> kafka-streams applications which could be
>>>>> monitored.
>>>>>>>>>> In production environment, that can be clearly a
>>>>>>>            solution to have a
>>>>>>>>>> complete overview of a microservices
>>> architecture
>>>>>>>            based on Kafka
>>>>>>>>    Streams.
>>>>>>>>>> 
>>>>>>>>>> The toString() method give a lots of information
>>>>>>>            it can only be
>>>>>>>>    used for
>>>>>>>>>> debugging purpose but not to build a topologies
>>>>>>>            visualization
>>>>>>>>    tool. We
>>>>>>>>>> could actually expose same details about the
>>>>>>>            stream topology
>>>>>>>>    from the
>>>>>>>>>> StreamMetadata API ? So the TaskMetadata class
>>> you
>>>>>>>            have
>>>>>>>>    suggested could
>>>>>>>>>> contains similar information that ones return by
>>>>>>>            the toString
>>>>>>>>    method from
>>>>>>>>>> AbstractTask class ?
>>>>>>>>>> 
>>>>>>>>>> I can update the KIP in that way.
>>>>>>>>>> 
>>>>>>>>>> Finally,  I'm not sure to understand your last
>>>>>>>            point :* "Note
>>>>>>>>    that the
>>>>>>>>>> task-level assignment information is static,
>>> i.e.
>>>>>>>            it will not change
>>>>>>>>> during
>>>>>>>>>> the runtime" *
>>>>>>>>>> 
>>>>>>>>>> Does that mean when a rebalance occurs new tasks
>>>>>>>            are created for
>>>>>>>>    the new
>>>>>>>>>> assignments and old ones just switch to a
>>> standby
>>>>>>>            state ?
>>>>>>>>>> 
>>>>>>>>>> Thanks,
>>>>>>>>>> 
>>>>>>>>>> 2017-03-05 7:04 GMT+01:00 Guozhang Wang
>>>>>>>            <wangg...@gmail.com <mailto:wangg...@gmail.com>
>>>>>>>>    <mailto:wangg...@gmail.com <mailto:
>>> wangg...@gmail.com
>>>>>>>> :
>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>>> Hello Florian,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for the KIP and your detailed
>>> explanation
>>>>>>>            of your use
>>>>>>>>    case. I
>>>>>>>>>> think
>>>>>>>>>>> there are two dimensions to discuss on how to
>>>>>>>            improve Streams'
>>>>>>>>>>> debuggability (or more specifically state
>>>>>>>            exposure for
>>>>>>>>    visualization).
>>>>>>>>>>> 
>>>>>>>>>>> First question is "what information should we
>>>>>>>            expose to the
>>>>>>>>    user". From
>>>>>>>>>>> your KIP I saw generally three categories:
>>>>>>>>>>> 
>>>>>>>>>>> 1. The state of the thread within a process,
>>> as
>>>>>>>            you mentioned
>>>>>>>>    currently
>>>>>>>>>> we
>>>>>>>>>>> only expose the state of the process but not
>>> the
>>>>>>>            finer grained
>>>>>>>>> per-thread
>>>>>>>>>>> state.
>>>>>>>>>>> 2. The state of the task. Currently the most
>>>>>>>            close API to this is
>>>>>>>>>>> StreamsMetadata,
>>>>>>>>>>> however it aggregates the tasks across all
>>>>>>>            threads and only
>>>>>>>>    present the
>>>>>>>>>>> aggregated set of the assigned partitions /
>>>>>>>            state stores etc.
>>>>>>>>    We can
>>>>>>>>>>> consider extending this method to have a new
>>>>>>>>    StreamsMetadata#tasks()
>>>>>>>>>> which
>>>>>>>>>>> returns a TaskMetadata with the similar
>>> fields,
>>>>>>>            and the
>>>>>>>>>>> StreamsMetadata.stateStoreNames / etc would
>>>>>>>            still be returning the
>>>>>>>>>>> aggregated results but users can still "drill
>>>>>>>            down" if they want.
>>>>>>>>>>> 
>>>>>>>>>>> The second question is "how should we expose
>>>>>>>            them to the
>>>>>>>>    user". For
>>>>>>>>>>> example, you mentioned about
>>>>>>>            consumedOffsetsByPartition in the
>>>>>>>>>> activeTasks.
>>>>>>>>>>> We could add this as a JMX metric based on
>>> fetch
>>>>>>>            positions
>>>>>>>>    inside the
>>>>>>>>>>> consumer layer (note that Streams is just
>>>>>>>            embedding consumers)
>>>>>>>>    or we
>>>>>>>>>> could
>>>>>>>>>>> consider adding it into TaskMetadata. Either
>>>>>>>            case it can be
>>>>>>>>    visualized
>>>>>>>>>> for
>>>>>>>>>>> monitoring. The reason we expose
>>> StreamsMetadata
>>>>>>>            as well as
>>>>>>>>    State was
>>>>>>>>>> that
>>>>>>>>>>> it is expected to be "polled" in a
>>> programmatic
>>>>>>>            way for
>>>>>>>>    interactive
>>>>>>>>>> queries
>>>>>>>>>>> and also for control flows (e.g. I would like
>>> to
>>>>>>>            ONLY start
>>>>>>>>    running my
>>>>>>>>>>> other topology until the first topology has
>>> been
>>>>>>>            up and
>>>>>>>>    running) while
>>>>>>>>>> for
>>>>>>>>>>> your case it seems the main purpose is to
>>>>>>>            continuously query
>>>>>>>>    them for
>>>>>>>>>>> monitoring etc. Personally I'd prefer to
>>> expose
>>>>>>>            them as JMX
>>>>>>>>    only for
>>>>>>>>> such
>>>>>>>>>>> purposes only to have a simpler API.
>>>>>>>>>>> 
>>>>>>>>>>> So given your current motivations I'd suggest
>>>>>>>            expose the following
>>>>>>>>>>> information as newly added metrics in Streams:
>>>>>>>>>>> 
>>>>>>>>>>> 1. Thread-level state metric.
>>>>>>>>>>> 2. Task-level hosted client identifier metric
>>>>>>>            (e.g. host:port).
>>>>>>>>>>> 3. Consumer-level per-topic/partition position
>>>>>>>            metric (
>>>>>>>>>>> 
>>>>>>> 
>>>>> https://kafka.apache.org/documentation/#topic_fetch_monitoring
>>>>>>>            <
>>>>> https://kafka.apache.org/documentation/#topic_fetch_monitoring>
>>>>>>>> 
>>>>>>>             <
>>>>> https://kafka.apache.org/documentation/#topic_fetch_monitoring
>>>>>>>            <
>>>>> https://kafka.apache.org/documentation/#topic_fetch_monitoring>>).
>>>>>>>>>>> 
>>>>>>>>>>> Note that the task-level assignment
>>> information
>>>>> is static,
>>>>>>>>    i.e. it will
>>>>>>>>>> not
>>>>>>>>>>> change during the runtime at all and can be
>>>>> accessed from the
>>>>>>>>>> `toString()`
>>>>>>>>>>> function already even before the instance
>>> start
>>>>> running, so I
>>>>>>>>    think
>>>>>>>>> this
>>>>>>>>>>> piece of information do not need to be exposed
>>>>> through JMX
>>>>>>>>    anymore.
>>>>>>>>>>> 
>>>>>>>>>>> WDYT?
>>>>>>>>>>> 
>>>>>>>>>>> Guozhang
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy
>>>>>>>>    <damian....@gmail.com <mailto:damian....@gmail.com>
>>>>>>>            <mailto:damian....@gmail.com <mailto:damian....@gmail.com
>>>>>>>> 
>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Florian,
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>> 
>>>>>>>>>>>> It seems there is some overlap here with
>>> what
>>>>> we already have in
>>>>>>>>>>>> KafkaStreams.allMetadata(). This currently
>>>>> returns a
>>>>>>>>>>>> Collection<StreamsMetadata> where each
>>>>> StreamsMetadata
>>>>>>>>    instance holds
>>>>>>>>>> the
>>>>>>>>>>>> state stores and partition assignment for
>>>>> every instance of the
>>>>>>>>>>>> KafkaStreams application. I'm wondering if
>>>>> that is good
>>>>>>>>    enough for
>>>>>>>>> what
>>>>>>>>>>> you
>>>>>>>>>>>> are trying to achieve? If not could it be
>>>>> modified to
>>>>>>>>    include the per
>>>>>>>>>>>> Thread assignment?
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Damian
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Wed, 1 Mar 2017 at 22:49 Florian
>>>>> Hussonnois <
>>>>>>>>> fhussonn...@gmail.com <mailto:
>>> fhussonn...@gmail.com
>>>>>> 
>>>>>>>            <mailto:fhussonn...@gmail.com <mailto:
>>> fhussonn...@gmail.com
>>>>>>>> 
>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> First, I will answer to your last
>>> question.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The main reason to have both
>>>>>>>            TaskState#assignment and
>>>>>>>>>>>>> TaskState#consumedOffsetsByPartition is
>>> that
>>>>>>>            tasks have no
>>>>>>>>> consumed
>>>>>>>>>>>> offsets
>>>>>>>>>>>>> until at least one message is consumed for
>>>>>>>            each partition
>>>>>>>>    even if
>>>>>>>>>>>> previous
>>>>>>>>>>>>> offsets exist for the consumer group.
>>>>>>>>>>>>> So yes this methods are redundant as it
>>> only
>>>>>>>            diverge at
>>>>>>>>    application
>>>>>>>>>>>>> startup.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> About the use case, currently we are
>>>>>>>            developping for a
>>>>>>>>    customer a
>>>>>>>>>>> little
>>>>>>>>>>>>> framework based on KafkaStreams which
>>>>>>>>    transform/denormalize data
>>>>>>>>>> before
>>>>>>>>>>>>> ingesting into hadoop.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> We have a cluster of workers (SpringBoot)
>>>>>>>            which instantiate
>>>>>>>>> KStreams
>>>>>>>>>>>>> topologies dynamicaly based on dataflow
>>>>>>>            configurations.
>>>>>>>>>>>>> Each configuration describes a topic to
>>>>>>>            consume and how to
>>>>>>>>    process
>>>>>>>>>>>> messages
>>>>>>>>>>>>> (this looks like NiFi processors API).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Our architecture is inspired from
>>>>>>>            KafkaConnect. We have
>>>>>>>>    topics for
>>>>>>>>>>>> configs
>>>>>>>>>>>>> and states which are consumed by each
>>>>>>>            workers (actually we
>>>>>>>>    have
>>>>>>>>>> reused
>>>>>>>>>>>> some
>>>>>>>>>>>>> internals classes to the connect API).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Now, we would like to develop UIs to
>>>>>>>            visualize topics and
>>>>>>>>> partitions
>>>>>>>>>>>>> consumed by our worker applications.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Also, I think it would be nice to be able,
>>>>>>>            in the futur, to
>>>>>>>>> develop
>>>>>>>>>>> web
>>>>>>>>>>>>> UIs similar to Spark but for KafkaStreams
>>> to
>>>>>>>            visualize
>>>>>>>>    DAGs...so
>>>>>>>>>> maybe
>>>>>>>>>>>> this
>>>>>>>>>>>>> KIP is just a first step.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 2017-03-01 22:52 GMT+01:00 Matthias J. Sax
>>>>>>>>    <matth...@confluent.io <mailto:
>>> matth...@confluent.io>
>>>>>>>            <mailto:matth...@confluent.io <mailto:
>>> matth...@confluent.io
>>>>>>> 
>>>>>>> 
>>>>>>>>>> :
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I am wondering a little bit, why you
>>> need
>>>>>>>            to expose this
>>>>>>>>>> information.
>>>>>>>>>>>>>> Can you describe some use cases?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Would it be worth to unify this new API
>>>>> with
>>>>>>>>    KafkaStreams#state()
>>>>>>>>>> to
>>>>>>>>>>>> get
>>>>>>>>>>>>>> the overall state of an application
>>>>>>>            without the need to
>>>>>>>>    call two
>>>>>>>>>>>>>> different methods? Not sure how this
>>>>>>>            unified API might
>>>>>>>>    look like
>>>>>>>>>>>> though.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> One minor comment about the API:
>>>>>>>            TaskState#assignment
>>>>>>>>    seems to be
>>>>>>>>>>>>>> redundant. It should be the same as
>>>>>>>>>>>>>> 
>>>>> TaskState#consumedOffsetsByPartition.keySet()
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Or do I miss something?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On 3/1/17 5:19 AM, Florian Hussonnois
>>>>> wrote:
>>>>>>>>>>>>>>> Hi Eno,
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Yes, but the state() method only
>>> returns
>>>>>>>            the global
>>>>>>>>    state of
>>>>>>>>> the
>>>>>>>>>>>>>>> KafkaStream application (ie: CREATED,
>>>>>>>            RUNNING,
>>>>>>>>    REBALANCING,
>>>>>>>>>>>>>>> PENDING_SHUTDOWN, NOT_RUNNING).
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> An alternative to this KIP would be to
>>>>>>>            change this
>>>>>>>>    method to
>>>>>>>>>> return
>>>>>>>>>>>>> more
>>>>>>>>>>>>>>> information instead of adding a new
>>>>> method.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 2017-03-01 13:46 GMT+01:00 Eno
>>> Thereska
>>>>> <
>>>>>>>>> eno.there...@gmail.com
>>>>>>>            <mailto:eno.there...@gmail.com>
>>>>>>>            <mailto:eno.there...@gmail.com <mailto:
>>>>> eno.there...@gmail.com>>
>>>>>>>>>>> :
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks Florian,
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Have you had a chance to look at the
>>>>> new state methods in
>>>>>>>>>> 0.10.2,
>>>>>>>>>>>>> e.g.,
>>>>>>>>>>>>>>>> KafkaStreams.state()?
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>> On 1 Mar 2017, at 11:54, Florian
>>>>> Hussonnois <
>>>>>>>>>>> fhussonn...@gmail.com
>>>>>>>            <mailto:fhussonn...@gmail.com> <mailto:
>>>>> fhussonn...@gmail.com
>>>>>>>            <mailto:fhussonn...@gmail.com>>
>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> I have just created KIP-130 to add a
>>>>>>>            new method to the
>>>>>>>>>>> KafkaStreams
>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>> order to expose the states of
>>> threads
>>>>>>>            and active tasks.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>> 
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>>>>>>>            <https://cwiki.apache.org/confluence/display/KAFKA/KIP+>
>>>>>>>> 
>>>>>>>             <https://cwiki.apache.org/confluence/display/KAFKA/KIP+
>>>>>>>            <https://cwiki.apache.org/confluence/display/KAFKA/KIP+>>
>>>>>>>>>>>>>>>> 
>>>>>>>            130%3A+Expose+states+of+active+tasks+to+KafkaStreams+
>>>>>>>>> public+API
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> -- Guozhang
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> --
>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>>    --
>>>>>>>>    Florian HUSSONNOIS
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>        --
>>>>>>>        -- Guozhang
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>>    --
>>>>>>>    Florian HUSSONNOIS
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Florian HUSSONNOIS
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>>> 
>> 
>> 
> 

Reply via email to