Thanks for your answer, Matthias!

What I'm looking for is something similar to interceptors, but for Stream
Processors.

In Zipkin (and probably other tracing implementations as well) we use
headers to propagate the context of a trace, i.e. we add metadata to the
Kafka record so that we can create references to a trace.
Now that headers are part of the Kafka Streams Processor API, we can
propagate context from inputs (consumers) to outputs (producers) by using a
`KafkaClientSupplier` (e.g.
<https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java>).
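
To make the idea of header propagation concrete, here is a minimal sketch in
plain Java. It does not use the Kafka or Brave APIs: `SimpleRecord`,
`TraceHeaders` and the `trace-id` header name are hypothetical stand-ins that
only illustrate copying trace context from an input record to an output
record.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a Kafka record: a value plus string headers.
final class SimpleRecord {
    final String value;
    final Map<String, String> headers = new HashMap<>();

    SimpleRecord(String value) {
        this.value = value;
    }
}

final class TraceHeaders {
    // Hypothetical header name, chosen for illustration only.
    static final String TRACE_ID = "trace-id";

    // Builds an output record and carries the trace id over from the input,
    // so both records can be linked to the same trace.
    static SimpleRecord forward(SimpleRecord input, String outputValue) {
        SimpleRecord output = new SimpleRecord(outputValue);
        String traceId = input.headers.get(TRACE_ID);
        if (traceId != null) {
            output.headers.put(TRACE_ID, traceId);
        }
        return output;
    }
}
```

A record without a trace id is forwarded with no trace header at all, so
untraced input stays untraced.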

"Input to Output" traces could be enough for some use-cases, but we are
looking for a more detailed trace, one that covers side effects in each
processor and records input/output and per-processor latencies. This is why
I have been looking at how to decorate the `ProcessorSupplier`, which
explains the changes shown in the comparison. Here is the gist of how we are
planning to decorate the `addProcessor` method:
https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
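
As a rough illustration of the decoration itself, here is a self-contained
sketch. It does not depend on Kafka Streams or Brave: `SimpleProcessor`,
`SimpleProcessorSupplier` and `SpanRecorder` are hypothetical stand-ins for
`Processor`, `ProcessorSupplier` and a tracer, and the real
`TracingProcessorSupplier` in the branch above may look different.

```java
// Hypothetical stand-in for Kafka Streams' Processor, reduced to one method.
interface SimpleProcessor<K, V> {
    void process(K key, V value);
}

// Hypothetical stand-in for Kafka Streams' ProcessorSupplier.
interface SimpleProcessorSupplier<K, V> {
    SimpleProcessor<K, V> get();
}

// Hypothetical stand-in for a tracer: receives one timing per processed record.
interface SpanRecorder {
    void record(String processorName, long durationNanos);
}

// Decorator: wraps every processor created by the delegate supplier so that
// each process() call is timed and reported under the processor's name.
final class TracingProcessorSupplier<K, V> implements SimpleProcessorSupplier<K, V> {
    private final SpanRecorder recorder;
    private final String name;
    private final SimpleProcessorSupplier<K, V> delegate;

    TracingProcessorSupplier(SpanRecorder recorder, String name,
                             SimpleProcessorSupplier<K, V> delegate) {
        this.recorder = recorder;
        this.name = name;
        this.delegate = delegate;
    }

    @Override
    public SimpleProcessor<K, V> get() {
        SimpleProcessor<K, V> processor = delegate.get();
        return (key, value) -> {
            long start = System.nanoTime();
            try {
                processor.process(key, value);
            } finally {
                // Report the timing even if the wrapped processor throws.
                recorder.record(name, System.nanoTime() - start);
            }
        };
    }
}
```

The wrapped processor is unaware of the tracing, which is why hooking in at
`addProcessor` is attractive: every supplier in the topology could be
decorated in one place.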

Hope this makes a bit more sense now :)

On Sun, 16 Sep 2018 at 20:51, Matthias J. Sax (<matth...@confluent.io>)
wrote:

> >> I'm experimenting on how to add tracing to Kafka Streams.
>
> What do you mean by this exactly? Is there a JIRA? I am fine removing
> `final` from `InternalTopologyBuilder#addProcessor()` -- it's an
> internal class.
>
> However, the diff also shows
>
> > public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
>
> This has two impacts: first, it modifies `Topology`, which is part of the
> public API and would require a KIP. Second, it exposes
> `InternalTopologyBuilder` as part of the public API -- something we
> should not do.
>
> I am also not sure, why you want to do this (btw: also public API change
> requiring a KIP). However, this should not be necessary.
>
> >     public StreamsBuilder(final Topology topology)  {
>
>
> I think I am lacking some context on what you are trying to achieve. Maybe
> you can elaborate on the problem you are trying to solve?
>
>
> -Matthias
>
> On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > Hi everyone,
> >
> > I'm experimenting on how to add tracing to Kafka Streams.
> >
> > One option is to override and access
> > `InternalTopologyBuilder#addProcessor`. Currently this method is final,
> > and the builder is not exposed as part of `StreamsBuilder`:
> >
> > ```
> > public class StreamsBuilder {
> >
> >     /** The actual topology that is constructed by this StreamsBuilder. */
> >     private final Topology topology = new Topology();
> >
> >     /** The topology's internal builder. */
> >     final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
> >
> >     private final InternalStreamsBuilder internalStreamsBuilder =
> >         new InternalStreamsBuilder(internalTopologyBuilder);
> > ```
> >
> > The goal is that, if `builder#addProcessor` is exposed, we could decorate
> > every `ProcessorSupplier` and capture traces from it:
> >
> > ```
> > @Override
> > public void addProcessor(String name, ProcessorSupplier supplier, String... predecessorNames) {
> >     super.addProcessor(name, new TracingProcessorSupplier(tracer, name, supplier), predecessorNames);
> > }
> > ```
> >
> > Would it make sense to propose this as a change
> > (https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology),
> > or is there maybe a better way to do this?
> > TopologyWrapper does something similar:
> > https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
> >
> > Thanks in advance for any help.
> >
> > Cheers,
> > Jorge.
> >
>
>
