Thanks Mathhias for answering this. I lost your reply in my inbox, for
sorry for replying late.
I get what you are trying to say.

I was actually exploring opportunities where can we make Kafka Streams
faster since I am currently evaluating migrating a project from Spark
Streaming to Kafka Streams.

Currently, the spark streaming one excels in performance, so i was looking
some kind of best practices that we can employ.

I initially thought rocksDB and changelog topic writing might be making
things slower but while seeing iostats, the iowaits were very low. Also, as
per my understanding changelog writing might only be needed in groupby,
reduce, and may be map + map will execute one by one and would be fine even
if one processor machine is slow.

I am using mostly map, reduce(with windowing), filter and merge.

  KStream<String, AdLog> clickLogs = filterClickLogs(adLogs);

    // filter impression, notify and item notification logs.
    adLogs = filterImprRelevantLogs(adLogs);

    // filter visible = true;
    KStream<String, AdLog> validOldVisibleLogs =
filterOldVisibleLogs(adLogs);

    // Process itemVisible=true/false logs.
    KStream<String, AdLog> newImprLogs = filterNotVisibleLogs(adLogs);

    // notification matched logs.
    KStream<Windowed<String>, AdLog> validNewImprLogs =
        reduceImprNotify(stringSerde, adlogSerde, newImprLogs, 30, 1,
IMP_NOTIFY_STORE)
            .filter((k, v) -> v != null);

    // filter itemvisible=true
    KStream<Windowed<String>, AdLog> validItemVisibleTrueLogs =
        filterItemVisibleLogs(validNewImprLogs);

    // apply item notification logic, count valid impressions.
    KStream<Windowed<String>, AdLog> imprItemVisibleFalseLogs =
        filterItemVisibleFalseLogs(validNewImprLogs);

    KStream<Windowed<String>, AdLog> validImprItemLogs =
reduceImprItemNotify(stringSerde,
        adlogSerde, imprItemVisibleFalseLogs, 30, 1,
IMPR_ITEM_NOTIFY_STORE);

    KStream<String, Integer> liCount =
        extractLICount(builder.merge(validItemVisibleTrueLogs,
validImprItemLogs)).toStream();
    KStream<String, Integer> liCountOldImprLogs =
        extractLICount1(builder.merge(validOldVisibleLogs,
clickLogs)).toStream();



private KStream<Windowed<String>, AdLog> reduceImprNotify(Serde<String>
stringSerde,
      Serde<AdLog> adlogSerde, KStream<String, AdLog> clickImprLogs, int
windowTime,
      int advanceTime, String storeName) {

    KTable<Windowed<String>, AdLog> reducedImprs =
clickImprLogs.groupByKey(stringSerde, adlogSerde)
        .reduce((adLog1, adLog2) -> combImprNotify(adLog1, adLog2),
            TimeWindows.of(TimeUnit.MINUTES.toMillis(windowTime))
                .advanceBy(TimeUnit.MINUTES.toMillis(advanceTime))
                .until(TimeUnit.MINUTES.toMillis(windowTime * 4)),
            storeName);

    return reducedImprs
        // Those impressions which have been marked as matched should be
returned.
        .filter((k, v) -> {
          return (v.getAdLogType() == 1 && v.getAdImprLog().isVisible()) ||
v.getAdLogType() == 5;
        }).toStream();
  }



-Sameer.

On Thu, Jul 20, 2017 at 7:10 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Sameer,
>
> the optimization you describe applies to batch processing but not to
> stream processing.
>
> As you mentioned: "will traverse the data only once".
>
> This property is interesting in batch processing only, as it means that
> the data is only read from disk once and both map operations are applies
> during this read. Ie, a "dump" execution would do:
>
> 1) read -> map -> write
> 2) read -> map -> write
> 3) read -> reduce -> write
>
> but Spark does it like this:
>
> read -> map -> map -> reduce -> write
>
> (this is called pipeline parallelism, or operator chaining/fusion).
>
> In stream processing, you will do the "optimized" version anyway, and
> thus, because the optimized version is the native way to execute a
> streaming program, there is no need to optimize :)
>
> Note, that data is not read/written from disk in intermediate steps in
> stream processing. The deployed program is a continuous query and the
> operator are deployed and "online" all the time, while data is streamed
> through them. It's a completely different runtime model.
>
> Thus, with regard to Spark vs Kafka Streams and your example, Kafka
> Streams will execute two consecutive maps quite similar to Spark. I say
> "quite similar" only because Kafka Streams is a true stream processing
> while Spark Streaming does micro batching (ie, it emulates streaming by
> doing batch processing).
>
> Hope this answers your question.
>
>
> -Matthias
>
>
> On 7/18/17 7:35 AM, Sameer Kumar wrote:
> > Hi Guozhang,
> >
> > I was comparing it with DAG processing in Spark.  Spark Streaming is a
> > close competitor to Kafka Streams, one difference which might accounts
> for
> > a faster performance was that Spark submits the code to the code and
> does a
> > bit of code optimization that its end.
> >
> > Lets consider an example code which has map-> map-> reduce. The map
> > functions will not be executed unless reduce executes since its a
> terminal
> > operation but whenever execution happens spark will traverse data only
> once
> > and may call map functions one after the another. This is same as in
> Java 8
> > concept of streams.
> >
> > Please refer to the following link, that explains it very well
> > https://stackoverflow.com/questions/25836316/how-dag-
> works-under-the-covers-in-rdd/30685279#30685279
> >
> > In Kafka Streams, we do specify the topology here but i dont think we do
> > some sort of code optimization. My earlier example will traverse the data
> > twice once for each map phase.
> >
> > Please excuse with the late response, I am operating out of different
> > geography.
> >
> > -Sameer.
> >
> >
> >
> > On Tue, Jul 18, 2017 at 2:44 AM, Guozhang Wang <wangg...@gmail.com>
> wrote:
> >
> >> If that is what it meant for DAG processing (we still need to confirm
> with
> >> Sameer), then programming-wise I do not see what's the difference with
> >> Kafka Streams since inside Streams users is also just specifying the
> >> topology as a DAG:
> >>
> >> https://kafka.apache.org/0110/documentation/streams/core-
> >> concepts#streams_topology
> >>
> >> What is even better is that for Streams since we use Kafka as
> intermeidate
> >> buffer between connected sub-topologies user do not need to worry about
> >> back-pressure at all:
> >>
> >> http://docs.confluent.io/current/streams/architecture.html#backpressure
> >>
> >>
> >> And flexibility-wise, as you mention "it is a bit more flexible than
> Kafka
> >> Streams", I also cannot agree with you that it is the case, since with
> >> Kafka Streams threading model people can easily have multiple tasks
> >> representing different connected parts (i.e. sub-topologies) of the DAG
> >> which are then hosted by different threads executing on their own pace
> >> concurrently:
> >>
> >> https://kafka.apache.org/0110/documentation/streams/
> architecture#streams_
> >> architecture_threads
> >>
> >> Again, this is because different threads never need to talk to each
> other,
> >> but they just read / write data from / to Kafka topics which are then
> the
> >> persistent buffer of the intermeidate streams, no synchronization
> between
> >> threads are needed.
> >>
> >>
> >> Guozhang
> >>
> >> On Mon, Jul 17, 2017 at 10:38 AM, David Garcia <dav...@spiceworks.com>
> >> wrote:
> >>
> >>> On that note, akka streams has Kafka integration.  We use it heavily
> and
> >>> it is quite a bit more flexible than K-Streams (which we also useā€¦but
> for
> >>> simpler applications)  Akka-streams-Kafka is particularly good for
> >>> asynchronous processing: http://doc.akka.io/docs/akka-
> >>> stream-kafka/current/home.html
> >>>
> >>> -David
> >>>
> >>> On 7/17/17, 12:35 PM, "David Garcia" <dav...@spiceworks.com> wrote:
> >>>
> >>>     I think he means something like Akka Streams:
> >>> http://doc.akka.io/docs/akka/2.5.2/java/stream/stream-graphs.html
> >>>
> >>>     Directed Acyclic Graphs are trivial to construct in Akka Streams
> and
> >>> use back-pressure to preclude memory issues.
> >>>
> >>>     -David
> >>>
> >>>     On 7/17/17, 12:20 PM, "Guozhang Wang" <wangg...@gmail.com> wrote:
> >>>
> >>>         Sameer,
> >>>
> >>>         Could you elaborate a bit more what do you mean by "DAG
> >>> processing"?
> >>>
> >>>
> >>>         Guozhang
> >>>
> >>>
> >>>         On Sun, Jul 16, 2017 at 11:58 PM, Sameer Kumar <
> >>> sam.kum.w...@gmail.com>
> >>>         wrote:
> >>>
> >>>         > Currently, we don't have DAG processing in Kafka Streams.
> >> Having
> >>> a DAG has
> >>>         > its own share of advantages in that, it can optimize code on
> >> its
> >>> own and
> >>>         > come up with a optimized execution plan.
> >>>         >
> >>>         > Are we exploring in this direction, do we have this in our
> >>> current roadmap.
> >>>         >
> >>>         >  -Sameer.
> >>>         >
> >>>
> >>>
> >>>
> >>>         --
> >>>         -- Guozhang
> >>>
> >>>
> >>>
> >>>
> >>>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >
>
>

Reply via email to