Hi Guozhang,

Looking at this again, I'm a little confused. I'm not using any maps and,
as far as I know, the selectKey is already causing a repartition:
"base-topic-KSTREAM-KEY-SELECT-0000000003-repartition".

Is using the same KTable in multiple different leftJoins going to be an
issue?


  private val contentInputStreamWithKeyDomain: KStream[String, Content] =
    contentInputStream.selectKey((_: String, value: Content) => value.domain)
  private val contentCountsByDomain: KTable[String, Long] =
    contentInputStreamWithKeyDomain.groupByKey().count()

  private val contentInputStreamWithCounts: KStream[String, ContentCount] =
    contentInputStreamWithKeyDomain.leftJoin(
      contentCountsByDomain, (content: Content, count: Long) => {
        info(s"Joining input ${content.url} with domain content count $count")
        ContentCount(content, if (count == null) new Long(0) else count)
      })

  private val repeatedInputStreamWithCounts: KStream[String, ContentCount] =
    repeatedInputStream.leftJoin(
      contentCountsByDomain, (content: Content, count: Long) => {
        info(s"Joining repeated input ${content.url} with domain content count $count")
        ContentCount(content, if (count == null) new Long(0) else count)
      })
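
To check my understanding of the ordering Guozhang describes below, here is
a toy model in plain Scala. It is not Kafka Streams code: the object
JoinOrderSketch, the case class Joined and the aggregateFirst flag are all
names I made up for illustration, and a mutable Map stands in for the counts
table. It only shows why a join that runs before the aggregation has seen
the record gets no count for a new key, and why forcing the aggregation
first avoids that. It does not model repartition topics or explain nulls
that persist over time.

```scala
import scala.collection.mutable

// Toy model only (hypothetical names, no Kafka dependency): one counts
// "table" and two processors that see each record in a chosen order.
object JoinOrderSketch {
  final case class Joined(key: String, count: Option[Long])

  // Feeds the keys through both processors one record at a time.
  // aggregateFirst = true  -> count is updated, then the join looks it up.
  // aggregateFirst = false -> the join looks up the count before the update.
  def run(keys: Seq[String], aggregateFirst: Boolean): Seq[Joined] = {
    val counts = mutable.Map.empty[String, Long]

    def aggregate(k: String): Unit =
      counts.update(k, counts.getOrElse(k, 0L) + 1L)

    def join(k: String): Joined = Joined(k, counts.get(k))

    keys.map { k =>
      if (aggregateFirst) {
        aggregate(k)
        join(k)
      } else {
        val joined = join(k)
        aggregate(k)
        joined
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq("a.com", "a.com", "b.com")
    // Join first: the first record for each domain finds no count.
    println(run(keys, aggregateFirst = false))
    // Aggregate first: every record finds a count of at least 1.
    println(run(keys, aggregateFirst = true))
  }
}
```

In this model the None results are the analogue of the null counts I see in
the leftJoin, and the aggregate-first ordering is what I understand the
manual `through` step is meant to enforce.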


On 29 August 2017 at 17:18, Guozhang Wang <wangg...@gmail.com> wrote:

> Ian, if your issue is indeed due to KAFKA-4601, currently the best way
> would be what I mentioned in that ticket, i.e. manually call `through` to
> do the repartition, and then from the repartition topic do the aggregation
> first followed by the join. It will enforce that for each incoming record,
> it will always go through the aggregation processor first, then the join
> processor. The downside is that this repartition topic is then not internal
> but needs to be managed by you, as the user.
>
> We are working on fixing KAFKA-4601 soon, but this may involve a rather
> general fix, to refactor the DSL translation to go beyond
> operator-by-operator steps.
>
>
> Guozhang
>
> On Tue, Aug 29, 2017 at 3:59 AM, Ian Duffy <i...@ianduffy.ie> wrote:
>
> > Thanks for the information Guozhang.
> >
> > Any recommendations for handling or working around this? It's making
> > tests very flaky.
> >
> > On 24 August 2017 at 23:48, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi Ian,
> > >
> > > I suspect it has something to do with your specified topology, in which
> > > it triggers the join first, then the aggregation updates.
> > >
> > > For example, take a look at this ticket:
> > > https://issues.apache.org/jira/browse/KAFKA-4601
> > >
> > > As its printed topology shows, due to the repartition topic the join
> > > operator may be triggered before the aggregation operator, and hence it
> > > would cause the Counts table to return `null`, indicating it has not
> > > received a record yet.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Aug 24, 2017 at 3:17 AM, Ian Duffy <i...@ianduffy.ie> wrote:
> > >
> > > > Hi All,
> > > >
> > > > I'm building a streams application where I wish to take action on the
> > > > input when a certain frequency of the input has been seen.
> > > >
> > > > At the moment the application roughly goes:
> > > >
> > > > frequency table = Input Stream -> groupByKey -> Count
> > > >
> > > > input stream with counts = leftJoin frequency table and input stream
> > > >
> > > > branch input stream with counts where count > some threshold send to
> > > > some other topic, if <= some threshold send to delay processing topic.
> > > >
> > > > delay stream with counts = leftJoin frequency table and delay stream
> > > >
> > > > branch delay stream with counts where count > some threshold send to
> > > > some other topic, if <= some threshold send to delay processing topic.
> > > >
> > > > I've seen intermittent failures where the frequency table is giving a
> > > > null on join. No matter how long I wait, content coming back in on the
> > > > delayed stream keeps getting nulls for the count when looking up the
> > > > frequency table.
> > > >
> > > > Any ideas why this might be occurring?
> > > >
> > > > Thanks,
> > > >
> > > > Ian.
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>
