It would not give the same information as the new API. Thus, it would be
inconsistent (and this would be really bad IMHO.)


I would really like to remove (i.e., deprecate) it. It was a "hot fix" to
give some runtime information to the user. But with this KIP, we get a
proper first-class API and thus .toString() is just outdated.

>>>> It's always preferable to have an implementation for toString method.

Not sure about this btw.

Furthermore, if anyone wants to print runtime information, we have
ThreadMetadata#toString() and TaskMetadata#toString().
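
A rough sketch of how printing runtime information could look once the
metadata classes exist. The two nested classes below are hypothetical
stand-ins, not the actual KIP-130 classes: their field names (threadName,
threadState, activeTasks, taskId, topicPartitions) and the helper names
(describe, sampleThreads) are assumptions loosely based on this thread, and
the task ids and partitions are placeholder values.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class RuntimeInfoSketch {

    // Hypothetical stand-in for the proposed TaskMetadata class.
    static final class TaskMetadata {
        final String taskId;
        final List<String> topicPartitions;

        TaskMetadata(String taskId, List<String> topicPartitions) {
            this.taskId = taskId;
            this.topicPartitions = topicPartitions;
        }

        @Override
        public String toString() {
            return "TaskMetadata{taskId=" + taskId
                    + ", topicPartitions=" + topicPartitions + "}";
        }
    }

    // Hypothetical stand-in for the proposed ThreadMetadata class.
    static final class ThreadMetadata {
        final String threadName;
        final String threadState;
        final List<TaskMetadata> activeTasks;

        ThreadMetadata(String threadName, String threadState,
                       List<TaskMetadata> activeTasks) {
            this.threadName = threadName;
            this.threadState = threadState;
            this.activeTasks = activeTasks;
        }

        @Override
        public String toString() {
            return "ThreadMetadata{threadName=" + threadName
                    + ", threadState=" + threadState
                    + ", activeTasks=" + activeTasks + "}";
        }
    }

    // Builds one line per thread by relying only on the metadata toString() methods.
    static String describe(List<ThreadMetadata> threads) {
        StringBuilder sb = new StringBuilder();
        for (ThreadMetadata thread : threads) {
            sb.append(thread).append('\n');
        }
        return sb.toString();
    }

    // Placeholder data; a real application would obtain this from the new API.
    static List<ThreadMetadata> sampleThreads() {
        TaskMetadata t0 = new TaskMetadata("0_0", Collections.singletonList("input-0"));
        TaskMetadata t1 = new TaskMetadata("0_1", Collections.singletonList("input-1"));
        return Arrays.asList(
                new ThreadMetadata("StreamThread-1", "RUNNING", Collections.singletonList(t0)),
                new ThreadMetadata("StreamThread-2", "RUNNING", Collections.singletonList(t1)));
    }

    public static void main(String[] args) {
        System.out.print(describe(sampleThreads()));
    }
}
```

The point of the sketch is only that a caller can assemble the report from
the per-thread and per-task objects, so KafkaStreams#toString() does not
need to carry this information itself.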




-Matthias

On 4/20/17 9:36 AM, Eno Thereska wrote:
> Hi there,
> 
> Can't toString() just stay as is?
> 
> Thanks
> Eno
> 
>> On 20 Apr 2017, at 17:26, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> Florian,
>>
>> I am just wondering: if we keep .toString(), what should the
>> implementation look like?
>>
>>
>> -Matthias
>>
>> On 4/19/17 2:42 PM, Florian Hussonnois wrote:
>>> Hi Matthias,
>>>
>>> So sorry for the delay in replying to you. For now, I think we can keep
>>> KafkaStreams#toString() as it is.
>>> It's always preferable to have an implementation for toString method.
>>>
>>> 2017-04-14 4:08 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
>>>
>>>> Florian,
>>>>
>>>>>>> What about KafkaStreams#toString() method?
>>>>>>>
>>>>>>> I think, we want to deprecate it as with KIP-120 and the changes of
>>>> this
>>>>>>> KIP, it gets obsolete.
>>>>
>>>> Any thoughts about this? For me, this is the last open point to discuss
>>>> (or what should be reflected in the KIP in case you agree) before I can
>>>> put my vote on the VOTE thread that has already started.
>>>>
>>>> -Matthias
>>>>
>>>>
>>>> On 4/11/17 12:18 AM, Damian Guy wrote:
>>>>> Hi Florian,
>>>>>
>>>>> Thanks for the updates. The KIP is looking good.
>>>>>
>>>>> Cheers,
>>>>> Damian
>>>>>
>>>>> On Fri, 7 Apr 2017 at 22:41 Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>>
>>>>>> What about KafkaStreams#toString() method?
>>>>>>
>>>>>> I think, we want to deprecate it as with KIP-120 and the changes of this
>>>>>> KIP, it gets obsolete.
>>>>>>
>>>>>> If we do so, please update the KIP accordingly.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 3/28/17 7:00 PM, Matthias J. Sax wrote:
>>>>>>> Thanks for updating the KIP!
>>>>>>>
>>>>>>> I think it's good as is -- I would not add anything more to
>>>> TaskMetadata.
>>>>>>>
>>>>>>> About subtopologies and tasks. We do have the concept of subtopologies
>>>>>>> already in KIP-120. It's only missing an ID that allows linking a
>>>>>>> subtopology to a task.
>>>>>>>
>>>>>>> IMHO, adding a simple variable to `Subtopology` that provides the id
>>>>>>> should be sufficient. We can simply document in the JavaDocs how
>>>>>>> Subtopology and TaskMetadata can be linked to each other.
>>>>>>>
>>>>>>> I did update KIP-120 accordingly.
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 3/28/17 3:45 PM, Florian Hussonnois wrote:
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I've updated the KIP and the PR to reflect your suggestions.
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>>> https://github.com/apache/kafka/pull/2612
>>>>>>>>
>>>>>>>> Also, I've exposed property StreamThread#state as a string through the
>>>>>>>> new class ThreadMetadata.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>>
>>>>>>>> 2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com
>>>>>>>> <mailto:fhussonn...@gmail.com>>:
>>>>>>>>
>>>>>>>>    Hi Guozhang, Matthias,
>>>>>>>>
>>>>>>>>    It's a great idea to add sub topologies descriptions. This would
>>>>>>>>    help developers to better understand topology concept.
>>>>>>>>
>>>>>>>>    I agree that is not really user-friendly to check if
>>>>>>>>    `StreamsMetadata#streamThreads` is not returning null.
>>>>>>>>
>>>>>>>>    The method name localThreadsMetadata looks good. In addition, it's
>>>>>>>>    more simple to build ThreadMetadata instances from the
>>>> `StreamTask`
>>>>>>>>    class than from `StreamPartitionAssignor` class.
>>>>>>>>
>>>>>>>>    I will work on modifications. As I understand, I have to add the
>>>>>>>>    property subTopologyId property to the TaskMetadata class - Am I
>>>>>> right ?
>>>>>>>>
>>>>>>>>    Thanks,
>>>>>>>>
>>>>>>>>    2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com
>>>>>>>>    <mailto:wangg...@gmail.com>>:
>>>>>>>>
>>>>>>>>        Re 1): this is a good point. May be we can move
>>>>>>>>        `StreamsMetadata#streamThreads` as
>>>>>>>>        `KafkaStreams#localThreadsMetadata`?
>>>>>>>>
>>>>>>>>        3): this is a minor suggestion about function name of
>>>>>>>>        `assignedPartitions`, to `topicPartitions` to be consistent
>>>> with
>>>>>>>>        `StreamsMetadata`?
>>>>>>>>
>>>>>>>>
>>>>>>>>        Guozhang
>>>>>>>>
>>>>>>>>        On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax
>>>>>>>>        <matth...@confluent.io <mailto:matth...@confluent.io>> wrote:
>>>>>>>>
>>>>>>>>            Thanks for the progress on this KIP. I think we are on the
>>>>>>>>            right path!
>>>>>>>>
>>>>>>>>            Couple of comments/questions:
>>>>>>>>
>>>>>>>>            (1) Why do we not consider the "rejected alternative" to
>>>> add
>>>>>>>>            the method
>>>>>>>>            to KafkaStreams? The comment on #streamThreads() says:
>>>>>>>>
>>>>>>>>            "Note this method will return <code>null</code> if called
>>>> on
>>>>>>>>            {@link
>>>>>>>>            StreamsMetadata} which represent a remote application."
>>>>>>>>
>>>>>>>>            Thus, if we cannot get any remote metadata, it seems not
>>>>>>>>            straight
>>>>>>>>            forward to not add it to KafkaStreams directly -- this
>>>> would
>>>>>>>>            avoid
>>>>>>>>            invalid calls and `null` return value in the first place.
>>>>>>>>
>>>>>>>>            I like the idea about exposing sub-topologies.:
>>>>>>>>
>>>>>>>>            (2a) I would recommend to rename `topicsGroupId` to
>>>>>>>>            `subTopologyId` :)
>>>>>>>>
>>>>>>>>            (2b) We could add this to KIP-120 already. However, I
>>>> would
>>>>>>>>            not just
>>>>>>>>            link both via name, but leverage KIP-120 directly, and
>>>> add a
>>>>>>>>            "Subtopology" member to the TaskMetadata class.
>>>>>>>>
>>>>>>>>
>>>>>>>>            Overall, I like the distinction of KIP-120 only exposing
>>>>>>>>            "static"
>>>>>>>>            information that can be determined before the topology
>>>> get's
>>>>>>>>            started,
>>>>>>>>            while this KIP allow to access runtime information.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>            -Matthias
>>>>>>>>
>>>>>>>>
>>>>>>>>            On 3/22/17 12:42 PM, Guozhang Wang wrote:
>>>>>>>>> Thanks for the updated KIP, and sorry for the late
>>>>>> replies!
>>>>>>>>>
>>>>>>>>> I think a little bit more about KIP-130, and I feel that
>>>>>>>>            if we are going
>>>>>>>>> to deprecate the `toString` function (it is not
>>>> explicitly
>>>>>>>>            said in the
>>>>>>>>> KIP, so I'm not sure if you plan to still keep the
>>>>>>>>> `KafkaStreams#toString` as is or are going to replace it
>>>>>>>>            with the
>>>>>>>>> proposed APIs) with the proposed ones, it may be okay.
>>>>>> More
>>>>>>>>> specifically, after both KIP-120 and KIP-130:
>>>>>>>>>
>>>>>>>>> 1. users can use `#describe` function to check the
>>>>>>>>            generated topology
>>>>>>>>> before calling `KafkaStreams#start`, which is static
>>>>>>>>            information.
>>>>>>>>> 2. users can use the `StreamsMetadata -> ThreadMetadata
>>>> ->
>>>>>>>>            TaskMetadata`
>>>>>>>>> programmatically after called `KafkaStreams#start` to
>>>> get
>>>>>> the
>>>>>>>>> dynamically changeable information.
>>>>>>>>>
>>>>>>>>> One thing I'm still not sure though, is that in
>>>>>>>>            `TaskMetadata` we only
>>>>>>>>> have the TaskId and assigned partitions, whereas in
>>>>>>>>> "TopologyDescription" introduced in KIP-120, it will
>>>>>>>>            simply describe the
>>>>>>>>> whole topology possibly composed of multiple
>>>>>>>>            sub-topologies. So it is
>>>>>>>>> hard for users to tell which sub-topology is executed
>>>>>>>>            under which task
>>>>>>>>> on-the-fly.
>>>>>>>>>
>>>>>>>>> Hence I'm thinking if we can expose the
>>>> "sub-topology-id"
>>>>>>>>            (named as
>>>>>>>>> topicsGroupId internally) in
>>>>>>>>            TopologyDescription#Subtopology, and then
>>>>>>>>> from the task id which is essentially "sub-topology-id
>>>>>> DASH
>>>>>>>>> partition-group-id" users can make the link, though it
>>>> is
>>>>>>>>            still not that
>>>>>>>>> straight-forward.
>>>>>>>>>
>>>>>>>>> Thoughts?
>>>>>>>>>
>>>>>>>>> Guozhang
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois
>>>>>>>>> <fhussonn...@gmail.com <mailto:fhussonn...@gmail.com>
>>>>>>>>            <mailto:fhussonn...@gmail.com
>>>>>>>>            <mailto:fhussonn...@gmail.com>>> wrote:
>>>>>>>>>
>>>>>>>>>    Thanks Guozhang for pointing me to the KIP-120.
>>>>>>>>>
>>>>>>>>>    I've made some modifications to the KIP. I also
>>>>>> proposed a new PR
>>>>>>>>>    (there is
>>>>>>>>>    still some tests to make).
>>>>>>>>>
>>>>>>>>>    https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>>>>
>>>>>>>>>    Exposing consumed offsets through JMX is sufficient
>>>>>> for debugging
>>>>>>>>>    purpose.
>>>>>>>>>    But I think this could be part to another JIRA as
>>>>>> there is no impact to
>>>>>>>>>    public API.
>>>>>>>>>
>>>>>>>>>    Thanks
>>>>>>>>>
>>>>>>>>>    2017-03-10 22:35 GMT+01:00 Guozhang Wang <
>>>>>> wangg...@gmail.com <mailto:wangg...@gmail.com>
>>>>>>>>>    <mailto:wangg...@gmail.com <mailto:
>>>> wangg...@gmail.com
>>>>>>>>> :
>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Hello Florian,
>>>>>>>>>>
>>>>>>>>>> As for programmatically discover monitoring data
>>>> by
>>>>>>>>            piping metrics
>>>>>>>>>    into a
>>>>>>>>>> dedicated topic. I think you can actually use a
>>>>>>>>>    KafkaMetricsReporter which
>>>>>>>>>> pipes the polled metric values into a pre-defined
>>>>>>>>            topic (note that
>>>>>>>>>    in Kafka
>>>>>>>>>> the MetricsReporter is simply an interface and
>>>> users
>>>>>>>>            can build
>>>>>>>>>    their own
>>>>>>>>>> impl in addition to the JMXReporter), for example
>>>> :
>>>>>>>>>>
>>>>>>>>>> https://github.com/krux/kafka-metrics-reporter
>>>>>>>>>>
>>>>>>>>>> As for the "static task-level assignment", what I
>>>>>>>>            meant is that
>>>>>>>>>    the mapping
>>>>>>>>>> from source-topic-partitions -> tasks are static,
>>>>>>>>            via the
>>>>>>>>>> "PartitionGrouper", and a task won't switch from
>>>> an
>>>>>>>>            active task to a
>>>>>>>>>> standby task, it is actually that an active task
>>>>>>>>            could be
>>>>>>>>>    migrated, as a
>>>>>>>>>> whole along with all its assigned partitions, to
>>>>>>>>            another thread /
>>>>>>>>>    process
>>>>>>>>>> and a new standby task will be created on the host
>>>>>>>>            that this
>>>>>>>>>    active task is
>>>>>>>>>> migrating from. So for the SAME task, its
>>>>>> taskMetadata.
>>>>>>>>>> assignedPartitions()
>>>>>>>>>> will always return you the same partitions.
>>>>>>>>>>
>>>>>>>>>> As for the `toString` function that what we have
>>>>>>>>            today, I feel it
>>>>>>>>>    has some
>>>>>>>>>> correlations with KIP-120 so I'm trying to
>>>>>>>>            coordinate some
>>>>>>>>>    discussions here
>>>>>>>>>> (cc'ing Matthias as the owner of KIP-120). My
>>>>>>>>            understand is that:
>>>>>>>>>>
>>>>>>>>>> 1. In KIP-120, the `toString` function of
>>>>>>>>            `KafkaStreams` will be
>>>>>>>>>    removed
>>>>>>>>>> and instead the `Topology#describe` function will
>>>> be
>>>>>>>>            introduced
>>>>>>>>>    for users
>>>>>>>>>> to debug the topology BEFORE start running their
>>>>>>>>            instance with the
>>>>>>>>>> topology. And hence the description won't contain
>>>>>>>>            any task
>>>>>>>>>    information as
>>>>>>>>>> they are not formed yet.
>>>>>>>>>> 2. In KIP-130, we want to add the task-level
>>>>>>>>            information for
>>>>>>>>>    monitoring
>>>>>>>>>> purposes, which is not static and can only be
>>>>>>>>            captured AFTER the
>>>>>>>>>    instance
>>>>>>>>>> has started running. Again I'm wondering for
>>>> KIP-130
>>>>>>>>            alone if
>>>>>>>>>    adding those
>>>>>>>>>> metrics mentioned in my previous email would
>>>> suffice
>>>>>>>>            even for the
>>>>>>>>>    use case
>>>>>>>>>> that you have mentioned.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Guozhang
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois
>>>>>>>>>    <fhussonn...@gmail.com <mailto:
>>>> fhussonn...@gmail.com>
>>>>>>>>            <mailto:fhussonn...@gmail.com <mailto:
>>>> fhussonn...@gmail.com
>>>>>>>>>
>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Guozhang
>>>>>>>>>>>
>>>>>>>>>>> Thank you for your feedback. I've started to
>>>> look
>>>>>>>>            more deeply
>>>>>>>>>    into the
>>>>>>>>>>> code. As you mention, it would be more clever to
>>>>>>>>            use the current
>>>>>>>>>>> StreamMetadata API to expose these information.
>>>>>>>>>>>
>>>>>>>>>>> I think exposing metrics through JMX is great
>>>> for
>>>>>>>>            building
>>>>>>>>>    monitoring
>>>>>>>>>>> dashboards using some tools like jmxtrans and
>>>>>> grafana.
>>>>>>>>>>> But for our use case we would like to expose the
>>>>>>>>            states
>>>>>>>>>    directely from
>>>>>>>>>> the
>>>>>>>>>>> application embedding the kstreams topologies.
>>>>>>>>>>> So we expect to be able to retrieve states in a
>>>>>>>>            programmatic way.
>>>>>>>>>>>
>>>>>>>>>>> For instance, we could imagin to produce those
>>>>>>>>            states into a
>>>>>>>>>    dedicated
>>>>>>>>>>> topic. In that way a third application could
>>>>>>>>            automatically
>>>>>>>>>    discover all
>>>>>>>>>>> kafka-streams applications which could be
>>>>>> monitored.
>>>>>>>>>>> In production environment, that can be clearly a
>>>>>>>>            solution to have a
>>>>>>>>>>> complete overview of a microservices
>>>> architecture
>>>>>>>>            based on Kafka
>>>>>>>>>    Streams.
>>>>>>>>>>>
>>>>>>>>>>> The toString() method give a lots of information
>>>>>>>>            it can only be
>>>>>>>>>    used for
>>>>>>>>>>> debugging purpose but not to build a topologies
>>>>>>>>            visualization
>>>>>>>>>    tool. We
>>>>>>>>>>> could actually expose same details about the
>>>>>>>>            stream topology
>>>>>>>>>    from the
>>>>>>>>>>> StreamMetadata API ? So the TaskMetadata class
>>>> you
>>>>>>>>            have
>>>>>>>>>    suggested could
>>>>>>>>>>> contains similar information that ones return by
>>>>>>>>            the toString
>>>>>>>>>    method from
>>>>>>>>>>> AbstractTask class ?
>>>>>>>>>>>
>>>>>>>>>>> I can update the KIP in that way.
>>>>>>>>>>>
>>>>>>>>>>> Finally,  I'm not sure to understand your last
>>>>>>>>            point :* "Note
>>>>>>>>>    that the
>>>>>>>>>>> task-level assignment information is static,
>>>> i.e.
>>>>>>>>            it will not change
>>>>>>>>>> during
>>>>>>>>>>> the runtime" *
>>>>>>>>>>>
>>>>>>>>>>> Does that mean when a rebalance occurs new tasks
>>>>>>>>            are created for
>>>>>>>>>    the new
>>>>>>>>>>> assignments and old ones just switch to a
>>>> standby
>>>>>>>>            state ?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> 2017-03-05 7:04 GMT+01:00 Guozhang Wang
>>>>>>>>            <wangg...@gmail.com <mailto:wangg...@gmail.com>
>>>>>>>>>    <mailto:wangg...@gmail.com <mailto:
>>>> wangg...@gmail.com
>>>>>>>>> :
>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Hello Florian,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the KIP and your detailed
>>>> explanation
>>>>>>>>            of your use
>>>>>>>>>    case. I
>>>>>>>>>>> think
>>>>>>>>>>>> there are two dimensions to discuss on how to
>>>>>>>>            improve Streams'
>>>>>>>>>>>> debuggability (or more specifically state
>>>>>>>>            exposure for
>>>>>>>>>    visualization).
>>>>>>>>>>>>
>>>>>>>>>>>> First question is "what information should we
>>>>>>>>            expose to the
>>>>>>>>>    user". From
>>>>>>>>>>>> your KIP I saw generally three categories:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. The state of the thread within a process,
>>>> as
>>>>>>>>            you mentioned
>>>>>>>>>    currently
>>>>>>>>>>> we
>>>>>>>>>>>> only expose the state of the process but not
>>>> the
>>>>>>>>            finer grained
>>>>>>>>>> per-thread
>>>>>>>>>>>> state.
>>>>>>>>>>>> 2. The state of the task. Currently the most
>>>>>>>>            close API to this is
>>>>>>>>>>>> StreamsMetadata,
>>>>>>>>>>>> however it aggregates the tasks across all
>>>>>>>>            threads and only
>>>>>>>>>    present the
>>>>>>>>>>>> aggregated set of the assigned partitions /
>>>>>>>>            state stores etc.
>>>>>>>>>    We can
>>>>>>>>>>>> consider extending this method to have a new
>>>>>>>>>    StreamsMetadata#tasks()
>>>>>>>>>>> which
>>>>>>>>>>>> returns a TaskMetadata with the similar
>>>> fields,
>>>>>>>>            and the
>>>>>>>>>>>> StreamsMetadata.stateStoreNames / etc would
>>>>>>>>            still be returning the
>>>>>>>>>>>> aggregated results but users can still "drill
>>>>>>>>            down" if they want.
>>>>>>>>>>>>
>>>>>>>>>>>> The second question is "how should we expose
>>>>>>>>            them to the
>>>>>>>>>    user". For
>>>>>>>>>>>> example, you mentioned about
>>>>>>>>            consumedOffsetsByPartition in the
>>>>>>>>>>> activeTasks.
>>>>>>>>>>>> We could add this as a JMX metric based on
>>>> fetch
>>>>>>>>            positions
>>>>>>>>>    inside the
>>>>>>>>>>>> consumer layer (note that Streams is just
>>>>>>>>            embedding consumers)
>>>>>>>>>    or we
>>>>>>>>>>> could
>>>>>>>>>>>> consider adding it into TaskMetadata. Either
>>>>>>>>            case it can be
>>>>>>>>>    visualized
>>>>>>>>>>> for
>>>>>>>>>>>> monitoring. The reason we expose
>>>> StreamsMetadata
>>>>>>>>            as well as
>>>>>>>>>    State was
>>>>>>>>>>> that
>>>>>>>>>>>> it is expected to be "polled" in a
>>>> programmatic
>>>>>>>>            way for
>>>>>>>>>    interactive
>>>>>>>>>>> queries
>>>>>>>>>>>> and also for control flows (e.g. I would like
>>>> to
>>>>>>>>            ONLY start
>>>>>>>>>    running my
>>>>>>>>>>>> other topology until the first topology has
>>>> been
>>>>>>>>            up and
>>>>>>>>>    running) while
>>>>>>>>>>> for
>>>>>>>>>>>> your case it seems the main purpose is to
>>>>>>>>            continuously query
>>>>>>>>>    them for
>>>>>>>>>>>> monitoring etc. Personally I'd prefer to
>>>> expose
>>>>>>>>            them as JMX
>>>>>>>>>    only for
>>>>>>>>>> such
>>>>>>>>>>>> purposes only to have a simpler API.
>>>>>>>>>>>>
>>>>>>>>>>>> So given your current motivations I'd suggest
>>>>>>>>            expose the following
>>>>>>>>>>>> information as newly added metrics in Streams:
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Thread-level state metric.
>>>>>>>>>>>> 2. Task-level hosted client identifier metric
>>>>>>>>            (e.g. host:port).
>>>>>>>>>>>> 3. Consumer-level per-topic/partition position metric
>>>>>>>>>>>> (https://kafka.apache.org/documentation/#topic_fetch_monitoring).
>>>>>>>>>>>>
>>>>>>>>>>>> Note that the task-level assignment
>>>> information
>>>>>> is static,
>>>>>>>>>    i.e. it will
>>>>>>>>>>> not
>>>>>>>>>>>> change during the runtime at all and can be
>>>>>> accessed from the
>>>>>>>>>>> `toString()`
>>>>>>>>>>>> function already even before the instance
>>>> start
>>>>>> running, so I
>>>>>>>>>    think
>>>>>>>>>> this
>>>>>>>>>>>> piece of information do not need to be exposed
>>>>>> through JMX
>>>>>>>>>    anymore.
>>>>>>>>>>>>
>>>>>>>>>>>> WDYT?
>>>>>>>>>>>>
>>>>>>>>>>>> Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy
>>>>>>>>>    <damian....@gmail.com <mailto:damian....@gmail.com>
>>>>>>>>            <mailto:damian....@gmail.com <mailto:damian....@gmail.com
>>>>>>>>>
>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Florian,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It seems there is some overlap here with
>>>> what
>>>>>> we already have in
>>>>>>>>>>>>> KafkaStreams.allMetadata(). This currently
>>>>>> returns a
>>>>>>>>>>>>> Collection<StreamsMetadata> where each
>>>>>> StreamsMetadata
>>>>>>>>>    instance holds
>>>>>>>>>>> the
>>>>>>>>>>>>> state stores and partition assignment for
>>>>>> every instance of the
>>>>>>>>>>>>> KafkaStreams application. I'm wondering if
>>>>>> that is good
>>>>>>>>>    enough for
>>>>>>>>>> what
>>>>>>>>>>>> you
>>>>>>>>>>>>> are trying to achieve? If not could it be
>>>>>> modified to
>>>>>>>>>    include the per
>>>>>>>>>>>>> Thread assignment?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Damian
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, 1 Mar 2017 at 22:49 Florian
>>>>>> Hussonnois <
>>>>>>>>>> fhussonn...@gmail.com <mailto:
>>>> fhussonn...@gmail.com
>>>>>>>
>>>>>>>>            <mailto:fhussonn...@gmail.com <mailto:
>>>> fhussonn...@gmail.com
>>>>>>>>>
>>>>>>>>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Matthias,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> First, I will answer to your last
>>>> question.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The main reason to have both
>>>>>>>>            TaskState#assignment and
>>>>>>>>>>>>>> TaskState#consumedOffsetsByPartition is
>>>> that
>>>>>>>>            tasks have no
>>>>>>>>>> consumed
>>>>>>>>>>>>> offsets
>>>>>>>>>>>>>> until at least one message is consumed for
>>>>>>>>            each partition
>>>>>>>>>    even if
>>>>>>>>>>>>> previous
>>>>>>>>>>>>>> offsets exist for the consumer group.
>>>>>>>>>>>>>> So yes this methods are redundant as it
>>>> only
>>>>>>>>            diverge at
>>>>>>>>>    application
>>>>>>>>>>>>>> startup.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> About the use case, currently we are
>>>>>>>>            developping for a
>>>>>>>>>    customer a
>>>>>>>>>>>> little
>>>>>>>>>>>>>> framework based on KafkaStreams which
>>>>>>>>>    transform/denormalize data
>>>>>>>>>>> before
>>>>>>>>>>>>>> ingesting into hadoop.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We have a cluster of workers (SpringBoot)
>>>>>>>>            which instantiate
>>>>>>>>>> KStreams
>>>>>>>>>>>>>> topologies dynamicaly based on dataflow
>>>>>>>>            configurations.
>>>>>>>>>>>>>> Each configuration describes a topic to
>>>>>>>>            consume and how to
>>>>>>>>>    process
>>>>>>>>>>>>> messages
>>>>>>>>>>>>>> (this looks like NiFi processors API).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Our architecture is inspired from
>>>>>>>>            KafkaConnect. We have
>>>>>>>>>    topics for
>>>>>>>>>>>>> configs
>>>>>>>>>>>>>> and states which are consumed by each
>>>>>>>>            workers (actually we
>>>>>>>>>    have
>>>>>>>>>>> reused
>>>>>>>>>>>>> some
>>>>>>>>>>>>>> internals classes to the connect API).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Now, we would like to develop UIs to
>>>>>>>>            visualize topics and
>>>>>>>>>> partitions
>>>>>>>>>>>>>> consumed by our worker applications.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, I think it would be nice to be able,
>>>>>>>>            in the futur, to
>>>>>>>>>> develop
>>>>>>>>>>>> web
>>>>>>>>>>>>>> UIs similar to Spark but for KafkaStreams
>>>> to
>>>>>>>>            visualize
>>>>>>>>>    DAGs...so
>>>>>>>>>>> maybe
>>>>>>>>>>>>> this
>>>>>>>>>>>>>> KIP is just a first step.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2017-03-01 22:52 GMT+01:00 Matthias J. Sax
>>>>>>>>>    <matth...@confluent.io <mailto:
>>>> matth...@confluent.io>
>>>>>>>>            <mailto:matth...@confluent.io <mailto:
>>>> matth...@confluent.io
>>>>>>>>
>>>>>>>>
>>>>>>>>>>> :
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for the KIP.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I am wondering a little bit, why you
>>>> need
>>>>>>>>            to expose this
>>>>>>>>>>> information.
>>>>>>>>>>>>>>> Can you describe some use cases?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Would it be worth to unify this new API
>>>>>> with
>>>>>>>>>    KafkaStreams#state()
>>>>>>>>>>> to
>>>>>>>>>>>>> get
>>>>>>>>>>>>>>> the overall state of an application
>>>>>>>>            without the need to
>>>>>>>>>    call two
>>>>>>>>>>>>>>> different methods? Not sure how this
>>>>>>>>            unified API might
>>>>>>>>>    look like
>>>>>>>>>>>>> though.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> One minor comment about the API:
>>>>>>>>            TaskState#assignment
>>>>>>>>>    seems to be
>>>>>>>>>>>>>>> redundant. It should be the same as
>>>>>>>>>>>>>>>
>>>>>> TaskState#consumedOffsetsByPartition.keySet()
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Or do I miss something?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 3/1/17 5:19 AM, Florian Hussonnois
>>>>>> wrote:
>>>>>>>>>>>>>>>> Hi Eno,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, but the state() method only
>>>> returns
>>>>>>>>            the global
>>>>>>>>>    state of
>>>>>>>>>> the
>>>>>>>>>>>>>>>> KafkaStream application (ie: CREATED,
>>>>>>>>            RUNNING,
>>>>>>>>>    REBALANCING,
>>>>>>>>>>>>>>>> PENDING_SHUTDOWN, NOT_RUNNING).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> An alternative to this KIP would be to
>>>>>>>>            change this
>>>>>>>>>    method to
>>>>>>>>>>> return
>>>>>>>>>>>>>> more
>>>>>>>>>>>>>>>> information instead of adding a new
>>>>>> method.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2017-03-01 13:46 GMT+01:00 Eno
>>>> Thereska
>>>>>> <
>>>>>>>>>> eno.there...@gmail.com
>>>>>>>>            <mailto:eno.there...@gmail.com>
>>>>>>>>            <mailto:eno.there...@gmail.com <mailto:
>>>>>> eno.there...@gmail.com>>
>>>>>>>>>>>> :
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks Florian,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Have you had a chance to look at the
>>>>>> new state methods in
>>>>>>>>>>> 0.10.2,
>>>>>>>>>>>>>> e.g.,
>>>>>>>>>>>>>>>>> KafkaStreams.state()?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>>>>>> On 1 Mar 2017, at 11:54, Florian
>>>>>> Hussonnois <
>>>>>>>>>>>> fhussonn...@gmail.com
>>>>>>>>            <mailto:fhussonn...@gmail.com> <mailto:
>>>>>> fhussonn...@gmail.com
>>>>>>>>            <mailto:fhussonn...@gmail.com>>
>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I have just created KIP-130 to add a
>>>>>>>>            new method to the
>>>>>>>>>>>> KafkaStreams
>>>>>>>>>>>>>> API
>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>> order to expose the states of
>>>> threads
>>>>>>>>            and active tasks.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> -- Guozhang
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Florian HUSSONNOIS
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> -- Guozhang
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>    --
>>>>>>>>>    Florian HUSSONNOIS
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>        --
>>>>>>>>        -- Guozhang
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>    --
>>>>>>>>    Florian HUSSONNOIS
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Florian HUSSONNOIS
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
> 
