Jorge:

My suggestion was to let your users implement the TracingProcessorSupplier / TracingProcessor directly instead of the baseline ProcessorSupplier / Processor. Would that work for you?
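Here is a minimal sketch of that suggestion, for illustration only. It is not code from Kafka Streams or Brave. To stay self-contained, it declares a simplified stand-in for `org.apache.kafka.streams.processor.Processor`; the real interface also has `init(ProcessorContext)` and `close()`. `RecordingTracer`, `doProcess` and `UppercaseProcessor` are hypothetical names. The base class makes `process()` final, so every record a user processor handles is bracketed by a span, and users put their own logic in `doProcess()`.

```java
import java.util.ArrayList;
import java.util.List;

public class TracingProcessorSketch {

    // Simplified stand-in for org.apache.kafka.streams.processor.Processor
    // (the real interface also declares init(ProcessorContext) and close()).
    interface Processor<K, V> {
        void process(K key, V value);
    }

    // Hypothetical tracer that only records span events instead of reporting to Zipkin.
    static final class RecordingTracer {
        final List<String> events = new ArrayList<>();

        void start(String span) { events.add("start " + span); }

        void finish(String span) { events.add("finish " + span); }
    }

    // Base class users would extend instead of implementing Processor directly.
    // process() is final, so every record is traced; user logic goes in doProcess().
    abstract static class TracingProcessor<K, V> implements Processor<K, V> {
        private final RecordingTracer tracer;
        private final String name;

        TracingProcessor(RecordingTracer tracer, String name) {
            this.tracer = tracer;
            this.name = name;
        }

        @Override
        public final void process(K key, V value) {
            tracer.start(name);
            try {
                doProcess(key, value);
            } finally {
                // Finish the span even if the user logic throws.
                tracer.finish(name);
            }
        }

        // Hypothetical hook that holds the user's per-record logic.
        protected abstract void doProcess(K key, V value);
    }

    // Example user processor: upper-cases each value and collects the result.
    static final class UppercaseProcessor extends TracingProcessor<String, String> {
        final List<String> output = new ArrayList<>();

        UppercaseProcessor(RecordingTracer tracer) {
            super(tracer, "uppercase");
        }

        @Override
        protected void doProcess(String key, String value) {
            output.add(key + "=" + value.toUpperCase());
        }
    }

    public static void main(String[] args) {
        RecordingTracer tracer = new RecordingTracer();
        UppercaseProcessor processor = new UppercaseProcessor(tracer);
        processor.process("k1", "hello");
        System.out.println(processor.output); // [k1=HELLO]
        System.out.println(tracer.events);    // [start uppercase, finish uppercase]
    }
}
```

The trade-off of this template-method design is that only processors written against the tracing base class get traced. Processors that the DSL creates internally do not, and that is the gap the rest of the thread discusses.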
Guozhang

On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:

> Thanks Guozhang and John.
>
> @Guozhang:
>
> > I'd suggest providing a WrapperProcessorSupplier for users rather than modifying InternalStreamsTopology: more specifically, you can provide an `abstract WrapperProcessorSupplier implements ProcessorSupplier` and then let users instantiate this class instead of the "bare-metal" interface. WDYT?
>
> Yes, in the gist, I have a class implementing `ProcessorSupplier`:
>
> ```
> public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
>   final KafkaTracing kafkaTracing;
>   final String name;
>   final ProcessorSupplier<K, V> delegate;
>
>   public TracingProcessorSupplier(KafkaTracing kafkaTracing,
>       String name, ProcessorSupplier<K, V> delegate) {
>     this.kafkaTracing = kafkaTracing;
>     this.name = name;
>     this.delegate = delegate;
>   }
>
>   // Wrap each Processor created by the delegate in a TracingProcessor.
>   @Override public Processor<K, V> get() {
>     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
>   }
> }
> ```
>
> My challenge is how to wrap the Topology processors created by `StreamsBuilder#build` so that this instrumentation is easy for Kafka Streams users to adopt.
>
> @John:
>
> > The diff you posted only contains the library-side changes, and it's not obvious how you would use this to insert the desired tracing code. Perhaps you could provide a snippet demonstrating how you want to use this change to enable tracing?
>
> My first approach was something like this:
>
> ```
> final StreamsBuilder builder = kafkaStreamsTracing.builder();
> ```
>
> Where `KafkaStreamsTracing#builder` looks like this:
>
> ```
> public StreamsBuilder builder() {
>   return new StreamsBuilder(new Topology(new TracingInternalTopologyBuilder(kafkaTracing)));
> }
> ```
>
> Then, once the builder creates a topology, its processors will be wrapped by the `TracingProcessorSupplier` described above.
>
> This approach is probably too naive, but it works as an initial proof of concept.
>
> > Off the top of my head, here are some other approaches you might evaluate:
> > * you mentioned interceptors. Perhaps we could create a ProcessorInterceptor interface and add a config to set it.
>
> This sounds very interesting to me. Then we wouldn't need to touch internal APIs; we would just provide some configs. One challenge here is how to define the hooks. In the consumer/producer, the lifecycle is clear: `onConsume`/`onSend`, and then the `onCommit`/`onAcknowledgement` methods. What would this look like for Stream processors? Maybe `beforeProcess(context, key, value)` and `afterProcess(context, key, value)`.
>
> > * perhaps we could simply build the tracing headers into Streams. Is there a benefit to making it customizable?
>
> I don't fully understand this option. Do you mean something like KIP-159 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams)? Headers available in the Streams DSL would allow users to create "custom" traces, for instance:
>
> ```
> stream.map((headers, k, v) -> {
>   Span span = kafkaTracing.nextSpan(headers).start();
>   doSomething(k, v);
>   span.finish();
>   return KeyValue.pair(k, v); // map must return a key-value pair
> });
> ```
>
> But it won't be possible to instrument the existing processors exposed by the DSL just by enabling headers in the Streams DSL.
>
> If we could define a way to pass a `ProcessorSupplier` to be used by `StreamsBuilder#internalTopology` (I'm not sure whether via the constructor or some other way), that would be enough to support this use case.
>
> > Also, as Matthias said, you would need to create a KIP to propose this change, but of course we can continue this preliminary discussion until you feel confident enough to create the KIP.
>
> Happy to do it once the approach is clearer.
>
> Cheers,
> Jorge.
>
> On Mon, Sep 17, 2018 at 17:09, John Roesler (<j...@confluent.io>) wrote:
>
> > If I understand the request, it's about tracking the latencies for a specific record, not the aggregated latencies for each processor.
> >
> > Jorge,
> >
> > The diff you posted only contains the library-side changes, and it's not obvious how you would use this to insert the desired tracing code. Perhaps you could provide a snippet demonstrating how you want to use this change to enable tracing?
> >
> > Also, as Matthias said, you would need to create a KIP to propose this change, but of course we can continue this preliminary discussion until you feel confident enough to create the KIP.
> >
> > Off the top of my head, here are some other approaches you might evaluate:
> > * you mentioned interceptors. Perhaps we could create a ProcessorInterceptor interface and add a config to set it.
> > * perhaps we could simply build the tracing headers into Streams. Is there a benefit to making it customizable?
> >
> > Thanks for considering this problem!
> > -John
> >
> > On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hello Jorge,
> > >
> > > From the TracingProcessor implementation, it seems you want to track per-processor processing latency; is that right? If so, you can actually use the per-processor metrics, which include latency sensors.
> > >
> > > If you do want to track how long it takes to process a given record, then you'd probably need the processor implementation in your repo. In that case, though, I'd suggest providing a WrapperProcessorSupplier for users rather than modifying InternalStreamsTopology: more specifically, you can provide an `abstract WrapperProcessorSupplier implements ProcessorSupplier` and then let users instantiate this class instead of the "bare-metal" interface. WDYT?
> > >
> > > Guozhang
> > >
> > > On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> > >
> > > > Thanks for your answer, Matthias!
> > > >
> > > > What I'm looking for is something similar to interceptors, but for Stream Processors.
> > > >
> > > > In Zipkin (and probably in other tracing implementations as well), we use Headers to propagate the context of a trace (i.e. we add metadata to the Kafka record so we can create references to a trace). Now that Headers are part of the Kafka Streams Processor API, we can propagate context from inputs (consumers) to outputs (producers) by using a `KafkaClientSupplier` (e.g. <https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java>).
> > > >
> > > > "Input to output" traces could be enough for some use cases, but we are looking for a more detailed trace that also covers side effects (e.g. for each processor), where input/output and per-processor latencies can be recorded. This is why I have been looking into how to decorate the `ProcessorSupplier`, and why I made all the changes shown in the comparison.
> > > > Here is a gist of how we are planning to decorate the `addProcessor` method: https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> > > >
> > > > Hope this makes a bit more sense now :)
> > > >
> > > > On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<matth...@confluent.io>) wrote:
> > > >
> > > > > >> I'm experimenting with how to add tracing to Kafka Streams.
> > > > >
> > > > > What do you mean by this exactly? Is there a JIRA? I am fine with removing `final` from `InternalTopologyBuilder#addProcessor()` -- it's an internal class.
> > > > >
> > > > > However, the diff also shows
> > > > >
> > > > > > public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> > > > >
> > > > > This has two impacts: first, it modifies `Topology`, which is part of the public API, and would require a KIP. Second, it exposes `InternalTopologyBuilder` as part of the public API -- something we should not do.
> > > > >
> > > > > I am also not sure why you want to do this (btw: also a public API change requiring a KIP). However, this should not be necessary.
> > > > >
> > > > > > public StreamsBuilder(final Topology topology) {
> > > > >
> > > > > I think I am lacking some context on what you are trying to achieve. Maybe you can elaborate on the problem you are trying to solve?
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > Hi everyone,
> > > > > >
> > > > > > I'm experimenting with how to add tracing to Kafka Streams.
> > > > > >
> > > > > > One option is to override and access `InternalTopologyBuilder#addProcessor`.
> > > > > > Currently this method is final, and the builder is not exposed as part of `StreamsBuilder`:
> > > > > >
> > > > > > ```
> > > > > > public class StreamsBuilder {
> > > > > >
> > > > > >     /** The actual topology that is constructed by this StreamsBuilder. */
> > > > > >     private final Topology topology = new Topology();
> > > > > >
> > > > > >     /** The topology's internal builder. */
> > > > > >     final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
> > > > > >
> > > > > >     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
> > > > > > ```
> > > > > >
> > > > > > The goal is that if `builder#addProcessor` were exposed, we could decorate every `ProcessorSupplier` and capture traces from it:
> > > > > >
> > > > > > ```
> > > > > > @Override
> > > > > > public void addProcessor(String name, ProcessorSupplier supplier, String... predecessorNames) {
> > > > > >     // Register a tracing decorator in place of the user's supplier.
> > > > > >     super.addProcessor(name, new TracingProcessorSupplier(tracer, name, supplier), predecessorNames);
> > > > > > }
> > > > > > ```
> > > > > >
> > > > > > Would it make sense to propose this as a change (https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology), or is there a better way to do this? TopologyWrapper does something similar: https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
> > > > > >
> > > > > > Thanks in advance for any help.
> > > > > >
> > > > > > Cheers,
> > > > > > Jorge.
> > >
> > > --
> > > -- Guozhang
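The thread floats a `ProcessorInterceptor` registered through a config, with hooks such as `beforeProcess(context, key, value)` and `afterProcess(context, key, value)`. Below is a minimal sketch of how the library could invoke such hooks around each record. Everything here is hypothetical: Kafka Streams 2.0 has no `ProcessorInterceptor`, and `InterceptedProcessor` and `RecordingInterceptor` are illustrative names. The processor name stands in for the proposed `context` argument, and `Processor` is a simplified stand-in for the real interface.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ProcessorInterceptorSketch {

    // Simplified stand-in for org.apache.kafka.streams.processor.Processor
    // (the real interface also declares init(ProcessorContext) and close()).
    interface Processor<K, V> {
        void process(K key, V value);
    }

    // Hypothetical hook interface; the processor name stands in for a ProcessorContext.
    interface ProcessorInterceptor<K, V> {
        void beforeProcess(String processorName, K key, V value);

        void afterProcess(String processorName, K key, V value);
    }

    // Wrapper the library itself would install around every processor, so users
    // only configure interceptors instead of changing their topology code.
    static final class InterceptedProcessor<K, V> implements Processor<K, V> {
        private final String name;
        private final Processor<K, V> delegate;
        private final List<ProcessorInterceptor<K, V>> interceptors;

        InterceptedProcessor(String name, Processor<K, V> delegate,
                             List<ProcessorInterceptor<K, V>> interceptors) {
            this.name = name;
            this.delegate = delegate;
            this.interceptors = interceptors;
        }

        @Override
        public void process(K key, V value) {
            for (ProcessorInterceptor<K, V> interceptor : interceptors) {
                interceptor.beforeProcess(name, key, value);
            }
            try {
                delegate.process(key, value);
            } finally {
                // Run the "after" hooks even if the processor throws, so spans get closed.
                for (ProcessorInterceptor<K, V> interceptor : interceptors) {
                    interceptor.afterProcess(name, key, value);
                }
            }
        }
    }

    // Example interceptor that only records the hook calls; a tracing implementation
    // would start a span in beforeProcess and finish it in afterProcess.
    static final class RecordingInterceptor<K, V> implements ProcessorInterceptor<K, V> {
        private final List<String> events;

        RecordingInterceptor(List<String> events) {
            this.events = events;
        }

        @Override
        public void beforeProcess(String processorName, K key, V value) {
            events.add("before " + processorName + ":" + key);
        }

        @Override
        public void afterProcess(String processorName, K key, V value) {
            events.add("after " + processorName + ":" + key);
        }
    }

    // Wraps a user processor named "filter" with one interceptor and processes one record.
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Processor<String, String> userProcessor = (key, value) -> events.add("process " + key);
        List<ProcessorInterceptor<String, String>> interceptors =
                Collections.singletonList(new RecordingInterceptor<String, String>(events));
        Processor<String, String> wrapped = new InterceptedProcessor<>("filter", userProcessor, interceptors);
        wrapped.process("k1", "v1");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [before filter:k1, process k1, after filter:k1]
    }
}
```

Because the wrapping would happen inside the library, user code would only supply configs and never touch internal APIs, which matches the appeal of this option described in the thread.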