Just for clarification:

`peek()` would run on the same thread and the previous operator. Even
if---strictly speaking---there is no public contract to guarantee this,
it would be the case in the current implementation, and I also don't see
any reason why this would change at any point in the future, because
it's the most efficient implementation I can think of.

-Matthias

On 9/22/18 4:51 AM, Jorge Esteban Quilcate Otoya wrote:
> Thanks, everyone!
> 
> @Bill, the main issue with using `KStraem#peek()` is that AFAIK each `peek`
> processor runs on a potentially different thread, then passing the trace
> between them could be challenging. It will also require users to add these
> operators themselves, which could be too cumbersome to use.
> 
> @Guozhang and @John: I will first focus on creating the
> `TracingProcessorSupplier` for instrumenting custom `Processors` and I will
> keep the idea of a `ProcessorInterceptor` in the back of my head to see if
> it make sense to propose a KIP for this.
> 
> Thanks again for your feedback!
> 
> Cheers,
> Jorge.
> El mié., 19 sept. 2018 a las 1:55, Bill Bejeck (<bbej...@gmail.com>)
> escribió:
> 
>> Jorge:
>>
>> I have a crazy idea off the top of my head.
>>
>> Would something as low-tech using KSteam.peek calls on either side of
>> certain processors to record start and end times work?
>>
>> Thanks,
>> Bill
>>
>> On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
>>
>>> Jorge:
>>>
>>> My suggestion was to let your users to implement on the
>>> TracingProcessorSupplier
>>> / TracingProcessor directly instead of the base-line ProcessorSupplier /
>>> Processor. Would that work for you?
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <
>>> quilcate.jo...@gmail.com> wrote:
>>>
>>>> final StreamsBuilder builder = kafkaStreamsTracing.builder();Thanks
>>>> Guozhang and John.
>>>>
>>>> @Guozhang:
>>>>
>>>>> I'd suggest to provide a
>>>>> WrapperProcessorSupplier for the users than modifying
>>>>> InternalStreamsTopology: more specifically, you can provide an
>>>>> `abstract WrapperProcessorSupplier
>>>>> implements ProcessorSupplier` and then let users to instantiate this
>>>> class
>>>>> instead of the "bare-metal" interface. WDYT?
>>>>
>>>> Yes, in the gist, I have a class implementing `ProcessorSupplier`:
>>>>
>>>> ```
>>>> public class TracingProcessorSupplier<K, V> implements
>>> ProcessorSupplier<K,
>>>> V> {
>>>>   final KafkaTracing kafkaTracing;
>>>>   final String name;
>>>>   final ProcessorSupplier<K, V> delegate;
>>>>    public TracingProcessorSupplier(KafkaTracing kafkaTracing,
>>>>       String name, ProcessorSupplier<K, V> delegate) {
>>>>     this.kafkaTracing = kafkaTracing;
>>>>     this.name = name;
>>>>     this.delegate = delegate;
>>>>   }
>>>>    @Override public Processor<K, V> get() {
>>>>     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
>>>>   }
>>>> }
>>>> ```
>>>>
>>>> My challenge is how to wrap Topology Processors created by
>>>> `StreamsBuilder#build` to make this instrumentation easy to adopt by
>>> Kafka
>>>> Streams users.
>>>>
>>>> @John:
>>>>
>>>>> The diff you posted only contains the library-side changes, and it's
>>> not
>>>>> obvious how you would use this to insert the desired tracing code.
>>>>> Perhaps you could provide a snippet demonstrating how you want to use
>>>> this
>>>>> change to enable tracing?
>>>>
>>>> My first approach was something like this:
>>>>
>>>> ```
>>>> final StreamsBuilder builder = kafkaStreamsTracing.builder();
>>>> ```
>>>>
>>>> Where `KafkaStreamsTracing#builder` looks like this:
>>>>
>>>> ```
>>>>   public StreamsBuilder builder() {
>>>>     return new StreamsBuilder(new Topology(new
>>>> TracingInternalTopologyBuilder(kafkaTracing)));
>>>>   }
>>>> ```
>>>>
>>>> Then, once the builder creates a topology, `processors` will be wrapped
>>> by
>>>> `TracingProcessorSupplier` described above.
>>>>
>>>> Probably this approach is too naive but works as an initial proof of
>>>> concept.
>>>>
>>>>> Off the top of my head, here are some other approaches you might
>>>> evaluate:
>>>>> * you mentioned interceptors. Perhaps we could create a
>>>>> ProcessorInterceptor interface and add a config to set it.
>>>>
>>>> This sounds very interesting to me. Then we won't need to touch
>> internal
>>>> API's, and just provide some configs. One challenge here is how to
>> define
>>>> the hooks. In consumer/producer, lifecycle is clear,
>>> `onConsumer`/`onSend`
>>>> and then `onCommit`/`onAck` methods. For Stream processors, how this
>> will
>>>> look like? Maybe `beforeProcess(context, key, value)` and
>>>> `afterProcess(context, key, value)`.
>>>>
>>>>> * perhaps we could simply build the tracing headers into Streams. Is
>>>> there
>>>>> a benefit to making it customizable?
>>>>
>>>> I don't understand this option completely. Do you mean something like
>>>> KIP-159 (
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-
>>>> 159%3A+Introducing+Rich+functions+to+Streams
>>>> )?
>>>> Headers available on StreamsDSL will allow users to create "custom"
>>> traces,
>>>> for instance:
>>>>
>>>> ```
>>>> stream.map( (headers, k, v) -> {
>>>>   Span span = kafkaTracing.nextSpan(headers).start();
>>>>   doSomething(k, v);
>>>>   span.finish();
>>>> }
>>>> ```
>>>>
>>>> but it won't be possible to instrument the existing processors exposed
>> by
>>>> DSL only by enabling headers on Streams DSL.
>>>>
>>>> If we can define a way to pass a `ProcessorSupplier` to be used by
>>>> `StreamsBuilder#internalTopology` -not sure if via constructor or some
>>>> other way- would be enough to support this use-case.
>>>>
>>>>> Also, as Matthias said, you would need to create a KIP to propose
>> this
>>>>> change, but of course we can continue this preliminary discussion
>> until
>>>> you
>>>>> feel confident to create the KIP.
>>>>
>>>> Happy to do it once the approach is clearer.
>>>>
>>>> Cheers,
>>>> Jorge.
>>>>
>>>> El lun., 17 sept. 2018 a las 17:09, John Roesler (<j...@confluent.io>)
>>>> escribió:
>>>>
>>>>> If I understand the request, it's about tracking the latencies for a
>>>>> specific record, not the aggregated latencies for each processor.
>>>>>
>>>>> Jorge,
>>>>>
>>>>> The diff you posted only contains the library-side changes, and it's
>>> not
>>>>> obvious how you would use this to insert the desired tracing code.
>>>>> Perhaps you could provide a snippet demonstrating how you want to use
>>>> this
>>>>> change to enable tracing?
>>>>>
>>>>> Also, as Matthias said, you would need to create a KIP to propose
>> this
>>>>> change, but of course we can continue this preliminary discussion
>> until
>>>> you
>>>>> feel confident to create the KIP.
>>>>>
>>>>> Off the top of my head, here are some other approaches you might
>>>> evaluate:
>>>>> * you mentioned interceptors. Perhaps we could create a
>>>>> ProcessorInterceptor interface and add a config to set it.
>>>>> * perhaps we could simply build the tracing headers into Streams. Is
>>>> there
>>>>> a benefit to making it customizable?
>>>>>
>>>>> Thanks for considering this problem!
>>>>> -John
>>>>>
>>>>> On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com>
>>>> wrote:
>>>>>
>>>>>> Hello Jorge,
>>>>>>
>>>>>> From the TracingProcessor implementation it seems you want to track
>>>>>> per-processor processing latency, is that right? If this is the
>> case
>>>> you
>>>>>> can actually use the per-processor metrics which include latency
>>>> sensors.
>>>>>>
>>>>>> If you do want to track, for a certain record, what's the latency
>> of
>>>>>> processing it, then you'd probably need the processor
>> implementation
>>> in
>>>>>> your repo. In this case, though, I'd suggest to provide a
>>>>>> WrapperProcessorSupplier for the users than modifying
>>>>>> InternalStreamsTopology: more specifically, you can provide an
>>>>>> `abstract WrapperProcessorSupplier
>>>>>> implements ProcessorSupplier` and then let users to instantiate
>> this
>>>>> class
>>>>>> instead of the "bare-metal" interface. WDYT?
>>>>>>
>>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
>>>>>> quilcate.jo...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for your answer, Matthias!
>>>>>>>
>>>>>>> What I'm looking for is something similar to interceptors, but
>> for
>>>>> Stream
>>>>>>> Processors.
>>>>>>>
>>>>>>> In Zipkin -and probably other tracing implementations as well- we
>>> are
>>>>>> using
>>>>>>> Headers to propagate the context of a trace (i.e. adding metadata
>>> to
>>>>> the
>>>>>>> Kafka Record, so we can create references to a trace).
>>>>>>> Now that Headers are part of Kafka Streams Processor API, we can
>>>>>> propagate
>>>>>>> context from input (Consumers) to outputs (Producers) by using
>>>>>>> `KafkaClientSupplier` (e.g. <
>>>>>>> https://github.com/openzipkin/brave/blob/master/
>>>>>>> instrumentation/kafka-streams/src/main/java/brave/kafka/streams/
>>>>>>> TracingKafkaClientSupplier.java
>>>>>>>> ).
>>>>>>>
>>>>>>> "Input to Output" traces could be enough for some use-cases, but
>> we
>>>> are
>>>>>>> looking for a more detailed trace -that could cover cases like
>>>>>> side-effects
>>>>>>> (e.g. for each processor), where input/output and processors
>>>> latencies
>>>>>> can
>>>>>>> be recorded. This is why I have been looking for how to decorate
>>> the
>>>>>>> `ProcessorSupplier` and all the changes shown in the comparison.
>>> Here
>>>>> is
>>>>>> a
>>>>>>> gist of how we are planning to decorate the `addProcessor`
>> method:
>>>>>>> https://github.com/openzipkin/brave/compare/master...jeqo:
>>>>>>> kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
>>>>>>>
>>>>>>> Hope this makes a bit more sense now :)
>>>>>>>
>>>>>>> El dom., 16 sept. 2018 a las 20:51, Matthias J. Sax (<
>>>>>>> matth...@confluent.io>)
>>>>>>> escribió:
>>>>>>>
>>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
>>>>>>>>
>>>>>>>> What do you mean by this exactly? Is there a JIRA? I am fine
>>>> removing
>>>>>>>> `final` from `InternalTopologyBuilder#addProcessor()` -- it's
>> an
>>>>>>>> internal class.
>>>>>>>>
>>>>>>>> However, the diff also shows
>>>>>>>>
>>>>>>>>> public Topology(final InternalTopologyBuilder
>>>>>> internalTopologyBuilder)
>>>>>>> {
>>>>>>>>
>>>>>>>> This has two impacts: first, it modifies `Topology` what is
>> part
>>> of
>>>>>>>> public API and would require a KIP. Second, it exposes
>>>>>>>> `InternalTopologyBuilder` as part of the public API --
>> something
>>> we
>>>>>>>> should not do.
>>>>>>>>
>>>>>>>> I am also not sure, why you want to do this (btw: also public
>> API
>>>>>> change
>>>>>>>> requiring a KIP). However, this should not be necessary.
>>>>>>>>
>>>>>>>>>     public StreamsBuilder(final Topology topology)  {
>>>>>>>>
>>>>>>>>
>>>>>>>> I think I am lacking some context what you try to achieve.
>> Maybe
>>>> you
>>>>>> can
>>>>>>>> elaborate in the problem you try to solve?
>>>>>>>>
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
>>>>>>>>> Hi everyone,
>>>>>>>>>
>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
>>>>>>>>>
>>>>>>>>> One option is to override and access
>>>>>>>>> `InternalTopologyBuilder#addProcessor`. Currently this method
>>>> it is
>>>>>>>> final,
>>>>>>>>> and builder is not exposed as part of `StreamsBuilder`:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> public class StreamsBuilder {
>>>>>>>>>
>>>>>>>>>     /** The actual topology that is constructed by this
>>>>>> StreamsBuilder.
>>>>>>>> */
>>>>>>>>>     private final Topology topology = new Topology();
>>>>>>>>>
>>>>>>>>>     /** The topology's internal builder. */
>>>>>>>>>     final InternalTopologyBuilder internalTopologyBuilder =
>>>>>>>>> topology.internalTopologyBuilder;
>>>>>>>>>
>>>>>>>>>     private final InternalStreamsBuilder
>>> internalStreamsBuilder =
>>>>> new
>>>>>>>>> InternalStreamsBuilder(internalTopologyBuilder);
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> The goal is that If `builder#addProcessor` is exposed, we
>> could
>>>>>>> decorate
>>>>>>>>> every `ProcessorSupplier` and capture traces from it:
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> @Override
>>>>>>>>>   public void addProcessor(String name, ProcessorSupplier
>>>> supplier,
>>>>>>>>> String... predecessorNames) {
>>>>>>>>>     super.addProcessor(name, new TracingProcessorSupplier(
>>>> tracer,
>>>>>>> name,
>>>>>>>>> supplier), predecessorNames);
>>>>>>>>>   }
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Would it make sense to propose this as a change:
>>>>>>>>>
>>>>>>
>>> https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology
>>>>>>> ?
>>>>>>>> or
>>>>>>>>> maybe there is a better way to do this?
>>>>>>>>> TopologyWrapper does something similar:
>>>>>>>>>
>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/
>>>>>>> test/java/org/apache/kafka/streams/TopologyWrapper.java
>>>>>>>>>
>>>>>>>>> Thanks in advance for any help.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Jorge.
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> -- Guozhang
>>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to