I like this idea. But to keep PRs clean and concise, I would prefer to have a separate JIRA and PR for this.
WDYT?

-Matthias

On 2/8/17 9:35 AM, Guozhang Wang wrote:
> The KIP proposal LGTM, thanks Steven!
>
> One meta comment on the PR itself: I'm wondering if we could refactor
> the implementation of `KStream.print() / writeAsText()` to just be a
> special impl of `peek()` then, like we did for `count` as for `aggregate`?
> I.e. we can replace the `KeyValuePrinter` class with an internal ForEach
> impl within `peek()`.
>
>
> Guozhang
>
>
> On Tue, Feb 7, 2017 at 11:09 AM, Gwen Shapira <g...@confluent.io> wrote:
>
>> Far better! Thank you!
>>
>> On Tue, Feb 7, 2017 at 10:19 AM, Steven Schlansker
>> <sschlans...@opentable.com> wrote:
>>> Thanks for the feedback. I improved the javadoc a bit, do you like it better?
>>>
>>> /**
>>>  * Perform an action on each record of {@code KStream}.
>>>  * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)}).
>>>  *
>>>  * Peek is a non-terminal operation that triggers a side effect (such as logging or statistics collection)
>>>  * and returns an unchanged stream.
>>>  *
>>>  * Note that since this operation is stateless, it may execute multiple times for a single record in failure cases.
>>>  *
>>>  * @param action an action to perform on each record
>>>  * @see #process(ProcessorSupplier, String...)
>>>  */
>>> KStream<K, V> peek(final ForeachAction<? super K, ? super V> action);
>>>
>>> Updated in-place on the PR.
>>>
>>>> On Feb 7, 2017, at 2:19 AM, Michael Noll <mich...@confluent.io> wrote:
>>>>
>>>> Many thanks for the KIP and the PR, Steven!
>>>>
>>>> My opinion, too, is that we should consider including this.
>>>>
>>>> One thing that I would like to see clarified is the difference between the
>>>> proposed peek() and existing functions map() and foreach(), for instance.
>>>> My understanding (see also the Java 8 links below) is that:
>>>>
>>>> - Like `map`, `peek` will return a KStream. This also means that, unlike
>>>> `foreach`, `peek` is not a terminal operation.
>>>> - The main purpose of `peek` is, similar to `foreach`, the *side effects*
>>>> (such as the metrics counter example in the KIP) -- and, on a related note,
>>>> also to express your *intent* to achieve such side effects in the first
>>>> place (which is similar to when to use `foreach` rather than `map`); and
>>>> typically you should not (must not?) modify the underlying stream itself
>>>> (unlike `map`, which is supposed to do exactly that).
>>>>
>>>> For reference, here are the descriptions of peek, map, foreach in Java 8.
>>>> I could have also included links to StackOverflow questions where people
>>>> were confused about when (not) to use peek. ;-)
>>>>
>>>> https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#peek-java.util.function.Consumer-
>>>> https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#map-java.util.function.Function-
>>>> https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#forEach-java.util.function.Consumer-
>>>>
>>>> Best wishes,
>>>> Michael
>>>>
>>>> On Tue, Feb 7, 2017 at 10:37 AM, Damian Guy <damian....@gmail.com> wrote:
>>>>
>>>>> Hi Steven,
>>>>> Thanks for the KIP. I think this is a worthy addition to the API.
>>>>>
>>>>> Thanks,
>>>>> Damian
>>>>>
>>>>> On Tue, 7 Feb 2017 at 09:30 Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I like the proposal, thank you. I have found it frustrating myself not to
>>>>>> be able to understand simple things, like how many records have been
>>>>>> currently processed. The peek method would allow those kinds of diagnostics
>>>>>> and debugging.
>>>>>>
>>>>>> Gwen, it is possible to do this with the existing functionality like map,
>>>>>> but you'd have to fake the map method. Also, it is not great using map for
>>>>>> things it was not intended for. Having an explicit peek makes it clearer in
>>>>>> my opinion.
>>>>>>
>>>>>> Thanks
>>>>>> Eno
>>>>>>
>>>>>>> On 7 Feb 2017, at 03:20, Gwen Shapira <g...@confluent.io> wrote:
>>>>>>>
>>>>>>> I've read the wiki and am unclear about the proposal. Can you provide
>>>>>>> something like a Javadoc for peek()? What would this method do?
>>>>>>>
>>>>>>> Also, forgive me if I'm missing an important point here, but can't I
>>>>>>> put the println statement in a map()?
>>>>>>>
>>>>>>> On Mon, Feb 6, 2017 at 5:48 PM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>> Steven,
>>>>>>>>
>>>>>>>> Thanks for your KIP. I'm moving this discussion to the dev mailing list --
>>>>>>>> KIPs need to be discussed there (and can be cc'ed to the user list).
>>>>>>>>
>>>>>>>> Can you also add the KIP to the table "KIPs under discussion":
>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals#KafkaImprovementProposals-KIPsunderdiscussion
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 2/6/17 3:35 PM, Steven Schlansker wrote:
>>>>>>>>> Hello users@kafka,
>>>>>>>>>
>>>>>>>>> I would like to propose a small KIP on the Streams framework
>>>>>>>>> that simply adds a KStream#peek implementation.
>>>>>>>>>
>>>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-121%3A+Add+KStream+peek+method
>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-4720
>>>>>>>>> https://github.com/apache/kafka/pull/2493
>>>>>>>>>
>>>>>>>>> Please consider my contribution and hopefully you all like it and agree that it should be merged into 0.10.3 :)
>>>>>>>>> If not, be gentle, this is my first KIP!
>>>>>>>>>
>>>>>>>>> Happy Monday,
>>>>>>>>> Steven
>>>>>>>
>>>>>>> --
>>>>>>> Gwen Shapira
>>>>>>> Product Manager | Confluent
>>>>>>> 650.450.2760 | @gwenshap
>>>>>>> Follow us: Twitter | blog
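To make the distinctions in this thread concrete (Michael's map/peek/foreach comparison, Eno's point about having to "fake" map for counting, and Guozhang's idea of building print() on top of peek()), here is a minimal, self-contained Java sketch. It deliberately avoids a Kafka dependency: `MiniStream`, `KeyValue`, and the `ForeachAction` interface below are hypothetical stand-ins modelled on the names used above, not the real Kafka Streams classes. The stream is eager and list-backed purely to keep the example runnable, whereas a real KStream invokes its operators record by record inside a processor topology. The print format is arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class PeekSketch {

    // Hypothetical stand-in modelled on Kafka Streams' ForeachAction.
    public interface ForeachAction<K, V> {
        void apply(K key, V value);
    }

    // Hypothetical key/value pair, used only in this sketch.
    public static final class KeyValue<K, V> {
        public final K key;
        public final V value;

        public KeyValue(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    // Hypothetical eager, list-backed stream. A real KStream is not list-backed:
    // its operators are invoked record by record inside a processor topology.
    public static final class MiniStream<K, V> {
        private final List<KeyValue<K, V>> records;

        public MiniStream(List<KeyValue<K, V>> records) {
            this.records = records;
        }

        // map-style: returns a NEW stream whose values have been transformed.
        public <V2> MiniStream<K, V2> mapValues(Function<? super V, ? extends V2> mapper) {
            List<KeyValue<K, V2>> out = new ArrayList<>();
            for (KeyValue<K, V> kv : records) {
                out.add(new KeyValue<K, V2>(kv.key, mapper.apply(kv.value)));
            }
            return new MiniStream<>(out);
        }

        // peek-style: runs a side effect per record and returns this stream
        // unchanged, so further operators can still be chained (non-terminal).
        public MiniStream<K, V> peek(ForeachAction<? super K, ? super V> action) {
            for (KeyValue<K, V> kv : records) {
                action.apply(kv.key, kv.value);
            }
            return this;
        }

        // foreach-style: same side effect, but terminal (nothing to chain on).
        public void foreach(ForeachAction<? super K, ? super V> action) {
            peek(action);
        }

        // Guozhang's idea: print() as a special case of peek() with a printing
        // action, instead of a dedicated printer class.
        public void print(Consumer<String> out) {
            peek((k, v) -> out.accept(k + " , " + v));
        }

        public List<V> values() {
            List<V> vs = new ArrayList<>();
            for (KeyValue<K, V> kv : records) {
                vs.add(kv.value);
            }
            return vs;
        }
    }

    public static void main(String[] args) {
        List<KeyValue<String, Integer>> input = new ArrayList<>();
        input.add(new KeyValue<>("a", 1));
        input.add(new KeyValue<>("b", 2));
        MiniStream<String, Integer> stream = new MiniStream<>(input);

        // Counting with map means "faking" it: the mapper must return v unchanged.
        int[] viaMap = {0};
        List<Integer> mapped = stream.mapValues(v -> { viaMap[0]++; return v; }).values();

        // With peek, the side-effect-only intent is explicit.
        int[] viaPeek = {0};
        List<Integer> peeked = stream.peek((k, v) -> viaPeek[0]++).values();

        System.out.println(viaMap[0] + " " + viaPeek[0] + " " + mapped.equals(peeked)); // 2 2 true

        stream.print(System.out::println); // prints "a , 1" then "b , 2"
    }
}
```

Both counters end up at 2 and the values are identical either way; the difference is only in what the code says it intends. Once peek() exists, print() needs no class of its own, which is the shape of the `KeyValuePrinter` refactoring suggested above.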