Hello Jorge,

From the TracingProcessor implementation, it seems you want to track per-processor processing latency; is that right? If so, you can use the per-processor metrics, which already include latency sensors.
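A minimal sketch of turning those sensors on, assuming (as in the Kafka Streams releases from around the time of this thread) that processor-node latency metrics are only recorded at the DEBUG recording level. `metrics.recording.level` is the Streams config key; the class and method names are made up for illustration, and the application id and bootstrap address are placeholders.

```java
import java.util.Properties;

public class MetricsConfigSketch {

    // Builds Streams properties with DEBUG-level metrics recording enabled.
    static Properties debugMetricsConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "tracing-demo");       // placeholder value
        props.setProperty("bootstrap.servers", "localhost:9092");  // placeholder value
        // Assumption: per-processor latency sensors are recorded only at DEBUG.
        props.setProperty("metrics.recording.level", "DEBUG");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(debugMetricsConfig().getProperty("metrics.recording.level"));
    }
}
```

With properties like these passed to `KafkaStreams`, the per-processor latency values should then be readable through `KafkaStreams#metrics()` or JMX.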
If you do want to track the latency of processing a particular record, then you would probably need the processor implementation in your repo. In that case, though, I'd suggest providing a WrapperProcessorSupplier for users rather than modifying InternalStreamsTopology. More specifically, you can provide an `abstract WrapperProcessorSupplier implements ProcessorSupplier` and let users instantiate this class instead of the "bare-metal" interface.

WDYT?

Guozhang

On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

> Thanks for your answer, Matthias!
>
> What I'm looking for is something similar to interceptors, but for Stream
> Processors.
>
> In Zipkin -and probably other tracing implementations as well- we are using
> Headers to propagate the context of a trace (i.e. adding metadata to the
> Kafka Record, so we can create references to a trace).
> Now that Headers are part of Kafka Streams Processor API, we can propagate
> context from input (Consumers) to outputs (Producers) by using
> `KafkaClientSupplier` (e.g.
> <https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java>).
>
> "Input to Output" traces could be enough for some use-cases, but we are
> looking for a more detailed trace -that could cover cases like side-effects
> (e.g. for each processor), where input/output and processors latencies can
> be recorded. This is why I have been looking for how to decorate the
> `ProcessorSupplier` and all the changes shown in the comparison. Here is a
> gist of how we are planning to decorate the `addProcessor` method:
> https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
>
> Hope this makes a bit more sense now :)
>
> On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<matth...@confluent.io>)
> wrote:
>
> > >> I'm experimenting on how to add tracing to Kafka Streams.
> >
> > What do you mean by this exactly? Is there a JIRA? I am fine removing
> > `final` from `InternalTopologyBuilder#addProcessor()` -- it's an
> > internal class.
> >
> > However, the diff also shows
> >
> > > public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> >
> > This has two impacts: first, it modifies `Topology` what is part of
> > public API and would require a KIP. Second, it exposes
> > `InternalTopologyBuilder` as part of the public API -- something we
> > should not do.
> >
> > I am also not sure, why you want to do this (btw: also public API change
> > requiring a KIP). However, this should not be necessary.
> >
> > > public StreamsBuilder(final Topology topology) {
> >
> >
> > I think I am lacking some context what you try to achieve. Maybe you can
> > elaborate in the problem you try to solve?
> >
> >
> > -Matthias
> >
> > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > Hi everyone,
> > >
> > > I'm experimenting on how to add tracing to Kafka Streams.
> > >
> > > One option is to override and access
> > > `InternalTopologyBuilder#addProcessor`. Currently this method it is final,
> > > and builder is not exposed as part of `StreamsBuilder`:
> > >
> > > ```
> > > public class StreamsBuilder {
> > >
> > >     /** The actual topology that is constructed by this StreamsBuilder. */
> > >     private final Topology topology = new Topology();
> > >
> > >     /** The topology's internal builder. */
> > >     final InternalTopologyBuilder internalTopologyBuilder =
> > >         topology.internalTopologyBuilder;
> > >
> > >     private final InternalStreamsBuilder internalStreamsBuilder = new
> > >         InternalStreamsBuilder(internalTopologyBuilder);
> > > ```
> > >
> > > The goal is that If `builder#addProcessor` is exposed, we could decorate
> > > every `ProcessorSupplier` and capture traces from it:
> > >
> > > ```
> > > @Override
> > > public void addProcessor(String name, ProcessorSupplier supplier,
> > >         String... predecessorNames) {
> > >     super.addProcessor(name, new TracingProcessorSupplier(tracer, name,
> > >         supplier), predecessorNames);
> > > }
> > > ```
> > >
> > > Would it make sense to propose this as a change:
> > > https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology ? or
> > > maybe there is a better way to do this?
> > > TopologyWrapper does something similar:
> > > https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
> > >
> > > Thanks in advance for any help.
> > >
> > > Cheers,
> > > Jorge.
> > >
> >
> >

--
-- Guozhang
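
For reference, the wrapper-supplier idea from the top of this reply might look roughly like the sketch below. It is illustrative only and not tested against Kafka Streams: `Processor` and `ProcessorSupplier` here are simplified stand-ins for the real interfaces so the snippet compiles on its own, and `LatencyListener`, `createProcessor` and the demo names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class WrapperSupplierSketch {

    // Simplified stand-ins for the Kafka Streams interfaces (assumption, not the real API).
    public interface Processor<K, V> { void process(K key, V value); }
    public interface ProcessorSupplier<K, V> { Processor<K, V> get(); }

    // Hypothetical callback that receives the measured per-record latency.
    public interface LatencyListener { void onProcessed(String processorName, long nanos); }

    // Users extend this class instead of implementing ProcessorSupplier directly.
    public abstract static class WrapperProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
        private final String name;
        private final LatencyListener listener;

        protected WrapperProcessorSupplier(String name, LatencyListener listener) {
            this.name = name;
            this.listener = listener;
        }

        // Subclasses provide the processor that does the actual work.
        protected abstract Processor<K, V> createProcessor();

        @Override
        public final Processor<K, V> get() {
            Processor<K, V> delegate = createProcessor();
            // Decorate the delegate so every record is timed individually.
            return (key, value) -> {
                long start = System.nanoTime();
                try {
                    delegate.process(key, value);
                } finally {
                    listener.onProcessed(name, System.nanoTime() - start);
                }
            };
        }
    }

    // Wires a wrapped supplier to an in-memory event list and processes one record.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        ProcessorSupplier<String, String> supplier =
            new WrapperProcessorSupplier<String, String>("upper-case",
                    (name, nanos) -> events.add("timed:" + name)) {
                @Override
                protected Processor<String, String> createProcessor() {
                    return (key, value) -> events.add("processed:" + key + "=" + value);
                }
            };
        supplier.get().process("k", "v");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because `get()` is final and the decoration happens there, every processor created through the wrapper is timed without touching the topology builder; a tracing implementation could start and finish a span in the same place instead of calling a listener.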