Hi Guozhang,

Looking at this again, I'm a little confused. I'm not using any maps, and as far as I know, the selectKey is already causing a repartition: "base-topic-KSTREAM-KEY-SELECT-0000000003-repartition".

Is using the same KTable in multiple different leftJoins going to be an issue?

  private val contentInputStreamWithKeyDomain: KStream[String, Content] =
    contentInputStream.selectKey((_: String, value: Content) => value.domain)

  private val contentCountsByDomain: KTable[String, Long] =
    contentInputStreamWithKeyDomain.groupByKey().count()

  // Assumes Long is java.lang.Long (what count() produces); with scala.Long
  // the null check below would always be false and new Long(0) would not compile.
  private val contentInputStreamWithCounts: KStream[String, ContentCount] =
    contentInputStreamWithKeyDomain.leftJoin(
      contentCountsByDomain,
      (content: Content, count: Long) => {
        info(s"Joining input ${content.url} with domain content count $count")
        ContentCount(content, if (count == null) new Long(0) else count)
      })

  private val repeatedInputStreamWithCounts: KStream[String, ContentCount] =
    repeatedInputStream.leftJoin(
      contentCountsByDomain,
      (content: Content, count: Long) => {
        info(s"Joining repeated input ${content.url} with domain content count $count")
        ContentCount(content, if (count == null) new Long(0) else count)
      })

On 29 August 2017 at 17:18, Guozhang Wang <wangg...@gmail.com> wrote:

> Ian, if your issue is indeed due to KAFKA-4601, currently the best way
> would be what I mentioned in that ticket, i.e. manually call `through` to
> do the repartition, and then from the repartition topic do the aggregation
> first followed by the join. It will enforce that for each incoming record,
> it will always go through the aggregation processor first, then the join
> processor. The downside is that this repartition topic is then not internal
> but needs to be managed by you, as the user.
>
> We are working on fixing KAFKA-4601 soon, but this may involve a rather
> general fix, to refactor the DSL translation to go beyond
> operator-by-operator steps.
>
>
> Guozhang
>
> On Tue, Aug 29, 2017 at 3:59 AM, Ian Duffy <i...@ianduffy.ie> wrote:
>
> > Thanks for the information Guozhang.
> >
> > Any recommendations for handling or working around this? It's making tests
> > very flaky.
> >
> > On 24 August 2017 at 23:48, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Hi Ian,
> > >
> > > I suspect it has something to do with your specified topology, in which it
> > > triggers the join first, then the aggregation updates.
> > >
> > > For example, take a look at this ticket:
> > > https://issues.apache.org/jira/browse/KAFKA-4601
> > >
> > > As from its printed topology, due to the repartition topic the join
> > > operator may be triggered before the aggregation operator, and hence it
> > > would cause the Counts table to return `null` indicating it has not
> > > received a record yet.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Thu, Aug 24, 2017 at 3:17 AM, Ian Duffy <i...@ianduffy.ie> wrote:
> > >
> > > > Hi All,
> > > >
> > > > I'm building a streams application where I wish to take action on the
> > > > input when a certain frequency of the input has been seen.
> > > >
> > > > At the moment the application roughly goes:
> > > >
> > > > frequency table = Input Stream -> groupByKey -> Count
> > > >
> > > > input stream with counts = leftJoin frequency table and input stream
> > > >
> > > > branch input stream with counts where count > some threshold send to some
> > > > other topic, if <= some threshold send to delay processing topic.
> > > >
> > > > delay stream with counts = leftJoin frequency table and delay stream
> > > >
> > > > branch delay stream with counts where count > some threshold send to some
> > > > other topic, if <= some threshold send to delay processing topic.
> > > >
> > > > I've seen intermittent failures where the frequency table is giving a null
> > > > on join: no matter how long I wait, content coming back in on the delayed
> > > > stream keeps getting nulls for the count when looking up the frequency
> > > > table.
> > > >
> > > > Any ideas why this might be occurring?
> > > >
> > > > Thanks,
> > > >
> > > > Ian.
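To make Guozhang's explanation concrete, here is a toy, in-memory model in plain Scala (not the Kafka Streams API) of the two orderings: the left join reading the count store before the aggregation has processed a record, versus aggregation first and join second, which is the order he says the manual `through` repartition enforces. `JoinOrderingSketch`, `run`, and `joinFirst` are hypothetical names for this sketch only, and `None` stands in for the `null` count the real join receives.

```scala
// Toy, in-memory model of the processor ordering discussed in this thread.
// Plain Scala only: this is NOT Kafka Streams code and does not use its API.
import scala.collection.mutable

object JoinOrderingSketch {
  // Hypothetical stand-in for the thread's Content type (only the fields used here).
  final case class Content(url: String, domain: String)

  /** Runs each record through a "count" step and a "left join" step.
    * joinFirst = true: the join reads the count store before the aggregation
    *   has seen the record (the ordering Guozhang suspects).
    * joinFirst = false: aggregation first, then the join (the ordering the
    *   `through` workaround is meant to enforce).
    * None plays the role of the `null` count the real leftJoin receives.
    */
  def run(records: Seq[Content], joinFirst: Boolean): Seq[Option[Long]] = {
    // Stands in for the KTable's state store: domain -> count.
    val counts = mutable.Map.empty[String, Long]

    def aggregate(c: Content): Unit =
      counts.update(c.domain, counts.getOrElse(c.domain, 0L) + 1L)

    def lookup(c: Content): Option[Long] = counts.get(c.domain)

    records.map { c =>
      if (joinFirst) {
        val seen = lookup(c) // join sees the store before this record is counted
        aggregate(c)
        seen
      } else {
        aggregate(c) // count is updated first...
        lookup(c)    // ...so the join always finds at least 1
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(
      Content("http://a.com/1", "a.com"),
      Content("http://a.com/2", "a.com"),
      Content("http://b.com/1", "b.com"))
    println(run(input, joinFirst = true))  // List(None, Some(1), None)
    println(run(input, joinFirst = false)) // List(Some(1), Some(2), Some(1))
  }
}
```

In this toy, only the first record per key sees no count when the join runs first; it does not model partitioning or the separate delay stream, so on its own it does not explain counts that stay null indefinitely.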