Ok, I’m convinced. Let’s deprecate.
Thanks
Eno
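
For reference, a minimal sketch of what deprecating toString() in favor of a structured accessor could look like. All class names here (TaskInfo, ThreadInfo, StreamsClient, DeprecationSketch) are hypothetical stand-ins, not the real Kafka Streams classes; only the method name localThreadsMetadata is borrowed from the discussion quoted below, and the sample task ids and partition names are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for per-task metadata (not the real Kafka class).
final class TaskInfo {
    final String taskId;
    final List<String> topicPartitions;

    TaskInfo(String taskId, List<String> topicPartitions) {
        this.taskId = taskId;
        this.topicPartitions = Collections.unmodifiableList(new ArrayList<>(topicPartitions));
    }
}

// Hypothetical stand-in for per-thread metadata.
final class ThreadInfo {
    final String threadName;
    final String threadState;
    final List<TaskInfo> activeTasks;

    ThreadInfo(String threadName, String threadState, List<TaskInfo> activeTasks) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = Collections.unmodifiableList(new ArrayList<>(activeTasks));
    }
}

// Hypothetical stand-in for the client class whose toString() is deprecated.
class StreamsClient {
    private final List<ThreadInfo> threads;

    StreamsClient(List<ThreadInfo> threads) {
        this.threads = Collections.unmodifiableList(new ArrayList<>(threads));
    }

    /** Structured runtime view that callers can inspect programmatically. */
    List<ThreadInfo> localThreadsMetadata() {
        return threads;
    }

    /** @deprecated prefer {@link #localThreadsMetadata()} for runtime information. */
    @Deprecated
    @Override
    public String toString() {
        return "StreamsClient(threads=" + threads.size() + ")";
    }
}

class DeprecationSketch {
    // Builds one thread with two made-up active tasks.
    static StreamsClient sample() {
        TaskInfo t0 = new TaskInfo("0_0", Arrays.asList("input-0"));
        TaskInfo t1 = new TaskInfo("0_1", Arrays.asList("input-1"));
        ThreadInfo thread = new ThreadInfo("StreamThread-1", "RUNNING", Arrays.asList(t0, t1));
        return new StreamsClient(Collections.singletonList(thread));
    }

    // Walks the structured metadata instead of parsing a string.
    static int countActiveTasks(StreamsClient client) {
        int count = 0;
        for (ThreadInfo thread : client.localThreadsMetadata()) {
            count += thread.activeTasks.size();
        }
        return count;
    }

    public static void main(String[] args) {
        StreamsClient client = sample();
        for (ThreadInfo thread : client.localThreadsMetadata()) {
            System.out.println(thread.threadName + " " + thread.threadState);
            for (TaskInfo task : thread.activeTasks) {
                System.out.println("  task " + task.taskId + " -> " + task.topicPartitions);
            }
        }
    }
}
```

The point of the sketch is only that callers walk typed objects rather than parse a string, so the deprecated toString() can stay in place for a release without being the source of truth.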
> On Apr 21, 2017, at 9:20 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> I'd agree with Matthias.
>
> @Eno do you have any specific use case in mind to better keep the `toString`
> function?
>
>
> Guozhang
>
>
> On Fri, Apr 21, 2017 at 11:41 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> It would not give the same information as the new API. Thus, it would be
> inconsistent (and this would be really bad IMHO.)
>
>
> I would really like to remove (ie, deprecate) it. It was a "hot fix" to
> give some runtime information to the user. But with this KIP, we get a
> proper first class API and thus .toString() is just outdated.
>
> >>>> It's always preferable to have an implementation for toString method.
>
> Not sure about this btw.
>
> Furthermore, if anyone wants to print runtime information, we got
> ThreadMetadata#toString() and TaskMetadata#toString().
>
>
>
>
> -Matthias
>
> On 4/20/17 9:36 AM, Eno Thereska wrote:
> > Hi there,
> >
> > Can't toString() just stay as is?
> >
> > Thanks
> > Eno
> >
> >> On 20 Apr 2017, at 17:26, Matthias J. Sax <matth...@confluent.io> wrote:
> >>
> >> Florian,
> >>
> >> I am just wondering: if we keep .toString(), what should the
> >> implementation look like?
> >>
> >>
> >> -Matthias
> >>
> >> On 4/19/17 2:42 PM, Florian Hussonnois wrote:
> >>> Hi Matthias,
> >>>
> >>> So sorry for the delay in replying to you. For now, I think we can keep
> >>> KafkaStreams#toString() as it is.
> >>> It's always preferable to have an implementation for toString method.
> >>>
> >>> 2017-04-14 4:08 GMT+02:00 Matthias J. Sax <matth...@confluent.io>:
> >>>
> >>>> Florian,
> >>>>
> >>>>>>> What about KafkaStreams#toString() method?
> >>>>>>>
> >>>>>>> I think we want to deprecate it, since with KIP-120 and the changes of
> >>>>>>> this KIP it becomes obsolete.
> >>>>
> >>>> Any thoughts about this? For me, this is the last open point to discuss
> >>>> (or what should be reflected in the KIP in case you agree) before I can
> >>>> put my vote on the VOTE thread you already started.
> >>>>
> >>>> -Matthias
> >>>>
> >>>>
> >>>> On 4/11/17 12:18 AM, Damian Guy wrote:
> >>>>> Hi Florian,
> >>>>>
> >>>>> Thanks for the updates. The KIP is looking good.
> >>>>>
> >>>>> Cheers,
> >>>>> Damian
> >>>>>
> >>>>> On Fri, 7 Apr 2017 at 22:41 Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>
> >>>>>> What about KafkaStreams#toString() method?
> >>>>>>
> >>>>>> I think we want to deprecate it, since with KIP-120 and the changes of
> >>>>>> this KIP it becomes obsolete.
> >>>>>>
> >>>>>> If we do so, please update the KIP accordingly.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 3/28/17 7:00 PM, Matthias J. Sax wrote:
> >>>>>>> Thanks for updating the KIP!
> >>>>>>>
> >>>>>>> I think it's good as is -- I would not add anything more to TaskMetadata.
> >>>>>>>
> >>>>>>> About subtopologies and tasks. We do have the concept of subtopologies
> >>>>>>> already in KIP-120. It's only missing an ID that allows linking a
> >>>>>>> subtopology to a task.
> >>>>>>>
> >>>>>>> IMHO, adding a simple variable to `Subtopology` that provides the id
> >>>>>>> should be sufficient. We can simply document in the JavaDocs how
> >>>>>>> Subtopology and TaskMetadata can be linked to each other.
> >>>>>>>
> >>>>>>> I did update KIP-120 accordingly.
> >>>>>>>
> >>>>>>>
> >>>>>>> -Matthias
> >>>>>>>
> >>>>>>> On 3/28/17 3:45 PM, Florian Hussonnois wrote:
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I've updated the KIP and the PR to reflect your suggestions.
> >>>>>>>>
> >>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> >>>>>>>> https://github.com/apache/kafka/pull/2612
> >>>>>>>>
> >>>>>>>> Also, I've exposed the property StreamThread#state as a string through
> >>>>>>>> the new class ThreadMetadata.
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>>
> >>>>>>>> 2017-03-27 23:40 GMT+02:00 Florian Hussonnois <fhussonn...@gmail.com>:
> >>>>>>>>
> >>>>>>>> Hi Guozhang, Matthias,
> >>>>>>>>
> >>>>>>>> It's a great idea to add sub-topology descriptions. This would
> >>>>>>>> help developers better understand the topology concept.
> >>>>>>>>
> >>>>>>>> I agree that it is not really user-friendly to have to check that
> >>>>>>>> `StreamsMetadata#streamThreads` is not returning null.
> >>>>>>>>
> >>>>>>>> The method name localThreadsMetadata looks good. In addition, it's
> >>>>>>>> simpler to build ThreadMetadata instances from the `StreamTask`
> >>>>>>>> class than from the `StreamPartitionAssignor` class.
> >>>>>>>>
> >>>>>>>> I will work on the modifications. As I understand it, I have to add
> >>>>>>>> the subTopologyId property to the TaskMetadata class. Am I right?
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>>
> >>>>>>>> 2017-03-26 0:25 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
> >>>>>>>>
> >>>>>>>> Re 1): this is a good point. Maybe we can move
> >>>>>>>> `StreamsMetadata#streamThreads` to
> >>>>>>>> `KafkaStreams#localThreadsMetadata`?
> >>>>>>>>
> >>>>>>>> 3): this is a minor suggestion: rename the function
> >>>>>>>> `assignedPartitions` to `topicPartitions`, to be consistent with
> >>>>>>>> `StreamsMetadata`?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Guozhang
> >>>>>>>>
> >>>>>>>> On Thu, Mar 23, 2017 at 4:30 PM, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>>>>
> >>>>>>>> Thanks for the progress on this KIP. I think we are on the
> >>>>>>>> right path!
> >>>>>>>>
> >>>>>>>> Couple of comments/questions:
> >>>>>>>>
> >>>>>>>> (1) Why do we not consider the "rejected alternative" to add
> >>>>>>>> the method to KafkaStreams? The comment on #streamThreads() says:
> >>>>>>>>
> >>>>>>>> "Note this method will return <code>null</code> if called on
> >>>>>>>> {@link StreamsMetadata} which represent a remote application."
> >>>>>>>>
> >>>>>>>> Thus, if we cannot get any remote metadata, it seems more
> >>>>>>>> straightforward to add it to KafkaStreams directly -- this would
> >>>>>>>> avoid invalid calls and a `null` return value in the first place.
> >>>>>>>>
> >>>>>>>> I like the idea about exposing sub-topologies:
> >>>>>>>>
> >>>>>>>> (2a) I would recommend renaming `topicsGroupId` to
> >>>>>>>> `subTopologyId` :)
> >>>>>>>>
> >>>>>>>> (2b) We could add this to KIP-120 already. However, I would
> >>>>>>>> not just link both via name, but leverage KIP-120 directly, and
> >>>>>>>> add a "Subtopology" member to the TaskMetadata class.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Overall, I like the distinction of KIP-120 only exposing
> >>>>>>>> "static" information that can be determined before the topology
> >>>>>>>> gets started, while this KIP allows access to runtime information.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 3/22/17 12:42 PM, Guozhang Wang wrote:
> >>>>>>>>> Thanks for the updated KIP, and sorry for the late replies!
> >>>>>>>>>
> >>>>>>>>> I thought a little bit more about KIP-130, and I feel that if we
> >>>>>>>>> are going to deprecate the `toString` function in favor of the
> >>>>>>>>> proposed APIs (it is not explicitly said in the KIP, so I'm not
> >>>>>>>>> sure if you plan to keep `KafkaStreams#toString` as is or to
> >>>>>>>>> replace it with the proposed APIs), it may be okay. More
> >>>>>>>>> specifically, after both KIP-120 and KIP-130:
> >>>>>>>>>
> >>>>>>>>> 1. users can use the `#describe` function to check the generated
> >>>>>>>>> topology before calling `KafkaStreams#start`, which is static
> >>>>>>>>> information.
> >>>>>>>>> 2. users can use `StreamsMetadata -> ThreadMetadata ->
> >>>>>>>>> TaskMetadata` programmatically after calling `KafkaStreams#start`
> >>>>>>>>> to get the dynamically changeable information.
> >>>>>>>>>
> >>>>>>>>> One thing I'm still not sure about, though, is that in
> >>>>>>>>> `TaskMetadata` we only have the TaskId and assigned partitions,
> >>>>>>>>> whereas "TopologyDescription", introduced in KIP-120, will simply
> >>>>>>>>> describe the whole topology, possibly composed of multiple
> >>>>>>>>> sub-topologies. So it is hard for users to tell which sub-topology
> >>>>>>>>> is executed under which task on-the-fly.
> >>>>>>>>>
> >>>>>>>>> Hence I'm wondering if we can expose the "sub-topology-id" (named
> >>>>>>>>> topicsGroupId internally) in TopologyDescription#Subtopology, and
> >>>>>>>>> then from the task id, which is essentially "sub-topology-id DASH
> >>>>>>>>> partition-group-id", users can make the link, though it is still
> >>>>>>>>> not that straightforward.
> >>>>>>>>>
> >>>>>>>>> Thoughts?
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Mar 15, 2017 at 3:16 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> >>>>>>>>>
> >>>>>>>>> Thanks Guozhang for pointing me to the KIP-120.
> >>>>>>>>>
> >>>>>>>>> I've made some modifications to the KIP. I also proposed a new PR
> >>>>>>>>> (there are still some tests to write).
> >>>>>>>>>
> >>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> >>>>>>>>>
> >>>>>>>>> Exposing consumed offsets through JMX is sufficient for debugging
> >>>>>>>>> purposes. But I think this could be part of another JIRA, as there
> >>>>>>>>> is no impact on the public API.
> >>>>>>>>>
> >>>>>>>>> Thanks
> >>>>>>>>>
> >>>>>>>>> 2017-03-10 22:35 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
> >>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> Hello Florian,
> >>>>>>>>>>
> >>>>>>>>>> As for programmatically discovering monitoring data by piping
> >>>>>>>>>> metrics into a dedicated topic: I think you can actually use a
> >>>>>>>>>> KafkaMetricsReporter which pipes the polled metric values into a
> >>>>>>>>>> pre-defined topic (note that in Kafka the MetricsReporter is
> >>>>>>>>>> simply an interface and users can build their own impl in
> >>>>>>>>>> addition to the JMXReporter), for example:
> >>>>>>>>>>
> >>>>>>>>>> https://github.com/krux/kafka-metrics-reporter
> >>>>>>>>>>
> >>>>>>>>>> As for the "static task-level assignment", what I meant is that
> >>>>>>>>>> the mapping from source-topic-partitions -> tasks is static, via
> >>>>>>>>>> the "PartitionGrouper", and a task won't switch from an active
> >>>>>>>>>> task to a standby task. Rather, an active task could be migrated,
> >>>>>>>>>> as a whole along with all its assigned partitions, to another
> >>>>>>>>>> thread / process, and a new standby task will be created on the
> >>>>>>>>>> host that this active task is migrating from. So for the SAME
> >>>>>>>>>> task, its taskMetadata.assignedPartitions() will always return
> >>>>>>>>>> the same partitions.
> >>>>>>>>>>
> >>>>>>>>>> As for the `toString` function that we have today, I feel it has
> >>>>>>>>>> some correlation with KIP-120, so I'm trying to coordinate some
> >>>>>>>>>> discussions here (cc'ing Matthias as the owner of KIP-120). My
> >>>>>>>>>> understanding is that:
> >>>>>>>>>>
> >>>>>>>>>> 1. In KIP-120, the `toString` function of `KafkaStreams` will be
> >>>>>>>>>> removed and instead the `Topology#describe` function will be
> >>>>>>>>>> introduced for users to debug the topology BEFORE they start
> >>>>>>>>>> running their instance with the topology. Hence the description
> >>>>>>>>>> won't contain any task information, as tasks are not formed yet.
> >>>>>>>>>> 2. In KIP-130, we want to add the task-level information for
> >>>>>>>>>> monitoring purposes, which is not static and can only be captured
> >>>>>>>>>> AFTER the instance has started running. Again, I'm wondering for
> >>>>>>>>>> KIP-130 alone if adding the metrics mentioned in my previous
> >>>>>>>>>> email would suffice even for the use case that you have
> >>>>>>>>>> mentioned.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Guozhang
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Mar 8, 2017 at 3:18 PM, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi Guozhang
> >>>>>>>>>>>
> >>>>>>>>>>> Thank you for your feedback. I've started to look more deeply
> >>>>>>>>>>> into the code. As you mention, it would be smarter to use the
> >>>>>>>>>>> current StreamsMetadata API to expose this information.
> >>>>>>>>>>>
> >>>>>>>>>>> I think exposing metrics through JMX is great for building
> >>>>>>>>>>> monitoring dashboards using tools like jmxtrans and grafana.
> >>>>>>>>>>> But for our use case we would like to expose the states directly
> >>>>>>>>>>> from the application embedding the kstreams topologies.
> >>>>>>>>>>> So we expect to be able to retrieve states in a programmatic way.
> >>>>>>>>>>>
> >>>>>>>>>>> For instance, we could imagine producing those states into a
> >>>>>>>>>>> dedicated topic. In that way a third application could
> >>>>>>>>>>> automatically discover all kafka-streams applications which
> >>>>>>>>>>> could be monitored.
> >>>>>>>>>>> In a production environment, that can clearly be a solution to
> >>>>>>>>>>> have a complete overview of a microservices architecture based
> >>>>>>>>>>> on Kafka Streams.
> >>>>>>>>>>>
> >>>>>>>>>>> The toString() method gives a lot of information, but it can
> >>>>>>>>>>> only be used for debugging purposes and not to build a topology
> >>>>>>>>>>> visualization tool. Could we actually expose the same details
> >>>>>>>>>>> about the stream topology from the StreamsMetadata API? So the
> >>>>>>>>>>> TaskMetadata class you have suggested could contain information
> >>>>>>>>>>> similar to what is returned by the toString method of the
> >>>>>>>>>>> AbstractTask class?
> >>>>>>>>>>>
> >>>>>>>>>>> I can update the KIP in that way.
> >>>>>>>>>>>
> >>>>>>>>>>> Finally, I'm not sure I understand your last point: "Note that
> >>>>>>>>>>> the task-level assignment information is static, i.e. it will
> >>>>>>>>>>> not change during the runtime"
> >>>>>>>>>>>
> >>>>>>>>>>> Does that mean that when a rebalance occurs new tasks are
> >>>>>>>>>>> created for the new assignments and old ones just switch to a
> >>>>>>>>>>> standby state?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>>
> >>>>>>>>>>> 2017-03-05 7:04 GMT+01:00 Guozhang Wang <wangg...@gmail.com>:
> >>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>> Hello Florian,
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks for the KIP and your detailed explanation of your use
> >>>>>>>>>>>> case. I think there are two dimensions to discuss on how to
> >>>>>>>>>>>> improve Streams' debuggability (or more specifically state
> >>>>>>>>>>>> exposure for visualization).
> >>>>>>>>>>>>
> >>>>>>>>>>>> First question is "what information should we expose to the
> >>>>>>>>>>>> user". From your KIP I saw generally three categories:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. The state of the thread within a process. As you mentioned,
> >>>>>>>>>>>> currently we only expose the state of the process but not the
> >>>>>>>>>>>> finer-grained per-thread state.
> >>>>>>>>>>>> 2. The state of the task. Currently the closest API to this is
> >>>>>>>>>>>> StreamsMetadata; however, it aggregates the tasks across all
> >>>>>>>>>>>> threads and only presents the aggregated set of the assigned
> >>>>>>>>>>>> partitions / state stores etc. We can consider extending this
> >>>>>>>>>>>> to have a new StreamsMetadata#tasks() which returns a
> >>>>>>>>>>>> TaskMetadata with similar fields, and
> >>>>>>>>>>>> StreamsMetadata.stateStoreNames / etc would still return the
> >>>>>>>>>>>> aggregated results, but users can still "drill down" if they
> >>>>>>>>>>>> want.
> >>>>>>>>>>>>
> >>>>>>>>>>>> The second question is "how should we expose them to the
> >>>>>>>>>>>> user". For example, you mentioned consumedOffsetsByPartition
> >>>>>>>>>>>> in the activeTasks. We could add this as a JMX metric based on
> >>>>>>>>>>>> fetch positions inside the consumer layer (note that Streams
> >>>>>>>>>>>> is just embedding consumers) or we could consider adding it
> >>>>>>>>>>>> into TaskMetadata. In either case it can be visualized for
> >>>>>>>>>>>> monitoring. The reason we expose StreamsMetadata as well as
> >>>>>>>>>>>> State was that it is expected to be "polled" in a programmatic
> >>>>>>>>>>>> way for interactive queries and also for control flows (e.g. I
> >>>>>>>>>>>> would like to start running my other topology ONLY after the
> >>>>>>>>>>>> first topology is up and running), while for your case it
> >>>>>>>>>>>> seems the main purpose is to continuously query them for
> >>>>>>>>>>>> monitoring etc. Personally I'd prefer to expose them through
> >>>>>>>>>>>> JMX only for such purposes, to have a simpler API.
> >>>>>>>>>>>>
> >>>>>>>>>>>> So given your current motivations I'd suggest exposing the
> >>>>>>>>>>>> following information as newly added metrics in Streams:
> >>>>>>>>>>>>
> >>>>>>>>>>>> 1. Thread-level state metric.
> >>>>>>>>>>>> 2. Task-level hosted client identifier metric (e.g. host:port).
> >>>>>>>>>>>> 3. Consumer-level per-topic/partition position metric
> >>>>>>>>>>>> (https://kafka.apache.org/documentation/#topic_fetch_monitoring).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Note that the task-level assignment information is static,
> >>>>>>>>>>>> i.e. it will not change during the runtime at all and can be
> >>>>>>>>>>>> accessed from the `toString()` function already even before
> >>>>>>>>>>>> the instance starts running, so I think this piece of
> >>>>>>>>>>>> information does not need to be exposed through JMX anymore.
> >>>>>>>>>>>>
> >>>>>>>>>>>> WDYT?
> >>>>>>>>>>>>
> >>>>>>>>>>>> Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Thu, Mar 2, 2017 at 3:11 AM, Damian Guy <damian....@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Florian,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It seems there is some overlap here with what we already
> >>>>>>>>>>>>> have in KafkaStreams.allMetadata(). This currently returns a
> >>>>>>>>>>>>> Collection<StreamsMetadata> where each StreamsMetadata
> >>>>>>>>>>>>> instance holds the state stores and partition assignment for
> >>>>>>>>>>>>> every instance of the KafkaStreams application. I'm wondering
> >>>>>>>>>>>>> if that is good enough for what you are trying to achieve? If
> >>>>>>>>>>>>> not, could it be modified to include the per-thread
> >>>>>>>>>>>>> assignment?
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>> Damian
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, 1 Mar 2017 at 22:49 Florian Hussonnois <fhussonn...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Matthias,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> First, I will answer your last question.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The main reason to have both TaskState#assignment and
> >>>>>>>>>>>>>> TaskState#consumedOffsetsByPartition is that tasks have no
> >>>>>>>>>>>>>> consumed offsets until at least one message is consumed for
> >>>>>>>>>>>>>> each partition, even if previous offsets exist for the
> >>>>>>>>>>>>>> consumer group.
> >>>>>>>>>>>>>> So yes, these methods are redundant, as they only diverge at
> >>>>>>>>>>>>>> application startup.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> About the use case: we are currently developing for a
> >>>>>>>>>>>>>> customer a little framework based on KafkaStreams which
> >>>>>>>>>>>>>> transforms/denormalizes data before ingesting it into
> >>>>>>>>>>>>>> hadoop.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> We have a cluster of workers (SpringBoot) which instantiate
> >>>>>>>>>>>>>> KStreams topologies dynamically based on dataflow
> >>>>>>>>>>>>>> configurations.
> >>>>>>>>>>>>>> Each configuration describes a topic to consume and how to
> >>>>>>>>>>>>>> process messages (this looks like the NiFi processors API).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Our architecture is inspired by KafkaConnect. We have topics
> >>>>>>>>>>>>>> for configs and states which are consumed by each worker
> >>>>>>>>>>>>>> (actually we have reused some internal classes of the
> >>>>>>>>>>>>>> connect API).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Now, we would like to develop UIs to visualize topics and
> >>>>>>>>>>>>>> partitions consumed by our worker applications.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Also, I think it would be nice to be able, in the future, to
> >>>>>>>>>>>>>> develop web UIs similar to Spark's but for KafkaStreams, to
> >>>>>>>>>>>>>> visualize DAGs... so maybe this KIP is just a first step.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> 2017-03-01 22:52 GMT+01:00 Matthias J. Sax <matth...@confluent.io>:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Thanks for the KIP.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I am wondering a little bit why you need to expose this
> >>>>>>>>>>>>>>> information. Can you describe some use cases?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Would it be worth unifying this new API with
> >>>>>>>>>>>>>>> KafkaStreams#state() to get the overall state of an
> >>>>>>>>>>>>>>> application without the need to call two different
> >>>>>>>>>>>>>>> methods? Not sure what this unified API might look like,
> >>>>>>>>>>>>>>> though.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> One minor comment about the API: TaskState#assignment
> >>>>>>>>>>>>>>> seems to be redundant. It should be the same as
> >>>>>>>>>>>>>>> TaskState#consumedOffsetsByPartition.keySet()
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Or am I missing something?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 3/1/17 5:19 AM, Florian Hussonnois wrote:
> >>>>>>>>>>>>>>>> Hi Eno,
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Yes, but the state() method only returns the global state
> >>>>>>>>>>>>>>>> of the KafkaStreams application (i.e. CREATED, RUNNING,
> >>>>>>>>>>>>>>>> REBALANCING, PENDING_SHUTDOWN, NOT_RUNNING).
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> An alternative to this KIP would be to change this method
> >>>>>>>>>>>>>>>> to return more information instead of adding a new method.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> 2017-03-01 13:46 GMT+01:00 Eno Thereska <eno.there...@gmail.com>:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks Florian,
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Have you had a chance to look at the new state methods
> >>>>>>>>>>>>>>>>> in 0.10.2, e.g., KafkaStreams.state()?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Thanks
> >>>>>>>>>>>>>>>>> Eno
> >>>>>>>>>>>>>>>>>> On 1 Mar 2017, at 11:54, Florian Hussonnois <fhussonn...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Hi all,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I have just created KIP-130 to add a new method to the
> >>>>>>>>>>>>>>>>>> KafkaStreams API in order to expose the states of
> >>>>>>>>>>>>>>>>>> threads and active tasks.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP+130%3A+Expose+states+of+active+tasks+to+KafkaStreams+public+API
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Thanks,
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>> Florian HUSSONNOIS
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> --
> >>>>>>>>>>>>>> Florian HUSSONNOIS
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> --
> >>>>>>>>>>>> -- Guozhang
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> --
> >>>>>>>>>>> Florian HUSSONNOIS
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> -- Guozhang
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Florian HUSSONNOIS
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> -- Guozhang
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Florian HUSSONNOIS
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> --
> >>>>>>>> Florian HUSSONNOIS
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>>
> >>
> >
>
>
>
>
> --
> -- Guozhang