Just for clarification: `peek()` would run on the same thread as the previous operator. Even if, strictly speaking, there is no public contract that guarantees this, it is the case in the current implementation, and I don't see any reason why this would change in the future, because it's the most efficient implementation I can think of.
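Bill's suggestion of bracketing an operator with two `peek()` calls depends on this same-thread behaviour: the first callback can stash a start timestamp in a `ThreadLocal`, and the second can read it back. Below is a minimal, dependency-free sketch of that idea. `PeekTimer`, `before()` and `after()` are hypothetical names, not Kafka Streams API. The sketch assumes each record passes synchronously from the first callback, through the measured operator, to the second callback on one thread (e.g. a `mapValues()` within a single sub-topology), which matches the current implementation but is not a public contract.

```java
import java.util.function.BiConsumer;

/**
 * Hypothetical helper (not part of Kafka Streams) that times an operator by
 * bracketing it with two peek-style callbacks. It relies on both callbacks
 * running on the same thread, one record at a time.
 */
final class PeekTimer<K, V> {

    // Start time written by before() and read by after() on the same thread.
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

    // Receives (key, elapsed nanoseconds) for every record that completes the bracket.
    private final BiConsumer<K, Long> sink;

    public PeekTimer(BiConsumer<K, Long> sink) {
        this.sink = sink;
    }

    /** Callback for the peek() placed before the measured operator. */
    public BiConsumer<K, V> before() {
        return (key, value) -> startNanos.set(System.nanoTime());
    }

    /** Callback for the peek() placed after the measured operator; the value type may differ. */
    public BiConsumer<K, Object> after() {
        return (key, value) -> {
            Long start = startNanos.get();
            if (start == null) {
                return; // no matching before() on this thread, so nothing to measure
            }
            startNanos.remove(); // one measurement per input record
            sink.accept(key, System.nanoTime() - start);
        };
    }
}
```

With Kafka Streams on the classpath, the callbacks could be adapted to `KStream#peek(ForeachAction)` through method references, e.g. `stream.peek(timer.before()::accept).mapValues(v -> v * 10).peek(timer.after()::accept)` for a `KStream<String, Integer>`. Because each stream thread gets its own `ThreadLocal` slot, concurrent threads do not overwrite each other's start times. A repartition step breaks the assumption, since the downstream sub-topology may run on another thread, and operators that buffer their output (e.g. cached aggregations) would likely produce misleading numbers.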
-Matthias

On 9/22/18 4:51 AM, Jorge Esteban Quilcate Otoya wrote:
> Thanks, everyone!
>
> @Bill, the main issue with using `KStream#peek()` is that AFAIK each `peek`
> processor runs on a potentially different thread, so passing the trace
> between them could be challenging. It would also require users to add these
> operators themselves, which could be too cumbersome to use.
>
> @Guozhang and @John: I will first focus on creating the
> `TracingProcessorSupplier` for instrumenting custom `Processors`, and I will
> keep the idea of a `ProcessorInterceptor` in the back of my head to see if
> it makes sense to propose a KIP for this.
>
> Thanks again for your feedback!
>
> Cheers,
> Jorge.
>
> On Wed, 19 Sep 2018 at 1:55, Bill Bejeck (<bbej...@gmail.com>) wrote:
>
>> Jorge:
>>
>> I have a crazy idea off the top of my head.
>>
>> Would something as low-tech as using KStream.peek calls on either side of
>> certain processors to record start and end times work?
>>
>> Thanks,
>> Bill
>>
>> On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Jorge:
>>>
>>> My suggestion was to let your users implement against the
>>> TracingProcessorSupplier / TracingProcessor directly instead of the
>>> base-line ProcessorSupplier / Processor. Would that work for you?
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <
>>> quilcate.jo...@gmail.com> wrote:
>>>
>>>> Thanks Guozhang and John.
>>>>
>>>> @Guozhang:
>>>>
>>>>> I'd suggest providing a
>>>>> WrapperProcessorSupplier for the users rather than modifying
>>>>> InternalStreamsTopology: more specifically, you can provide an
>>>>> `abstract WrapperProcessorSupplier implements ProcessorSupplier`
>>>>> and then let users instantiate this class instead of the
>>>>> "bare-metal" interface. WDYT?
>>>>
>>>> Yes, in the gist, I have a class implementing `ProcessorSupplier`:
>>>>
>>>> ```
>>>> public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
>>>>     final KafkaTracing kafkaTracing;
>>>>     final String name;
>>>>     final ProcessorSupplier<K, V> delegate;
>>>>     public TracingProcessorSupplier(KafkaTracing kafkaTracing,
>>>>             String name, ProcessorSupplier<K, V> delegate) {
>>>>         this.kafkaTracing = kafkaTracing;
>>>>         this.name = name;
>>>>         this.delegate = delegate;
>>>>     }
>>>>     @Override public Processor<K, V> get() {
>>>>         return new TracingProcessor<>(kafkaTracing, name, delegate.get());
>>>>     }
>>>> }
>>>> ```
>>>>
>>>> My challenge is how to wrap the Topology Processors created by
>>>> `StreamsBuilder#build` to make this instrumentation easy for Kafka
>>>> Streams users to adopt.
>>>>
>>>> @John:
>>>>
>>>>> The diff you posted only contains the library-side changes, and it's not
>>>>> obvious how you would use this to insert the desired tracing code.
>>>>> Perhaps you could provide a snippet demonstrating how you want to use this
>>>>> change to enable tracing?
>>>>
>>>> My first approach was something like this:
>>>>
>>>> ```
>>>> final StreamsBuilder builder = kafkaStreamsTracing.builder();
>>>> ```
>>>>
>>>> Where `KafkaStreamsTracing#builder` looks like this:
>>>>
>>>> ```
>>>> public StreamsBuilder builder() {
>>>>     return new StreamsBuilder(new Topology(new
>>>>             TracingInternalTopologyBuilder(kafkaTracing)));
>>>> }
>>>> ```
>>>>
>>>> Then, once the builder creates a topology, `processors` will be wrapped by
>>>> the `TracingProcessorSupplier` described above.
>>>>
>>>> This approach is probably too naive, but it works as an initial proof of
>>>> concept.
>>>>
>>>>> Off the top of my head, here are some other approaches you might evaluate:
>>>>> * you mentioned interceptors. Perhaps we could create a
>>>>> ProcessorInterceptor interface and add a config to set it.
>>>>
>>>> This sounds very interesting to me. Then we won't need to touch internal
>>>> APIs, and can just provide some configs. One challenge here is how to
>>>> define the hooks. In consumer/producer, the lifecycle is clear:
>>>> `onConsume`/`onSend` and then `onCommit`/`onAcknowledgement` methods. What
>>>> would this look like for Stream processors? Maybe
>>>> `beforeProcess(context, key, value)` and `afterProcess(context, key, value)`.
>>>>
>>>>> * perhaps we could simply build the tracing headers into Streams. Is there
>>>>> a benefit to making it customizable?
>>>>
>>>> I don't understand this option completely. Do you mean something like
>>>> KIP-159 (
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams
>>>> )?
>>>> Headers available in the Streams DSL would allow users to create "custom"
>>>> traces, for instance:
>>>>
>>>> ```
>>>> stream.map((headers, k, v) -> {
>>>>     Span span = kafkaTracing.nextSpan(headers).start();
>>>>     doSomething(k, v);
>>>>     span.finish();
>>>> });
>>>> ```
>>>>
>>>> but it won't be possible to instrument the existing processors exposed by
>>>> the DSL only by enabling headers in the Streams DSL.
>>>>
>>>> Being able to pass a `ProcessorSupplier` to be used by
>>>> `StreamsBuilder#internalTopology` (not sure if via constructor or some
>>>> other way) would be enough to support this use-case.
>>>>
>>>>> Also, as Matthias said, you would need to create a KIP to propose this
>>>>> change, but of course we can continue this preliminary discussion until you
>>>>> feel confident to create the KIP.
>>>>
>>>> Happy to do it once the approach is clearer.
>>>>
>>>> Cheers,
>>>> Jorge.
>>>>
>>>> On Mon, 17 Sep 2018 at 17:09, John Roesler (<j...@confluent.io>) wrote:
>>>>
>>>>> If I understand the request, it's about tracking the latencies for a
>>>>> specific record, not the aggregated latencies for each processor.
>>>>>
>>>>> Jorge,
>>>>>
>>>>> The diff you posted only contains the library-side changes, and it's not
>>>>> obvious how you would use this to insert the desired tracing code.
>>>>> Perhaps you could provide a snippet demonstrating how you want to use this
>>>>> change to enable tracing?
>>>>>
>>>>> Also, as Matthias said, you would need to create a KIP to propose this
>>>>> change, but of course we can continue this preliminary discussion until you
>>>>> feel confident to create the KIP.
>>>>>
>>>>> Off the top of my head, here are some other approaches you might evaluate:
>>>>> * you mentioned interceptors. Perhaps we could create a
>>>>> ProcessorInterceptor interface and add a config to set it.
>>>>> * perhaps we could simply build the tracing headers into Streams. Is there
>>>>> a benefit to making it customizable?
>>>>>
>>>>> Thanks for considering this problem!
>>>>> -John
>>>>>
>>>>> On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>
>>>>>> Hello Jorge,
>>>>>>
>>>>>> From the TracingProcessor implementation it seems you want to track
>>>>>> per-processor processing latency, is that right? If this is the case, you
>>>>>> can actually use the per-processor metrics, which include latency sensors.
>>>>>>
>>>>>> If you do want to track, for a certain record, what the latency of
>>>>>> processing it is, then you'd probably need the processor implementation in
>>>>>> your repo. In this case, though, I'd suggest providing a
>>>>>> WrapperProcessorSupplier for the users rather than modifying
>>>>>> InternalStreamsTopology: more specifically, you can provide an
>>>>>> `abstract WrapperProcessorSupplier implements ProcessorSupplier`
>>>>>> and then let users instantiate this class instead of the
>>>>>> "bare-metal" interface. WDYT?
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
>>>>>> quilcate.jo...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for your answer, Matthias!
>>>>>>>
>>>>>>> What I'm looking for is something similar to interceptors, but for
>>>>>>> Stream Processors.
>>>>>>>
>>>>>>> In Zipkin (and probably other tracing implementations as well) we are
>>>>>>> using Headers to propagate the context of a trace (i.e. adding metadata
>>>>>>> to the Kafka Record, so we can create references to a trace).
>>>>>>> Now that Headers are part of the Kafka Streams Processor API, we can
>>>>>>> propagate context from inputs (Consumers) to outputs (Producers) by using
>>>>>>> `KafkaClientSupplier` (e.g.
>>>>>>> <https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java>).
>>>>>>>
>>>>>>> "Input to Output" traces could be enough for some use-cases, but we are
>>>>>>> looking for a more detailed trace that could cover cases like
>>>>>>> side-effects (e.g. for each processor), where input/output and processor
>>>>>>> latencies can be recorded. This is why I have been looking at how to
>>>>>>> decorate the `ProcessorSupplier`, and at all the changes shown in the
>>>>>>> comparison. Here is a gist of how we are planning to decorate the
>>>>>>> `addProcessor` method:
>>>>>>> https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
>>>>>>>
>>>>>>> Hope this makes a bit more sense now :)
>>>>>>>
>>>>>>> On Sun, 16 Sep 2018 at 20:51, Matthias J. Sax (<matth...@confluent.io>)
>>>>>>> wrote:
>>>>>>>
>>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
>>>>>>>>
>>>>>>>> What do you mean by this exactly? Is there a JIRA? I am fine removing
>>>>>>>> `final` from `InternalTopologyBuilder#addProcessor()`; it's an
>>>>>>>> internal class.
>>>>>>>>
>>>>>>>> However, the diff also shows
>>>>>>>>
>>>>>>>>> public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
>>>>>>>>
>>>>>>>> This has two impacts: first, it modifies `Topology`, which is part of
>>>>>>>> the public API and would require a KIP. Second, it exposes
>>>>>>>> `InternalTopologyBuilder` as part of the public API, something we
>>>>>>>> should not do.
>>>>>>>>
>>>>>>>> I am also not sure why you want to do this (btw: also a public API
>>>>>>>> change requiring a KIP). However, this should not be necessary.
>>>>>>>>
>>>>>>>>> public StreamsBuilder(final Topology topology) {
>>>>>>>>
>>>>>>>>
>>>>>>>> I think I am lacking some context on what you are trying to achieve.
>>>>>>>> Maybe you can elaborate on the problem you are trying to solve?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
>>>>>>>>>
>>>>>>>>> One option is to override and access
>>>>>>>>> `InternalTopologyBuilder#addProcessor`. Currently this method is
>>>>>>>>> final, and the builder is not exposed as part of `StreamsBuilder`:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> public class StreamsBuilder {
>>>>>>>>>
>>>>>>>>>     /** The actual topology that is constructed by this StreamsBuilder. */
>>>>>>>>>     private final Topology topology = new Topology();
>>>>>>>>>
>>>>>>>>>     /** The topology's internal builder. */
>>>>>>>>>     final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
>>>>>>>>>
>>>>>>>>>     private final InternalStreamsBuilder internalStreamsBuilder =
>>>>>>>>>             new InternalStreamsBuilder(internalTopologyBuilder);
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> The goal is that, if `builder#addProcessor` is exposed, we could
>>>>>>>>> decorate every `ProcessorSupplier` and capture traces from it:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> @Override
>>>>>>>>> public void addProcessor(String name, ProcessorSupplier supplier,
>>>>>>>>>         String... predecessorNames) {
>>>>>>>>>     super.addProcessor(name, new TracingProcessorSupplier(tracer, name,
>>>>>>>>>             supplier), predecessorNames);
>>>>>>>>> }
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Would it make sense to propose this as a change:
>>>>>>>>> https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology ?
>>>>>>>>> Or maybe there is a better way to do this?
>>>>>>>>> TopologyWrapper does something similar:
>>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
>>>>>>>>>
>>>>>>>>> Thanks in advance for any help.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Jorge.
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>
>>> --
>>> -- Guozhang
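The `ProcessorInterceptor` idea raised in the thread, with `beforeProcess`/`afterProcess` hooks wrapped around each processor the way `TracingProcessorSupplier` wraps its delegate, can be sketched without any Kafka dependency. `RecordProcessor`, `ProcessorInterceptor` and `InterceptingSupplier` below are hypothetical stand-ins: the real `Processor` interface also has lifecycle methods such as `init` and `close`, and the processor-context parameter proposed in the thread is replaced by the processor name to keep the example self-contained.

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-in for a Kafka Streams Processor: one callback per record.
interface RecordProcessor<K, V> {
    void process(K key, V value);
}

// Hypothetical interceptor modelled on the beforeProcess/afterProcess hooks proposed in the thread.
interface ProcessorInterceptor<K, V> {
    void beforeProcess(String processorName, K key, V value);

    void afterProcess(String processorName, K key, V value);
}

// Decorator in the spirit of TracingProcessorSupplier: every processor obtained from
// the delegate supplier is wrapped so the interceptor sees each record before and after.
final class InterceptingSupplier<K, V> implements Supplier<RecordProcessor<K, V>> {
    private final String name;
    private final Supplier<RecordProcessor<K, V>> delegate;
    private final ProcessorInterceptor<K, V> interceptor;

    public InterceptingSupplier(String name,
                                Supplier<RecordProcessor<K, V>> delegate,
                                ProcessorInterceptor<K, V> interceptor) {
        this.name = name;
        this.delegate = delegate;
        this.interceptor = interceptor;
    }

    @Override
    public RecordProcessor<K, V> get() {
        // Ask the delegate for a new processor on every call, as a ProcessorSupplier should.
        RecordProcessor<K, V> inner = delegate.get();
        return (key, value) -> {
            interceptor.beforeProcess(name, key, value);
            try {
                inner.process(key, value);
            } finally {
                // Runs even if the wrapped processor throws, so a trace span could always be finished.
                interceptor.afterProcess(name, key, value);
            }
        };
    }
}
```

A wrapper like this only covers processors that users register themselves. Applying it to the processors the DSL generates would still need a hook inside Streams, such as the config-based interceptor John suggested.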