Jorge:

I have a crazy idea off the top of my head.

Would something as low-tech as using KStream.peek calls on either side of certain processors to record start and end times work?

Thanks,
Bill

On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:

> Jorge:
>
> My suggestion was to let your users implement on top of the TracingProcessorSupplier / TracingProcessor directly instead of the baseline ProcessorSupplier / Processor. Would that work for you?
>
>
> Guozhang
>
>
> On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>
> > Thanks Guozhang and John.
> >
> > @Guozhang:
> >
> > > I'd suggest to provide a WrapperProcessorSupplier for the users rather than modifying InternalStreamsTopology: more specifically, you can provide an `abstract WrapperProcessorSupplier implements ProcessorSupplier` and then let users instantiate this class instead of the "bare-metal" interface. WDYT?
> >
> > Yes, in the gist, I have a class implementing `ProcessorSupplier`:
> >
> > ```
> > public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
> >   final KafkaTracing kafkaTracing;
> >   final String name;
> >   final ProcessorSupplier<K, V> delegate;
> >
> >   public TracingProcessorSupplier(KafkaTracing kafkaTracing, String name, ProcessorSupplier<K, V> delegate) {
> >     this.kafkaTracing = kafkaTracing;
> >     this.name = name;
> >     this.delegate = delegate;
> >   }
> >
> >   @Override public Processor<K, V> get() {
> >     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
> >   }
> > }
> > ```
> >
> > My challenge is how to wrap Topology Processors created by `StreamsBuilder#build` to make this instrumentation easy to adopt by Kafka Streams users.
> >
> > @John:
> >
> > > The diff you posted only contains the library-side changes, and it's not obvious how you would use this to insert the desired tracing code.
> > > Perhaps you could provide a snippet demonstrating how you want to use this change to enable tracing?
> >
> > My first approach was something like this:
> >
> > ```
> > final StreamsBuilder builder = kafkaStreamsTracing.builder();
> > ```
> >
> > Where `KafkaStreamsTracing#builder` looks like this:
> >
> > ```
> > public StreamsBuilder builder() {
> >   return new StreamsBuilder(new Topology(new TracingInternalTopologyBuilder(kafkaTracing)));
> > }
> > ```
> >
> > Then, once the builder creates a topology, `processors` will be wrapped by the `TracingProcessorSupplier` described above.
> >
> > Probably this approach is too naive, but it works as an initial proof of concept.
> >
> > > Off the top of my head, here are some other approaches you might evaluate:
> > > * you mentioned interceptors. Perhaps we could create a ProcessorInterceptor interface and add a config to set it.
> >
> > This sounds very interesting to me. Then we won't need to touch internal APIs, and can just provide some configs. One challenge here is how to define the hooks. In consumer/producer, the lifecycle is clear: `onConsume`/`onSend` and then `onCommit`/`onAcknowledgement` methods. For Stream processors, what would this look like? Maybe `beforeProcess(context, key, value)` and `afterProcess(context, key, value)`.
> >
> > > * perhaps we could simply build the tracing headers into Streams. Is there a benefit to making it customizable?
> >
> > I don't understand this option completely. Do you mean something like KIP-159 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams)?
> >
> > Headers available on the Streams DSL will allow users to create "custom" traces, for instance:
> >
> > ```
> > stream.map( (headers, k, v) -> {
> >   Span span = kafkaTracing.nextSpan(headers).start();
> >   doSomething(k, v);
> >   span.finish();
> > });
> > ```
> >
> > but it won't be possible to instrument the existing processors exposed by the DSL only by enabling headers on the Streams DSL.
> >
> > If we can define a way to pass a `ProcessorSupplier` to be used by `StreamsBuilder#internalTopology` (not sure if via constructor or some other way), that would be enough to support this use case.
> >
> > > Also, as Matthias said, you would need to create a KIP to propose this change, but of course we can continue this preliminary discussion until you feel confident to create the KIP.
> >
> > Happy to do it once the approach is clearer.
> >
> > Cheers,
> > Jorge.
> >
> > On Mon, Sep 17, 2018 at 17:09, John Roesler (<j...@confluent.io>) wrote:
> >
> > > If I understand the request, it's about tracking the latencies for a specific record, not the aggregated latencies for each processor.
> > >
> > > Jorge,
> > >
> > > The diff you posted only contains the library-side changes, and it's not obvious how you would use this to insert the desired tracing code.
> > > Perhaps you could provide a snippet demonstrating how you want to use this change to enable tracing?
> > >
> > > Also, as Matthias said, you would need to create a KIP to propose this change, but of course we can continue this preliminary discussion until you feel confident to create the KIP.
> > >
> > > Off the top of my head, here are some other approaches you might evaluate:
> > > * you mentioned interceptors. Perhaps we could create a ProcessorInterceptor interface and add a config to set it.
> > > * perhaps we could simply build the tracing headers into Streams. Is there a benefit to making it customizable?
> > >
> > > Thanks for considering this problem!
> > > -John
> > >
> > > On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hello Jorge,
> > > >
> > > > From the TracingProcessor implementation it seems you want to track per-processor processing latency, is that right? If this is the case you can actually use the per-processor metrics, which include latency sensors.
> > > >
> > > > If you do want to track, for a certain record, what's the latency of processing it, then you'd probably need the processor implementation in your repo. In this case, though, I'd suggest to provide a WrapperProcessorSupplier for the users rather than modifying InternalStreamsTopology: more specifically, you can provide an `abstract WrapperProcessorSupplier implements ProcessorSupplier` and then let users instantiate this class instead of the "bare-metal" interface. WDYT?
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> > > >
> > > > > Thanks for your answer, Matthias!
> > > > >
> > > > > What I'm looking for is something similar to interceptors, but for Stream Processors.
> > > > >
> > > > > In Zipkin (and probably other tracing implementations as well) we are using Headers to propagate the context of a trace (i.e. adding metadata to the Kafka Record, so we can create references to a trace).
> > > > > Now that Headers are part of the Kafka Streams Processor API, we can propagate context from input (Consumers) to outputs (Producers) by using `KafkaClientSupplier` (e.g. <https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java>).
> > > > >
> > > > > "Input to Output" traces could be enough for some use-cases, but we are looking for a more detailed trace that could cover cases like side-effects (e.g. for each processor), where input/output and processor latencies can be recorded. This is why I have been looking for how to decorate the `ProcessorSupplier` and all the changes shown in the comparison. Here is a gist of how we are planning to decorate the `addProcessor` method: https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> > > > >
> > > > > Hope this makes a bit more sense now :)
> > > > >
> > > > > On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<matth...@confluent.io>) wrote:
> > > > >
> > > > > > >> I'm experimenting on how to add tracing to Kafka Streams.
> > > > > >
> > > > > > What do you mean by this exactly? Is there a JIRA? I am fine removing `final` from `InternalTopologyBuilder#addProcessor()` -- it's an internal class.
> > > > > >
> > > > > > However, the diff also shows
> > > > > >
> > > > > > > public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> > > > > >
> > > > > > This has two impacts: first, it modifies `Topology`, which is part of the public API and would require a KIP. Second, it exposes `InternalTopologyBuilder` as part of the public API -- something we should not do.
> > > > > >
> > > > > > I am also not sure why you want to do this (btw: also a public API change requiring a KIP). However, this should not be necessary.
> > > > > >
> > > > > > > public StreamsBuilder(final Topology topology) {
> > > > > >
> > > > > >
> > > > > > I think I am lacking some context on what you are trying to achieve.
> > > > > > Maybe you can elaborate on the problem you are trying to solve?
> > > > > >
> > > > > >
> > > > > > -Matthias
> > > > > >
> > > > > > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > > > > Hi everyone,
> > > > > > >
> > > > > > > I'm experimenting on how to add tracing to Kafka Streams.
> > > > > > >
> > > > > > > One option is to override and access `InternalTopologyBuilder#addProcessor`. Currently this method is final, and the builder is not exposed as part of `StreamsBuilder`:
> > > > > > >
> > > > > > > ```
> > > > > > > public class StreamsBuilder {
> > > > > > >
> > > > > > >     /** The actual topology that is constructed by this StreamsBuilder. */
> > > > > > >     private final Topology topology = new Topology();
> > > > > > >
> > > > > > >     /** The topology's internal builder. */
> > > > > > >     final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
> > > > > > >
> > > > > > >     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
> > > > > > > ```
> > > > > > >
> > > > > > > The goal is that if `builder#addProcessor` is exposed, we could decorate every `ProcessorSupplier` and capture traces from it:
> > > > > > >
> > > > > > > ```
> > > > > > > @Override
> > > > > > > public void addProcessor(String name, ProcessorSupplier supplier, String... predecessorNames) {
> > > > > > >     super.addProcessor(name, new TracingProcessorSupplier(tracer, name, supplier), predecessorNames);
> > > > > > > }
> > > > > > > ```
> > > > > > >
> > > > > > > Would it make sense to propose this as a change: https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology ? Or maybe there is a better way to do this?
> > > > > > > TopologyWrapper does something similar: https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
> > > > > > >
> > > > > > > Thanks in advance for any help.
> > > > > > >
> > > > > > > Cheers,
> > > > > > > Jorge.
> > > >
> > > > --
> > > > -- Guozhang
>
> --
> -- Guozhang
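
For reference, the wrapping idea discussed in this thread (a supplier that decorates whatever processor the delegate supplier returns, and records how long each `process` call takes) can be sketched without any Kafka dependency. This is only a rough illustration under stated assumptions: the `Processor` and `ProcessorSupplier` interfaces below are simplified stand-ins declared inside the example, not the real Kafka Streams interfaces, and `Timing`, the injected clock, and the `sink` list are hypothetical names standing in for whatever a real tracer (e.g. Brave's spans) would do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

public class TracingWrapperSketch {

    // Hypothetical stand-ins for the Kafka Streams interfaces, reduced to
    // what this sketch needs. They are NOT the real API.
    interface Processor<K, V> {
        void process(K key, V value);
    }

    interface ProcessorSupplier<K, V> {
        Processor<K, V> get();
    }

    // One recorded measurement: processor name and elapsed time of one call.
    static final class Timing {
        final String name;
        final long nanos;

        Timing(String name, long nanos) {
            this.name = name;
            this.nanos = nanos;
        }
    }

    // Decorator: times every process() call made on the wrapped processor.
    static final class TracingProcessor<K, V> implements Processor<K, V> {
        private final String name;
        private final Processor<K, V> delegate;
        private final LongSupplier clock;   // assumption: injectable clock
        private final List<Timing> sink;    // assumption: where timings go

        TracingProcessor(String name, Processor<K, V> delegate,
                         LongSupplier clock, List<Timing> sink) {
            this.name = name;
            this.delegate = delegate;
            this.clock = clock;
            this.sink = sink;
        }

        @Override
        public void process(K key, V value) {
            long start = clock.getAsLong();
            try {
                delegate.process(key, value);
            } finally {
                // Recorded even if the delegate throws.
                sink.add(new Timing(name, clock.getAsLong() - start));
            }
        }
    }

    // Supplier-level wrapper: each get() wraps a fresh delegate processor.
    static final class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
        private final String name;
        private final ProcessorSupplier<K, V> delegate;
        private final LongSupplier clock;
        private final List<Timing> sink;

        TracingProcessorSupplier(String name, ProcessorSupplier<K, V> delegate,
                                 LongSupplier clock, List<Timing> sink) {
            this.name = name;
            this.delegate = delegate;
            this.clock = clock;
            this.sink = sink;
        }

        @Override
        public Processor<K, V> get() {
            return new TracingProcessor<>(name, delegate.get(), clock, sink);
        }
    }

    public static void main(String[] args) {
        List<Timing> timings = new ArrayList<>();
        // User-provided supplier; the processor body is a placeholder.
        ProcessorSupplier<String, Integer> plain = () -> (k, v) -> { };
        ProcessorSupplier<String, Integer> traced =
            new TracingProcessorSupplier<>("my-processor", plain, System::nanoTime, timings);

        traced.get().process("key", 1);
        Timing t = timings.get(0);
        System.out.println(t.name + " took " + t.nanos + " ns");
    }
}
```

With this shape, users opt in per processor by passing the wrapped supplier wherever they would pass their own, which needs no change to internal classes; it does not, by itself, cover the processors the DSL creates internally, which is the open question in the thread.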