Hi Jorge, Thanks for the clarifications.
Yes, I'm also not sure what "built-in tracing" would look like, and it may not be a good idea. FWIW, though, I was not thinking of something like "rich functions". Rather, I was imagining that Streams could just always record spans in headers as it processes the data, no need for the interceptor. However, it would be pretty complicated if you only wanted to record spans for certain records, or a certain percentage of records. This is where a tracing interceptor would look more attractive. On the actual interceptor, offhand, I suppose two main options are available: 1. define interceptor interfaces with hooks into the lifecycle, like you gave. (before/after process might work, but in general, it might be more like beforeProcess, beforeForward) 2. just let the interceptor implement the same Processor interface, but additionally have access to the "delegate" processor somehow? What approach did you take in your TracingProcessor? Thanks, -John On Tue, Sep 18, 2018 at 10:02 AM Jorge Esteban Quilcate Otoya < quilcate.jo...@gmail.com> wrote: > final StreamsBuilder builder = kafkaStreamsTracing.builder();Thanks > Guozhang and John. > > @Guozhang: > > > I'd suggest to provide a > > WrapperProcessorSupplier for the users than modifying > > InternalStreamsTopology: more specifically, you can provide an > > `abstract WrapperProcessorSupplier > > implements ProcessorSupplier` and then let users to instantiate this > class > > instead of the "bare-metal" interface. WDYT? > > Yes, in the gist, I have a class implementing `ProcessorSupplier`: > > ``` > public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, > V> { > final KafkaTracing kafkaTracing; > final String name; > final ProcessorSupplier<K, V> delegate; > public TracingProcessorSupplier(KafkaTracing kafkaTracing, > String name, ProcessorSupplier<K, V> delegate) { > this.kafkaTracing = kafkaTracing; > this.name = name; > this.delegate = delegate; > } > @Override public Processor<K, V> get() { > return new TracingProcessor<>(kafkaTracing, name, delegate.get()); > } > } > ``` > > My challenge is how to wrap Topology Processors created by > `StreamsBuilder#build` to make this instrumentation easy to adopt by Kafka > Streams users. > > @John: > > > The diff you posted only contains the library-side changes, and it's not > > obvious how you would use this to insert the desired tracing code. > > Perhaps you could provide a snippet demonstrating how you want to use > this > > change to enable tracing? > > My first approach was something like this: > > ``` > final StreamsBuilder builder = kafkaStreamsTracing.builder(); > ``` > > Where `KafkaStreamsTracing#builder` looks like this: > > ``` > public StreamsBuilder builder() { > return new StreamsBuilder(new Topology(new > TracingInternalTopologyBuilder(kafkaTracing))); > } > ``` > > Then, once the builder creates a topology, `processors` will be wrapped by > `TracingProcessorSupplier` described above. > > Probably this approach is too naive but works as an initial proof of > concept. > > > Off the top of my head, here are some other approaches you might > evaluate: > > * you mentioned interceptors. Perhaps we could create a > > ProcessorInterceptor interface and add a config to set it. > > This sounds very interesting to me. Then we won't need to touch internal > API's, and just provide some configs. One challenge here is how to define > the hooks. In consumer/producer, lifecycle is clear, `onConsumer`/`onSend` > and then `onCommit`/`onAck` methods. For Stream processors, how this will > look like? Maybe `beforeProcess(context, key, value)` and > `afterProcess(context, key, value)`. > > > * perhaps we could simply build the tracing headers into Streams. Is > there > > a benefit to making it customizable? > > I don't understand this option completely. Do you mean something like > KIP-159 ( > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams > )? > Headers available on StreamsDSL will allow users to create "custom" traces, > for instance: > > ``` > stream.map( (headers, k, v) -> { > Span span = kafkaTracing.nextSpan(headers).start(); > doSomething(k, v); > span.finish(); > } > ``` > > but it won't be possible to instrument the existing processors exposed by > DSL only by enabling headers on Streams DSL. > > If we can define a way to pass a `ProcessorSupplier` to be used by > `StreamsBuilder#internalTopology` -not sure if via constructor or some > other way- would be enough to support this use-case. > > > Also, as Matthias said, you would need to create a KIP to propose this > > change, but of course we can continue this preliminary discussion until > you > > feel confident to create the KIP. > > Happy to do it once the approach is clearer. > > Cheers, > Jorge. > > El lun., 17 sept. 2018 a las 17:09, John Roesler (<j...@confluent.io>) > escribió: > > > If I understand the request, it's about tracking the latencies for a > > specific record, not the aggregated latencies for each processor. > > > > Jorge, > > > > The diff you posted only contains the library-side changes, and it's not > > obvious how you would use this to insert the desired tracing code. > > Perhaps you could provide a snippet demonstrating how you want to use > this > > change to enable tracing? > > > > Also, as Matthias said, you would need to create a KIP to propose this > > change, but of course we can continue this preliminary discussion until > you > > feel confident to create the KIP. > > > > Off the top of my head, here are some other approaches you might > evaluate: > > * you mentioned interceptors. Perhaps we could create a > > ProcessorInterceptor interface and add a config to set it. > > * perhaps we could simply build the tracing headers into Streams. Is > there > > a benefit to making it customizable? > > > > Thanks for considering this problem! > > -John > > > > On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com> > wrote: > > > > > Hello Jorge, > > > > > > From the TracingProcessor implementation it seems you want to track > > > per-processor processing latency, is that right? If this is the case > you > > > can actually use the per-processor metrics which include latency > sensors. > > > > > > If you do want to track, for a certain record, what's the latency of > > > processing it, then you'd probably need the processor implementation in > > > your repo. In this case, though, I'd suggest to provide a > > > WrapperProcessorSupplier for the users than modifying > > > InternalStreamsTopology: more specifically, you can provide an > > > `abstract WrapperProcessorSupplier > > > implements ProcessorSupplier` and then let users to instantiate this > > class > > > instead of the "bare-metal" interface. WDYT? > > > > > > > > > Guozhang > > > > > > On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya < > > > quilcate.jo...@gmail.com> wrote: > > > > > > > Thanks for your answer, Matthias! > > > > > > > > What I'm looking for is something similar to interceptors, but for > > Stream > > > > Processors. > > > > > > > > In Zipkin -and probably other tracing implementations as well- we are > > > using > > > > Headers to propagate the context of a trace (i.e. adding metadata to > > the > > > > Kafka Record, so we can create references to a trace). > > > > Now that Headers are part of Kafka Streams Processor API, we can > > > propagate > > > > context from input (Consumers) to outputs (Producers) by using > > > > `KafkaClientSupplier` (e.g. < > > > > https://github.com/openzipkin/brave/blob/master/ > > > > instrumentation/kafka-streams/src/main/java/brave/kafka/streams/ > > > > TracingKafkaClientSupplier.java > > > > >). > > > > > > > > "Input to Output" traces could be enough for some use-cases, but we > are > > > > looking for a more detailed trace -that could cover cases like > > > side-effects > > > > (e.g. for each processor), where input/output and processors > latencies > > > can > > > > be recorded. This is why I have been looking for how to decorate the > > > > `ProcessorSupplier` and all the changes shown in the comparison. Here > > is > > > a > > > > gist of how we are planning to decorate the `addProcessor` method: > > > > https://github.com/openzipkin/brave/compare/master...jeqo: > > > > kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7 > > > > > > > > Hope this makes a bit more sense now :) > > > > > > > > El dom., 16 sept. 2018 a las 20:51, Matthias J. Sax (< > > > > matth...@confluent.io>) > > > > escribió: > > > > > > > > > >> I'm experimenting on how to add tracing to Kafka Streams. > > > > > > > > > > What do you mean by this exactly? Is there a JIRA? I am fine > removing > > > > > `final` from `InternalTopologyBuilder#addProcessor()` -- it's an > > > > > internal class. > > > > > > > > > > However, the diff also shows > > > > > > > > > > > public Topology(final InternalTopologyBuilder > > > internalTopologyBuilder) > > > > { > > > > > > > > > > This has two impacts: first, it modifies `Topology` what is part of > > > > > public API and would require a KIP. Second, it exposes > > > > > `InternalTopologyBuilder` as part of the public API -- something we > > > > > should not do. > > > > > > > > > > I am also not sure, why you want to do this (btw: also public API > > > change > > > > > requiring a KIP). However, this should not be necessary. > > > > > > > > > > > public StreamsBuilder(final Topology topology) { > > > > > > > > > > > > > > > I think I am lacking some context what you try to achieve. Maybe > you > > > can > > > > > elaborate in the problem you try to solve? > > > > > > > > > > > > > > > -Matthias > > > > > > > > > > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote: > > > > > > Hi everyone, > > > > > > > > > > > > I'm experimenting on how to add tracing to Kafka Streams. > > > > > > > > > > > > One option is to override and access > > > > > > `InternalTopologyBuilder#addProcessor`. Currently this method it > is > > > > > final, > > > > > > and builder is not exposed as part of `StreamsBuilder`: > > > > > > > > > > > > ``` > > > > > > public class StreamsBuilder { > > > > > > > > > > > > /** The actual topology that is constructed by this > > > StreamsBuilder. > > > > > */ > > > > > > private final Topology topology = new Topology(); > > > > > > > > > > > > /** The topology's internal builder. */ > > > > > > final InternalTopologyBuilder internalTopologyBuilder = > > > > > > topology.internalTopologyBuilder; > > > > > > > > > > > > private final InternalStreamsBuilder internalStreamsBuilder = > > new > > > > > > InternalStreamsBuilder(internalTopologyBuilder); > > > > > > ``` > > > > > > > > > > > > The goal is that If `builder#addProcessor` is exposed, we could > > > > decorate > > > > > > every `ProcessorSupplier` and capture traces from it: > > > > > > > > > > > > ``` > > > > > > @Override > > > > > > public void addProcessor(String name, ProcessorSupplier > supplier, > > > > > > String... predecessorNames) { > > > > > > super.addProcessor(name, new TracingProcessorSupplier(tracer, > > > > name, > > > > > > supplier), predecessorNames); > > > > > > } > > > > > > ``` > > > > > > > > > > > > Would it make sense to propose this as a change: > > > > > > > > > https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology > > > > ? > > > > > or > > > > > > maybe there is a better way to do this? > > > > > > TopologyWrapper does something similar: > > > > > > > > > > > https://github.com/apache/kafka/blob/trunk/streams/src/ > > > > test/java/org/apache/kafka/streams/TopologyWrapper.java > > > > > > > > > > > > Thanks in advance for any help. > > > > > > > > > > > > Cheers, > > > > > > Jorge. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > -- Guozhang > > > > > >