For most operators, yes. It certainly holds for all stateless operators like `mapValues()`. For stateful operators, it depends on whether data repartitioning is required: if so, the topology is split into two sub-topologies and thus both `peek()` operations could run on different threads.
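For illustration, a minimal sketch (topic names and the processing logic are made up) of a purely stateless chain versus a key-changing aggregation that forces a repartition:

```
import org.apache.kafka.streams.StreamsBuilder;

public class SubTopologyCheck {
  public static void main(final String[] args) {
    // Stateless chain: everything stays in one sub-topology, so both peek() calls
    // run on the same thread.
    final StreamsBuilder stateless = new StreamsBuilder();
    stateless.<String, String>stream("input-topic")
        .peek((k, v) -> System.out.println("before: " + v))
        .mapValues(v -> v.toUpperCase())
        .peek((k, v) -> System.out.println("after: " + v))
        .to("output-topic");
    System.out.println(stateless.build().describe()); // expect one sub-topology

    // A key-changing operation followed by an aggregation forces a repartition,
    // splitting the topology into two sub-topologies; the two peek() calls could
    // then run on different threads.
    final StreamsBuilder stateful = new StreamsBuilder();
    stateful.<String, String>stream("input-topic")
        .peek((k, v) -> System.out.println("before: " + v))
        .selectKey((k, v) -> v)
        .groupByKey()
        .count()
        .toStream()
        .peek((k, v) -> System.out.println("count: " + v))
        .to("counts-topic");
    System.out.println(stateful.build().describe()); // expect two sub-topologies
  }
}
```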
If you want to double check, you can describe a topology before executing it via `Topology.describe()` -- all operators of a single sub-topology will be executed single threaded.

-Matthias

On 9/26/18 12:14 AM, Jorge Esteban Quilcate Otoya wrote:
> Good to know, thanks Matthias!
>
> You've mentioned the previous operator, but what about `peek().mapValues().peek()`: will both `peek`s be in the same thread as well?
>
> On Tue, Sep 25, 2018 at 23:14, Matthias J. Sax (<matth...@confluent.io>) wrote:
>
>> Just for clarification:
>>
>> `peek()` would run on the same thread as the previous operator. Even if -- strictly speaking -- there is no public contract to guarantee this, it would be the case in the current implementation, and I also don't see any reason why this would change at any point in the future, because it's the most efficient implementation I can think of.
>>
>> -Matthias
>>
>> On 9/22/18 4:51 AM, Jorge Esteban Quilcate Otoya wrote:
>>> Thanks, everyone!
>>>
>>> @Bill, the main issue with using `KStream#peek()` is that AFAIK each `peek` processor runs on a potentially different thread, so passing the trace between them could be challenging. It would also require users to add these operators themselves, which could be too cumbersome to use.
>>>
>>> @Guozhang and @John: I will first focus on creating the `TracingProcessorSupplier` for instrumenting custom `Processors`, and I will keep the idea of a `ProcessorInterceptor` in the back of my head to see if it makes sense to propose a KIP for this.
>>>
>>> Thanks again for your feedback!
>>>
>>> Cheers,
>>> Jorge.
>>>
>>> On Wed, Sep 19, 2018 at 1:55, Bill Bejeck (<bbej...@gmail.com>) wrote:
>>>
>>>> Jorge:
>>>>
>>>> I have a crazy idea off the top of my head.
>>>>
>>>> Would something as low-tech as using KStream.peek calls on either side of certain processors to record start and end times work?
>>>>
>>>> Thanks,
>>>> Bill
>>>>
>>>> On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>
>>>>> Jorge:
>>>>>
>>>>> My suggestion was to let your users implement the TracingProcessorSupplier / TracingProcessor directly instead of the base-line ProcessorSupplier / Processor. Would that work for you?
>>>>>
>>>>> Guozhang
>>>>>
>>>>> On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Guozhang and John.
>>>>>>
>>>>>> @Guozhang:
>>>>>>
>>>>>>> I'd suggest providing a WrapperProcessorSupplier for the users rather than modifying InternalStreamsTopology: more specifically, you can provide an `abstract WrapperProcessorSupplier implements ProcessorSupplier` and then let users instantiate this class instead of the "bare-metal" interface. WDYT?
>>>>>>
>>>>>> Yes, in the gist, I have a class implementing `ProcessorSupplier`:
>>>>>>
>>>>>> ```
>>>>>> public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
>>>>>>   final KafkaTracing kafkaTracing;
>>>>>>   final String name;
>>>>>>   final ProcessorSupplier<K, V> delegate;
>>>>>>
>>>>>>   public TracingProcessorSupplier(KafkaTracing kafkaTracing, String name, ProcessorSupplier<K, V> delegate) {
>>>>>>     this.kafkaTracing = kafkaTracing;
>>>>>>     this.name = name;
>>>>>>     this.delegate = delegate;
>>>>>>   }
>>>>>>
>>>>>>   @Override public Processor<K, V> get() {
>>>>>>     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
>>>>>>   }
>>>>>> }
>>>>>> ```
>>>>>>
>>>>>> My challenge is how to wrap the Topology processors created by `StreamsBuilder#build` to make this instrumentation easy to adopt by Kafka Streams users.
>>>>>>
>>>>>> @John:
>>>>>>
>>>>>>> The diff you posted only contains the library-side changes, and it's not obvious how you would use this to insert the desired tracing code. Perhaps you could provide a snippet demonstrating how you want to use this change to enable tracing?
>>>>>>
>>>>>> My first approach was something like this:
>>>>>>
>>>>>> ```
>>>>>> final StreamsBuilder builder = kafkaStreamsTracing.builder();
>>>>>> ```
>>>>>>
>>>>>> Where `KafkaStreamsTracing#builder` looks like this:
>>>>>>
>>>>>> ```
>>>>>> public StreamsBuilder builder() {
>>>>>>   return new StreamsBuilder(new Topology(new TracingInternalTopologyBuilder(kafkaTracing)));
>>>>>> }
>>>>>> ```
>>>>>>
>>>>>> Then, once the builder creates a topology, processors will be wrapped by the `TracingProcessorSupplier` described above.
>>>>>>
>>>>>> Probably this approach is too naive, but it works as an initial proof of concept.
>>>>>>
>>>>>>> Off the top of my head, here are some other approaches you might evaluate:
>>>>>>> * you mentioned interceptors. Perhaps we could create a ProcessorInterceptor interface and add a config to set it.
>>>>>>
>>>>>> This sounds very interesting to me. Then we wouldn't need to touch internal APIs, and could just provide some configs. One challenge here is how to define the hooks. For consumers/producers the lifecycle is clear: the `onConsume`/`onSend` and then `onCommit`/`onAck` methods. For Stream processors, how would this look? Maybe `beforeProcess(context, key, value)` and `afterProcess(context, key, value)`.
>>>>>>
>>>>>>> * perhaps we could simply build the tracing headers into Streams. Is there a benefit to making it customizable?
>>>>>>
>>>>>> I don't completely understand this option. Do you mean something like KIP-159 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams)?
>>>>>> Headers available on the Streams DSL would allow users to create "custom" traces, for instance:
>>>>>>
>>>>>> ```
>>>>>> stream.map((headers, k, v) -> {
>>>>>>   Span span = kafkaTracing.nextSpan(headers).start();
>>>>>>   doSomething(k, v);
>>>>>>   span.finish();
>>>>>> })
>>>>>> ```
>>>>>>
>>>>>> but it won't be possible to instrument the existing processors exposed by the DSL only by enabling headers on the Streams DSL.
>>>>>>
>>>>>> If we can define a way to pass a `ProcessorSupplier` to be used by `StreamsBuilder#internalTopology` -- not sure if via a constructor or some other way -- that would be enough to support this use-case.
>>>>>>
>>>>>>> Also, as Matthias said, you would need to create a KIP to propose this change, but of course we can continue this preliminary discussion until you feel confident to create the KIP.
>>>>>>
>>>>>> Happy to do it once the approach is clearer.
>>>>>>
>>>>>> Cheers,
>>>>>> Jorge.
>>>>>>
>>>>>> On Mon, Sep 17, 2018 at 17:09, John Roesler (<j...@confluent.io>) wrote:
>>>>>>
>>>>>>> If I understand the request, it's about tracking the latencies for a specific record, not the aggregated latencies for each processor.
>>>>>>>
>>>>>>> Jorge,
>>>>>>>
>>>>>>> The diff you posted only contains the library-side changes, and it's not obvious how you would use this to insert the desired tracing code. Perhaps you could provide a snippet demonstrating how you want to use this change to enable tracing?
>>>>>>>
>>>>>>> Also, as Matthias said, you would need to create a KIP to propose this change, but of course we can continue this preliminary discussion until you feel confident to create the KIP.
>>>>>>>
>>>>>>> Off the top of my head, here are some other approaches you might evaluate:
>>>>>>> * you mentioned interceptors. Perhaps we could create a ProcessorInterceptor interface and add a config to set it.
>>>>>>> * perhaps we could simply build the tracing headers into Streams. Is there a benefit to making it customizable?
>>>>>>>
>>>>>>> Thanks for considering this problem!
>>>>>>> -John
>>>>>>>
>>>>>>> On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello Jorge,
>>>>>>>>
>>>>>>>> From the TracingProcessor implementation it seems you want to track per-processor processing latency, is that right? If this is the case, you can actually use the per-processor metrics, which include latency sensors.
>>>>>>>>
>>>>>>>> If you do want to track, for a certain record, the latency of processing it, then you'd probably need the processor implementation in your repo. In this case, though, I'd suggest providing a WrapperProcessorSupplier for the users rather than modifying InternalStreamsTopology: more specifically, you can provide an `abstract WrapperProcessorSupplier implements ProcessorSupplier` and then let users instantiate this class instead of the "bare-metal" interface. WDYT?
>>>>>>>>
>>>>>>>> Guozhang
>>>>>>>>
>>>>>>>> On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for your answer, Matthias!
>>>>>>>>>
>>>>>>>>> What I'm looking for is something similar to interceptors, but for Stream Processors.
>>>>>>>>>
>>>>>>>>> In Zipkin -- and probably other tracing implementations as well -- we are using Headers to propagate the context of a trace (i.e. adding metadata to the Kafka Record, so we can create references to a trace).
>>>>>>>>>
>>>>>>>>> Now that Headers are part of the Kafka Streams Processor API, we can propagate context from inputs (Consumers) to outputs (Producers) by using `KafkaClientSupplier` (e.g. <https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java>).
>>>>>>>>>
>>>>>>>>> "Input to Output" traces could be enough for some use-cases, but we are looking for a more detailed trace that could also cover cases like side-effects (e.g. per processor), where input/output and processor latencies can be recorded. This is why I have been looking into how to decorate the `ProcessorSupplier`, and into all the changes shown in the comparison. Here is a gist of how we are planning to decorate the `addProcessor` method: https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
>>>>>>>>>
>>>>>>>>> Hope this makes a bit more sense now :)
>>>>>>>>>
>>>>>>>>> On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<matth...@confluent.io>) wrote:
>>>>>>>>>
>>>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
>>>>>>>>>>
>>>>>>>>>> What do you mean by this exactly? Is there a JIRA? I am fine with removing `final` from `InternalTopologyBuilder#addProcessor()` -- it's an internal class.
>>>>>>>>>>
>>>>>>>>>> However, the diff also shows
>>>>>>>>>>
>>>>>>>>>>> public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
>>>>>>>>>>
>>>>>>>>>> This has two impacts: first, it modifies `Topology`, which is part of the public API and would require a KIP. Second, it exposes `InternalTopologyBuilder` as part of the public API -- something we should not do.
>>>>>>>>>>
>>>>>>>>>> I am also not sure why you want to do this (btw: also a public API change requiring a KIP). However, this should not be necessary.
>>>>>>>>>>
>>>>>>>>>>> public StreamsBuilder(final Topology topology) {
>>>>>>>>>>
>>>>>>>>>> I think I am lacking some context about what you are trying to achieve. Maybe you can elaborate on the problem you are trying to solve?
>>>>>>>>>>
>>>>>>>>>> -Matthias
>>>>>>>>>>
>>>>>>>>>> On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
>>>>>>>>>>> Hi everyone,
>>>>>>>>>>>
>>>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
>>>>>>>>>>>
>>>>>>>>>>> One option is to override and access `InternalTopologyBuilder#addProcessor`. Currently this method is final, and the builder is not exposed as part of `StreamsBuilder`:
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> public class StreamsBuilder {
>>>>>>>>>>>
>>>>>>>>>>>   /** The actual topology that is constructed by this StreamsBuilder. */
>>>>>>>>>>>   private final Topology topology = new Topology();
>>>>>>>>>>>
>>>>>>>>>>>   /** The topology's internal builder. */
>>>>>>>>>>>   final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
>>>>>>>>>>>
>>>>>>>>>>>   private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> The goal is that if `builder#addProcessor` is exposed, we could decorate every `ProcessorSupplier` and capture traces from it:
>>>>>>>>>>>
>>>>>>>>>>> ```
>>>>>>>>>>> @Override
>>>>>>>>>>> public void addProcessor(String name, ProcessorSupplier supplier, String... predecessorNames) {
>>>>>>>>>>>   super.addProcessor(name, new TracingProcessorSupplier(tracer, name, supplier), predecessorNames);
>>>>>>>>>>> }
>>>>>>>>>>> ```
>>>>>>>>>>>
>>>>>>>>>>> Would it make sense to propose this as a change (https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology), or maybe there is a better way to do this?
>>>>>>>>>>> TopologyWrapper does something similar: https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance for any help.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Jorge.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> -- Guozhang
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>
>>>
>>
>
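To make the suggestion above concrete, here is a minimal usage sketch wiring the `TracingProcessorSupplier` from the gist into a `Topology` directly -- the topic names, the `MyProcessor` class, and the configured `kafkaTracing` instance are hypothetical placeholders:

```
Topology topology = new Topology();
topology.addSource("source", "input-topic");
topology.addProcessor(
    "traced-processor",
    // MyProcessor is the user's own Processor<K, V>; the supplier from the gist
    // decorates it with a TracingProcessor for per-record tracing
    new TracingProcessorSupplier<>(kafkaTracing, "traced-processor", MyProcessor::new),
    "source");
topology.addSink("sink", "output-topic", "traced-processor");
```

This matches the idea of having users instantiate the wrapper directly, without any change to `InternalTopologyBuilder`.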