Ian, if your issue is indeed due to KAFKA-4601, the best workaround for now
is the one I mentioned in that ticket: manually call `through` to do the
repartition, and then, reading from the repartition topic, do the aggregation
first followed by the join. This enforces that each incoming record always
goes through the aggregation processor first, then the join processor. The
downside is that this repartition topic is then not internal but needs to be
managed by you, as the user.
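
To illustrate why the processor order matters, here is a minimal toy model in
plain Java. It is only a sketch: the map stands in for the Counts table, and
the class and method names (`ProcessorOrderSketch`, `aggregateThenJoin`,
`joinThenAggregate`) are made up for illustration and are not Kafka Streams
API. It only shows what a record sees on the join in each order.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: a plain map stands in for the Counts table.
// Nothing here is Kafka Streams API; all names are hypothetical.
class ProcessorOrderSketch {

    // Aggregation step: increment the per-key count, as a count would.
    static void aggregate(Map<String, Long> counts, String key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Join step: look up the current count; null means "no count yet".
    static Long join(Map<String, Long> counts, String key) {
        return counts.get(key);
    }

    // Order enforced by the workaround: aggregation first, then join.
    static Long aggregateThenJoin(Map<String, Long> counts, String key) {
        aggregate(counts, key);
        return join(counts, key);
    }

    // Order that can occur otherwise: join first, then aggregation.
    static Long joinThenAggregate(Map<String, Long> counts, String key) {
        Long seen = join(counts, key);
        aggregate(counts, key);
        return seen;
    }

    public static void main(String[] args) {
        // The first record for a key already sees its own count: prints 1
        System.out.println(aggregateThenJoin(new HashMap<>(), "k"));
        // The first record for a key finds nothing to join with: prints null
        System.out.println(joinThenAggregate(new HashMap<>(), "k"));
    }
}
```

In the real topology the two steps are separate processors reading from the
repartition topic, so this model only captures the per-record ordering, not
partitioning or timing.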

We plan to fix KAFKA-4601 soon, but the fix may be a rather general one:
refactoring the DSL translation so that it goes beyond operator-by-operator
steps.


Guozhang

On Tue, Aug 29, 2017 at 3:59 AM, Ian Duffy <i...@ianduffy.ie> wrote:

> Thanks for the information Guozhang.
>
> Any recommendations for handling or working around this? It's making tests
> very flaky.
>
> On 24 August 2017 at 23:48, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Ian,
> >
> > I suspect it has something to do with your specified topology, in which it
> > triggers the join first, then the aggregation updates.
> >
> > For example, take a look at this ticket:
> > https://issues.apache.org/jira/browse/KAFKA-4601
> >
> > As its printed topology shows, due to the repartition topic the join
> > operator may be triggered before the aggregation operator, and hence the
> > Counts table would return `null`, indicating it has not received a record
> > yet.
> >
> >
> > Guozhang
> >
> >
> > On Thu, Aug 24, 2017 at 3:17 AM, Ian Duffy <i...@ianduffy.ie> wrote:
> >
> > > Hi All,
> > >
> > > I'm building a Streams application where I wish to take action on the
> > > input when a certain frequency of the input has been seen.
> > >
> > > At the moment the application roughly goes:
> > >
> > > frequency table = Input Stream -> groupByKey -> Count
> > >
> > > input stream with counts = leftJoin frequency table and input stream
> > >
> > > branch input stream with counts where count > some threshold send to some
> > > other topic, if <= some threshold send to delay processing topic.
> > >
> > > delay stream with counts = leftJoin frequency table and delay stream
> > >
> > > branch delay stream with counts where count > some threshold send to some
> > > other topic, if <= some threshold send to delay processing topic.
> > >
> > > I've seen intermittent failures where the frequency table returns null
> > > on join. No matter how long I wait, content coming back in on the delayed
> > > stream keeps getting nulls for the count when looking up the frequency
> > > table.
> > >
> > > Any ideas why this might be occurring?
> > >
> > > Thanks,
> > >
> > > Ian.
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
