Thanks, everyone! @Bill, the main issue with using `KStream#peek()` is that AFAIK each `peek` processor runs on a potentially different thread, so passing the trace between them could be challenging. It would also require users to add these operators themselves, which could be too cumbersome.
@Guozhang and @John: I will first focus on creating the `TracingProcessorSupplier` for instrumenting custom `Processors`, and I will keep the idea of a `ProcessorInterceptor` in the back of my head to see if it makes sense to propose a KIP for this.

Thanks again for your feedback!

Cheers,
Jorge.

On Wed, Sep 19, 2018 at 1:55, Bill Bejeck (<bbej...@gmail.com>) wrote:

> Jorge:
>
> I have a crazy idea off the top of my head.
>
> Would something as low-tech as using KStream.peek calls on either side of
> certain processors to record start and end times work?
>
> Thanks,
> Bill
>
> On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Jorge:
> >
> > My suggestion was to let your users implement the
> > TracingProcessorSupplier / TracingProcessor directly instead of the
> > baseline ProcessorSupplier / Processor. Would that work for you?
> >
> >
> > Guozhang
> >
> >
> > On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <
> > quilcate.jo...@gmail.com> wrote:
> >
> > > Thanks Guozhang and John.
> > >
> > > @Guozhang:
> > >
> > > > I'd suggest providing a
> > > > WrapperProcessorSupplier for the users rather than modifying
> > > > InternalStreamsTopology: more specifically, you can provide an
> > > > `abstract WrapperProcessorSupplier implements ProcessorSupplier`
> > > > and then let users instantiate this class instead of the
> > > > "bare-metal" interface. WDYT?
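Guozhang's suggestion, an abstract wrapper supplier that users extend instead of implementing the bare `ProcessorSupplier` interface, might look roughly like the sketch below. It is a self-contained, hypothetical illustration: `Processor` and `ProcessorSupplier` are simplified stand-ins for the Kafka Streams interfaces (the real ones have more methods), and `WrapperProcessorSupplier`, `createProcessor` and the string-based trace log are illustrative names, not an existing API.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Kafka Streams Processor API (NOT the real
// interfaces, which also have init(ProcessorContext) and close()).
interface Processor<K, V> {
    void process(K key, V value);
}

interface ProcessorSupplier<K, V> {
    Processor<K, V> get();
}

// Hypothetical library-provided base class: users extend it and implement
// createProcessor() instead of implementing ProcessorSupplier#get() directly.
abstract class WrapperProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
    private final String name;
    private final List<String> traceLog; // stand-in for a real tracer

    protected WrapperProcessorSupplier(String name, List<String> traceLog) {
        this.name = name;
        this.traceLog = traceLog;
    }

    // The user's actual processing logic.
    protected abstract Processor<K, V> createProcessor();

    // final, so every processor instance handed to the runtime is wrapped.
    @Override
    public final Processor<K, V> get() {
        final Processor<K, V> delegate = createProcessor();
        return (key, value) -> {
            traceLog.add("start " + name + " " + key);
            try {
                delegate.process(key, value);
            } finally {
                // Recorded even if the user's processor throws.
                traceLog.add("finish " + name + " " + key);
            }
        };
    }
}
```

Making `get()` final is the key design choice: the user only supplies business logic through `createProcessor()`, so they cannot accidentally bypass the wrapping. The trade-off, as the thread notes, is that only processors written against this base class are traced, not the ones the DSL creates internally.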
> > >
> > > Yes, in the gist, I have a class implementing `ProcessorSupplier`:
> > >
> > > ```
> > > public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
> > >   final KafkaTracing kafkaTracing;
> > >   final String name;
> > >   final ProcessorSupplier<K, V> delegate;
> > >
> > >   public TracingProcessorSupplier(KafkaTracing kafkaTracing,
> > >       String name, ProcessorSupplier<K, V> delegate) {
> > >     this.kafkaTracing = kafkaTracing;
> > >     this.name = name;
> > >     this.delegate = delegate;
> > >   }
> > >
> > >   // Every processor instance created by the delegate is wrapped for tracing.
> > >   @Override public Processor<K, V> get() {
> > >     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
> > >   }
> > > }
> > > ```
> > >
> > > My challenge is how to wrap the Topology Processors created by
> > > `StreamsBuilder#build` to make this instrumentation easy for Kafka
> > > Streams users to adopt.
> > >
> > > @John:
> > >
> > > > The diff you posted only contains the library-side changes, and it's not
> > > > obvious how you would use this to insert the desired tracing code.
> > > > Perhaps you could provide a snippet demonstrating how you want to use this
> > > > change to enable tracing?
> > >
> > > My first approach was something like this:
> > >
> > > ```
> > > final StreamsBuilder builder = kafkaStreamsTracing.builder();
> > > ```
> > >
> > > Where `KafkaStreamsTracing#builder` looks like this:
> > >
> > > ```
> > > public StreamsBuilder builder() {
> > >   return new StreamsBuilder(new Topology(
> > >       new TracingInternalTopologyBuilder(kafkaTracing)));
> > > }
> > > ```
> > >
> > > Then, once the builder creates a topology, `processors` will be wrapped by
> > > the `TracingProcessorSupplier` described above.
> > >
> > > This approach is probably too naive, but it works as an initial proof of
> > > concept.
> > >
> > > > Off the top of my head, here are some other approaches you might evaluate:
> > > > * you mentioned interceptors.
> > > > Perhaps we could create a
> > > > ProcessorInterceptor interface and add a config to set it.
> > >
> > > This sounds very interesting to me. Then we wouldn't need to touch internal
> > > APIs; we would just provide some configs. One challenge here is how to define
> > > the hooks. In the consumer/producer the lifecycle is clear: `onConsume`/`onSend`
> > > and then `onCommit`/`onAcknowledgement`. What would this look like for Streams
> > > processors? Maybe `beforeProcess(context, key, value)` and
> > > `afterProcess(context, key, value)`.
> > >
> > > > * perhaps we could simply build the tracing headers into Streams. Is there
> > > > a benefit to making it customizable?
> > >
> > > I don't understand this option completely. Do you mean something like
> > > KIP-159 (
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
> > > )?
> > > Headers available in the Streams DSL would allow users to create "custom"
> > > traces, for instance:
> > >
> > > ```
> > > // (headers, key, value) signature as proposed in KIP-159, not an existing API
> > > stream.map((headers, k, v) -> {
> > >   Span span = kafkaTracing.nextSpan(headers).start();
> > >   try {
> > >     return doSomething(k, v); // must return a KeyValue
> > >   } finally {
> > >     span.finish();
> > >   }
> > > });
> > > ```
> > >
> > > but it won't be possible to instrument the existing processors exposed by
> > > the DSL just by enabling headers in the Streams DSL.
> > >
> > > If we could define a way to pass a `ProcessorSupplier` to be used by
> > > `StreamsBuilder#internalTopology` (not sure if via the constructor or some
> > > other way), that would be enough to support this use case.
> > >
> > > > Also, as Matthias said, you would need to create a KIP to propose this
> > > > change, but of course we can continue this preliminary discussion until you
> > > > feel confident to create the KIP.
> > >
> > > Happy to do it once the approach is clearer.
> > >
> > > Cheers,
> > > Jorge.
> > >
> > > On Mon, Sep 17, 2018 at 17:09, John Roesler (<j...@confluent.io>)
> > > wrote:
> > >
> > > > If I understand the request, it's about tracking the latencies for a
> > > > specific record, not the aggregated latencies for each processor.
> > > >
> > > > Jorge,
> > > >
> > > > The diff you posted only contains the library-side changes, and it's not
> > > > obvious how you would use this to insert the desired tracing code.
> > > > Perhaps you could provide a snippet demonstrating how you want to use this
> > > > change to enable tracing?
> > > >
> > > > Also, as Matthias said, you would need to create a KIP to propose this
> > > > change, but of course we can continue this preliminary discussion until you
> > > > feel confident to create the KIP.
> > > >
> > > > Off the top of my head, here are some other approaches you might evaluate:
> > > > * you mentioned interceptors. Perhaps we could create a
> > > > ProcessorInterceptor interface and add a config to set it.
> > > > * perhaps we could simply build the tracing headers into Streams. Is there
> > > > a benefit to making it customizable?
> > > >
> > > > Thanks for considering this problem!
> > > > -John
> > > >
> > > > On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Jorge,
> > > > >
> > > > > From the TracingProcessor implementation it seems you want to track
> > > > > per-processor processing latency, is that right? If this is the case, you
> > > > > can actually use the per-processor metrics, which include latency sensors.
> > > > >
> > > > > If you do want to track, for a certain record, what the latency of
> > > > > processing it is, then you'd probably need the processor implementation in
> > > > > your repo.
> > > > > In this case, though, I'd suggest providing a
> > > > > WrapperProcessorSupplier for the users rather than modifying
> > > > > InternalStreamsTopology: more specifically, you can provide an
> > > > > `abstract WrapperProcessorSupplier implements ProcessorSupplier`
> > > > > and then let users instantiate this class instead of the
> > > > > "bare-metal" interface. WDYT?
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
> > > > > quilcate.jo...@gmail.com> wrote:
> > > > >
> > > > > > Thanks for your answer, Matthias!
> > > > > >
> > > > > > What I'm looking for is something similar to interceptors, but for
> > > > > > Stream Processors.
> > > > > >
> > > > > > In Zipkin (and probably other tracing implementations as well) we are
> > > > > > using Headers to propagate the context of a trace (i.e. adding metadata
> > > > > > to the Kafka Record, so we can create references to a trace).
> > > > > > Now that Headers are part of the Kafka Streams Processor API, we can
> > > > > > propagate context from inputs (Consumers) to outputs (Producers) by
> > > > > > using `KafkaClientSupplier` (e.g. <
> > > > > > https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java
> > > > > > >).
> > > > > >
> > > > > > "Input to Output" traces could be enough for some use cases, but we
> > > > > > are looking for a more detailed trace that could cover cases like
> > > > > > side effects (e.g. for each processor), where input/output and
> > > > > > processor latencies can be recorded. This is why I have been looking
> > > > > > into how to decorate the `ProcessorSupplier`, and it explains all the
> > > > > > changes shown in the comparison.
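The header-based propagation Jorge describes (carrying trace context from input records to output records) can be illustrated with the sketch below. It is not Brave's implementation: the header names, `TraceContext` and `HeaderPropagation` are hypothetical, and record headers are modelled as a plain `Map<String, byte[]>` rather than Kafka's `Headers` interface (which can hold several values per key).

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, minimal trace context; real tracers such as Brave carry more
// fields (sampling flags, etc.) and use their own header formats.
final class TraceContext {
    final String traceId;
    final String spanId;
    final String parentSpanId; // null for the root span

    TraceContext(String traceId, String spanId, String parentSpanId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
    }
}

final class HeaderPropagation {
    // Hypothetical header names, for illustration only.
    static final String TRACE_ID = "trace-id";
    static final String SPAN_ID = "span-id";

    private static String newId() {
        return UUID.randomUUID().toString();
    }

    // Output side: write the current context into the outgoing record's headers.
    static void inject(TraceContext ctx, Map<String, byte[]> headers) {
        headers.put(TRACE_ID, ctx.traceId.getBytes(StandardCharsets.UTF_8));
        headers.put(SPAN_ID, ctx.spanId.getBytes(StandardCharsets.UTF_8));
    }

    // Input side: continue the trace found in the headers with a child span,
    // or start a new trace when the record carries no context.
    static TraceContext extractOrStart(Map<String, byte[]> headers) {
        byte[] traceId = headers.get(TRACE_ID);
        byte[] parentId = headers.get(SPAN_ID);
        if (traceId == null || parentId == null) {
            return new TraceContext(newId(), newId(), null);
        }
        return new TraceContext(
            new String(traceId, StandardCharsets.UTF_8),
            newId(),
            new String(parentId, StandardCharsets.UTF_8));
    }
}
```

The child keeps the trace id and records the upstream span id as its parent, which is what lets a tracing backend stitch input, processing and output into one trace.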
> > > > > > Here is a gist of how we are planning to decorate the `addProcessor` method:
> > > > > > https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> > > > > >
> > > > > > Hope this makes a bit more sense now :)
> > > > > >
> > > > > > On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<
> > > > > > matth...@confluent.io>)
> > > > > > wrote:
> > > > > >
> > > > > > > >> I'm experimenting with adding tracing to Kafka Streams.
> > > > > > >
> > > > > > > What do you mean by this exactly? Is there a JIRA? I am fine removing
> > > > > > > `final` from `InternalTopologyBuilder#addProcessor()` -- it's an
> > > > > > > internal class.
> > > > > > >
> > > > > > > However, the diff also shows
> > > > > > >
> > > > > > > > public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> > > > > > >
> > > > > > > This has two impacts: first, it modifies `Topology`, which is part of
> > > > > > > the public API and would require a KIP. Second, it exposes
> > > > > > > `InternalTopologyBuilder` as part of the public API -- something we
> > > > > > > should not do.
> > > > > > >
> > > > > > > I am also not sure why you want to do this (btw: also a public API
> > > > > > > change requiring a KIP). However, this should not be necessary.
> > > > > > >
> > > > > > > > public StreamsBuilder(final Topology topology) {
> > > > > > >
> > > > > > > I think I am lacking some context on what you are trying to achieve.
> > > > > > > Maybe you can elaborate on the problem you are trying to solve?
> > > > > > >
> > > > > > > -Matthias
> > > > > > >
> > > > > > > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > I'm experimenting with adding tracing to Kafka Streams.
> > > > > > > >
> > > > > > > > One option is to override and access
> > > > > > > > `InternalTopologyBuilder#addProcessor`. Currently this method is
> > > > > > > > final, and the builder is not exposed as part of `StreamsBuilder`:
> > > > > > > >
> > > > > > > > ```
> > > > > > > > public class StreamsBuilder {
> > > > > > > >
> > > > > > > >     /** The actual topology that is constructed by this StreamsBuilder. */
> > > > > > > >     private final Topology topology = new Topology();
> > > > > > > >
> > > > > > > >     /** The topology's internal builder. */
> > > > > > > >     final InternalTopologyBuilder internalTopologyBuilder =
> > > > > > > >         topology.internalTopologyBuilder;
> > > > > > > >
> > > > > > > >     private final InternalStreamsBuilder internalStreamsBuilder =
> > > > > > > >         new InternalStreamsBuilder(internalTopologyBuilder);
> > > > > > > >
> > > > > > > >     // ... (rest of the class omitted)
> > > > > > > > }
> > > > > > > > ```
> > > > > > > >
> > > > > > > > The goal is that if `builder#addProcessor` were exposed, we could
> > > > > > > > decorate every `ProcessorSupplier` and capture traces from it:
> > > > > > > >
> > > > > > > > ```
> > > > > > > > @Override
> > > > > > > > public void addProcessor(String name, ProcessorSupplier supplier,
> > > > > > > >     String... predecessorNames) {
> > > > > > > >   // Wrap the user's supplier before registering it.
> > > > > > > >   super.addProcessor(name, new TracingProcessorSupplier(tracer, name,
> > > > > > > >       supplier), predecessorNames);
> > > > > > > > }
> > > > > > > > ```
> > > > > > > >
> > > > > > > > Would it make sense to propose this as a change:
> > > > > > > > https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology ?
> > > > > > > > Or maybe there is a better way to do this?
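The `addProcessor` override in the original message decorates each supplier at registration time, so existing topologies get traced without users changing their code. A minimal sketch of that idea follows, under stated assumptions: `SimpleTopologyBuilder` is a hypothetical stand-in for `InternalTopologyBuilder` (whose real `addProcessor` is final, as noted above), the interfaces are simplified stand-ins rather than the Kafka API, and a "span" here is only a processor name, a key and a `System.nanoTime()` duration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins, NOT Kafka's Processor/ProcessorSupplier/InternalTopologyBuilder.
interface Processor<K, V> {
    void process(K key, V value);
}

interface ProcessorSupplier<K, V> {
    Processor<K, V> get();
}

class SimpleTopologyBuilder {
    final Map<String, ProcessorSupplier<?, ?>> suppliers = new LinkedHashMap<>();

    // Not final, so a subclass can intercept every registration.
    public <K, V> void addProcessor(String name, ProcessorSupplier<K, V> supplier,
                                    String... predecessorNames) {
        suppliers.put(name, supplier); // predecessors are ignored in this sketch
    }
}

// One measurement: which processor handled which key, and how long it took.
final class SpanRecord {
    final String processorName;
    final Object key;
    final long durationNanos;

    SpanRecord(String processorName, Object key, long durationNanos) {
        this.processorName = processorName;
        this.key = key;
        this.durationNanos = durationNanos;
    }
}

// Delegating processor that times each call to the wrapped processor.
final class TracingProcessor<K, V> implements Processor<K, V> {
    private final String name;
    private final Processor<K, V> delegate;
    private final List<SpanRecord> spans;

    TracingProcessor(String name, Processor<K, V> delegate, List<SpanRecord> spans) {
        this.name = name;
        this.delegate = delegate;
        this.spans = spans;
    }

    @Override
    public void process(K key, V value) {
        final long start = System.nanoTime();
        try {
            delegate.process(key, value);
        } finally {
            spans.add(new SpanRecord(name, key, System.nanoTime() - start));
        }
    }
}

// Mirrors the override above: every supplier is decorated before registration.
class TracingTopologyBuilder extends SimpleTopologyBuilder {
    final List<SpanRecord> spans = new ArrayList<>();

    @Override
    public <K, V> void addProcessor(final String name, final ProcessorSupplier<K, V> supplier,
                                    final String... predecessorNames) {
        final ProcessorSupplier<K, V> traced =
            () -> new TracingProcessor<K, V>(name, supplier.get(), spans);
        super.addProcessor(name, traced, predecessorNames);
    }
}
```

Compared with a wrapper supplier that users extend, this needs no change in user code, but it requires a hook into topology construction, which is exactly the public API concern Matthias raises.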
> > > > > > > >
> > > > > > > > TopologyWrapper does something similar:
> > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
> > > > > > > >
> > > > > > > > Thanks in advance for any help.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > > Jorge.
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> >
> >
> > --
> > -- Guozhang
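The `ProcessorInterceptor` idea raised in this thread (hooks such as `beforeProcess(context, key, value)` and `afterProcess(context, key, value)`) could be sketched as follows. Everything here is hypothetical: Kafka Streams does not provide such an interface, and `InterceptorContext`, `InterceptingProcessor` and `RecordingInterceptor` are names made up for illustration. The sketch assumes the runtime would wrap each processor and invoke the configured hooks around `process()`.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Kafka's Processor (NOT the real interface).
interface Processor<K, V> {
    void process(K key, V value);
}

// Hypothetical context handed to the hooks; a real design would probably expose
// the current record's topic, partition, offset and headers as well.
final class InterceptorContext {
    final String processorName;

    InterceptorContext(String processorName) {
        this.processorName = processorName;
    }
}

// Hypothetical hook interface modelled on the beforeProcess/afterProcess idea.
interface ProcessorInterceptor<K, V> {
    void beforeProcess(InterceptorContext context, K key, V value);

    void afterProcess(InterceptorContext context, K key, V value);
}

// What the runtime could wrap around every processor when interceptors are configured.
final class InterceptingProcessor<K, V> implements Processor<K, V> {
    private final InterceptorContext context;
    private final Processor<K, V> delegate;
    private final List<ProcessorInterceptor<K, V>> interceptors;

    InterceptingProcessor(InterceptorContext context, Processor<K, V> delegate,
                          List<ProcessorInterceptor<K, V>> interceptors) {
        this.context = context;
        this.delegate = delegate;
        this.interceptors = interceptors;
    }

    @Override
    public void process(K key, V value) {
        for (ProcessorInterceptor<K, V> interceptor : interceptors) {
            interceptor.beforeProcess(context, key, value);
        }
        try {
            delegate.process(key, value);
        } finally {
            // "after" hooks run in reverse order, and also when process() throws.
            for (int i = interceptors.size() - 1; i >= 0; i--) {
                interceptors.get(i).afterProcess(context, key, value);
            }
        }
    }
}

// Example interceptor that records hook calls (where a tracer would start and finish a span).
final class RecordingInterceptor<K, V> implements ProcessorInterceptor<K, V> {
    final List<String> events = new ArrayList<>();

    @Override
    public void beforeProcess(InterceptorContext context, K key, V value) {
        events.add("before " + context.processorName + " " + key);
    }

    @Override
    public void afterProcess(InterceptorContext context, K key, V value) {
        events.add("after " + context.processorName + " " + key);
    }
}
```

Calling the "after" hooks from a `finally` block lets a tracer finish its span even when a processor fails; running them in reverse order mirrors nested try/finally blocks, which is one possible choice rather than a settled design.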