Ian,

You can try calling `toString` on your KafkaStreams instance (assuming you're
on the older version) to print the constructed topology and check whether
multiple repartition topics are created.

From your code snippet it is a bit hard to tell, since I do not know
whether repeatedInputStream already comes from a topic partitioned by its
key: if it does, it should not require another repartition.
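To make the partitioning point concrete, here is a toy Scala model (plain collections, not Kafka Streams code): each partition owns its own local count store, and a stream-table join only looks in the store of the partition the record arrived on. `partitionFor` is a hypothetical stand-in for the real partitioner (Kafka's default partitioner hashes the serialized key with murmur2), but any deterministic hash shows the same effect.

```scala
// Toy model of key-based partitioning; all names are hypothetical.
object CoPartitioningSketch {
  // Stand-in partitioner: NOT Kafka's murmur2-based default, just a
  // deterministic, non-negative hash of the key.
  def partitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // One local count store per partition, as each task owns one partition.
  def buildStores(keys: Seq[String], numPartitions: Int): Map[Int, Map[String, Long]] =
    keys.groupBy(k => partitionFor(k, numPartitions)).map { case (p, ks) =>
      p -> ks.groupBy(identity).map { case (k, occurrences) => k -> occurrences.size.toLong }
    }

  // A stream-table join only consults the store of the record's partition.
  def lookup(stores: Map[Int, Map[String, Long]], partition: Int, key: String): Option[Long] =
    stores.getOrElse(partition, Map.empty[String, Long]).get(key)
}
```

A record keyed by "a.com" that arrives on `partitionFor("a.com", n)` finds its count; the same key arriving on any other partition (for example because the upstream topic was written with a different key) finds nothing, which a leftJoin surfaces as `null`.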


Guozhang


On Thu, Aug 31, 2017 at 2:37 AM, Ian Duffy <i...@ianduffy.ie> wrote:

> Hi Guozhang,
>
> Looking at this again, I'm a little confused: I'm not using any maps,
> and as far as I know the selectKey is already causing a repartition:
> "base-topic-KSTREAM-KEY-SELECT-0000000003-repartition".
>
> Is using the same KTable in multiple different leftJoins going to be an
> issue?
>
>
>   private val contentInputStreamWithKeyDomain: KStream[String, Content] =
>     contentInputStream.selectKey((_: String, value: Content) => value.domain)
>   private val contentCountsByDomain: KTable[String, Long] =
>     contentInputStreamWithKeyDomain.groupByKey().count()
>
>   private val contentInputStreamWithCounts: KStream[String, ContentCount] =
>     contentInputStreamWithKeyDomain.leftJoin(
>       contentCountsByDomain, (content: Content, count: Long) => {
>         info(s"Joining input ${content.url} with domain content count $count")
>         ContentCount(content, if (count == null) new Long(0) else count)
>       })
>
>   private val repeatedInputStreamWithCounts: KStream[String, ContentCount] =
>     repeatedInputStream.leftJoin(
>       contentCountsByDomain, (content: Content, count: Long) => {
>         info(s"Joining repeated input ${content.url} with domain content count $count")
>         ContentCount(content, if (count == null) new Long(0) else count)
>       })
>
>
> On 29 August 2017 at 17:18, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Ian, if your issue is indeed due to KAFKA-4601, currently the best way
> > would be what I mentioned in that ticket, i.e. manually call `through` to
> > do the repartition, and then from the repartition topic do the aggregation
> > first, followed by the join. This enforces that each incoming record
> > always goes through the aggregation processor first, then the join
> > processor. The downside is that this repartition topic is then not
> > internal but needs to be managed by you, as the user.
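A toy Scala model of why the processor order matters (plain collections, not the Kafka Streams API; all names are hypothetical): each record is handed to a count step and a join step within the same task. When the join runs first, the first record for a key finds no count yet; when the aggregation runs first, which is the order the `through` workaround above is meant to enforce, every joined record sees a count that already includes itself.

```scala
import scala.collection.mutable

// Toy model of one task feeding each record to two processors.
object ProcessingOrderSketch {
  final case class Enriched(key: String, count: Option[Long])

  def run(keys: Seq[String], aggregateFirst: Boolean): Seq[Enriched] = {
    val counts = mutable.Map.empty[String, Long] // stands in for the count store
    val out = mutable.ArrayBuffer.empty[Enriched]

    def aggregate(k: String): Unit = { counts(k) = counts.getOrElse(k, 0L) + 1L }
    // None here plays the role of the `null` a leftJoin would see.
    def join(k: String): Unit = { out += Enriched(k, counts.get(k)) }

    keys.foreach { k =>
      if (aggregateFirst) { aggregate(k); join(k) }
      else { join(k); aggregate(k) }
    }
    out.toList
  }
}
```

With `aggregateFirst = false`, two records for the same key produce counts `None` then `Some(1)`; with `aggregateFirst = true` they produce `Some(1)` then `Some(2)`.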
> >
> > We are working on fixing KAFKA-4601 soon, but this may require a rather
> > general fix: refactoring the DSL translation to go beyond
> > operator-by-operator steps.
> >
> >
> > Guozhang
> >
> > On Tue, Aug 29, 2017 at 3:59 AM, Ian Duffy <i...@ianduffy.ie> wrote:
> >
> > > Thanks for the information Guozhang.
> > >
> > > Any recommendations for handling or working around this? It's making
> > > tests very flaky.
> > >
> > > On 24 August 2017 at 23:48, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hi Ian,
> > > >
> > > > I suspect it has something to do with your topology, in which the join
> > > > is triggered first and the aggregation updates afterwards.
> > > >
> > > > For example, take a look at this ticket:
> > > > https://issues.apache.org/jira/browse/KAFKA-4601
> > > >
> > > > As its printed topology shows, because of the repartition topic the
> > > > join operator may be triggered before the aggregation operator, which
> > > > would cause the Counts table to return `null`, indicating that it has
> > > > not received a record yet.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Thu, Aug 24, 2017 at 3:17 AM, Ian Duffy <i...@ianduffy.ie> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I'm building a Streams application where I wish to take action on
> > > > > the input once a certain frequency of that input has been seen.
> > > > >
> > > > > At the moment the application roughly goes:
> > > > >
> > > > > frequency table = Input Stream -> groupByKey -> Count
> > > > >
> > > > > input stream with counts = leftJoin frequency table and input stream
> > > > >
> > > > > branch input stream with counts: where count > some threshold, send
> > > > > to some other topic; if <= some threshold, send to delay processing
> > > > > topic.
> > > > >
> > > > > delay stream with counts = leftJoin frequency table and delay stream
> > > > >
> > > > > branch delay stream with counts: where count > some threshold, send
> > > > > to some other topic; if <= some threshold, send to delay processing
> > > > > topic.
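The branch step above can be sketched in plain Scala; the route names and `threshold` are hypothetical placeholders, and the null-to-zero handling mirrors the `leftJoin` code quoted earlier in this thread. It also shows why persistent nulls produce a loop: a record whose count is always missing is always routed back to the delay topic.

```scala
// Sketch of the threshold routing step; names are hypothetical.
object ThresholdRoutingSketch {
  sealed trait Route
  case object OtherTopic extends Route
  case object DelayTopic extends Route

  // A leftJoin hands over null when the table has no entry; treat it as 0.
  def route(count: java.lang.Long, threshold: Long): Route = {
    val c = Option(count).map(_.longValue()).getOrElse(0L)
    if (c > threshold) OtherTopic else DelayTopic
  }
}
```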
> > > > >
> > > > > I've seen intermittent failures where the frequency table gives a
> > > > > null on join: no matter how long I wait, content coming back in on
> > > > > the delayed stream keeps getting nulls for the count when looking up
> > > > > the frequency table.
> > > > >
> > > > > Any ideas why this might be occurring?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Ian.
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
