Ian,

You can try the `toString` function of KafkaStreams (assuming you're on
the older version) to print the constructed topology and check whether
multiple repartition topics are created.

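A minimal sketch of that check, assuming the 0.11-era API (`KStreamBuilder`
and the `KafkaStreams(builder, props)` constructor). The application id, the
broker address, the single-source topology, and the ten-second wait are all
placeholders for illustration, not taken from Ian's application. On 1.0 and
later, `Topology#describe()` serves the same purpose.

```scala
import java.util.Properties

import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.KStreamBuilder

object PrintTopology extends App {
  val props = new Properties()
  // Hypothetical application id and broker address; replace with your own.
  props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "topology-check")
  props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val builder = new KStreamBuilder()
  // Build the same topology as the real application here. A single source
  // on the topic "base-topic" stands in for it in this sketch.
  builder.stream[Array[Byte], Array[Byte]]("base-topic")

  val streams = new KafkaStreams(builder, props)
  streams.start()

  // Assumption: tasks are only listed once they have been assigned, so wait
  // a little before printing. The delay is arbitrary.
  Thread.sleep(10000L)

  // Look for topic names ending in "-repartition" in the printed output.
  println(streams.toString)

  streams.close()
}
```
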
From your code snippet it is a bit hard to tell, since I do not know
whether repeatedInputStream already comes from a topic partitioned by its
key: if it does, it should not require another repartition.

Guozhang

On Thu, Aug 31, 2017 at 2:37 AM, Ian Duffy <i...@ianduffy.ie> wrote:

> Hi Guozhang,
>
> Looking at this again, I'm a little confused. I'm not using any maps and,
> as far as I know, the selectKey is already causing a repartition:
> "base-topic-KSTREAM-KEY-SELECT-0000000003-repartition".
>
> Is using the same KTable in multiple different leftJoins going to be an
> issue?
>
> private val contentInputStreamWithKeyDomain: KStream[String, Content] =
>   contentInputStream.selectKey((_: String, value: Content) => value.domain)
> private val contentCountsByDomain: KTable[String, Long] =
>   contentInputStreamWithKeyDomain.groupByKey().count()
>
> private val contentInputStreamWithCounts: KStream[String, ContentCount] =
>   contentInputStreamWithKeyDomain.leftJoin(
>     contentCountsByDomain, (content: Content, count: Long) => {
>       info(s"Joining input ${content.url} with domain content count $count")
>       ContentCount(content, if (count == null) new Long(0) else count)
>     })
>
> private val repeatedInputStreamWithCounts: KStream[String, ContentCount] =
>   repeatedInputStream.leftJoin(
>     contentCountsByDomain, (content: Content, count: Long) => {
>       info(s"Joining repeated input ${content.url} with domain content count $count")
>       ContentCount(content, if (count == null) new Long(0) else count)
>     })
>
> On 29 August 2017 at 17:18, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Ian, if your issue is indeed due to KAFKA-4601, currently the best way
> > would be what I mentioned in that ticket, i.e. manually call `through`
> > to do the repartition, and then from the repartition topic do the
> > aggregation first, followed by the join. This ensures that each incoming
> > record always goes through the aggregation processor first, then the
> > join processor. The downside is that this repartition topic is then not
> > internal but needs to be managed by you, as the user.
> >
> > We are working on fixing KAFKA-4601 soon, but this may involve a rather
> > general fix, to refactor the DSL translation to go beyond
> > operator-by-operator steps.
> >
> > Guozhang
> >
> > On Tue, Aug 29, 2017 at 3:59 AM, Ian Duffy <i...@ianduffy.ie> wrote:
> >
> > > Thanks for the information, Guozhang.
> > >
> > > Any recommendations for handling or working around this? It's making
> > > tests very flaky.
> > >
> > > On 24 August 2017 at 23:48, Guozhang Wang <wangg...@gmail.com> wrote:
> > >
> > > > Hi Ian,
> > > >
> > > > I suspect it has something to do with your specified topology, in
> > > > which the join is triggered first, then the aggregation updates.
> > > >
> > > > For example, take a look at this ticket:
> > > > https://issues.apache.org/jira/browse/KAFKA-4601
> > > >
> > > > As its printed topology shows, due to the repartition topic the join
> > > > operator may be triggered before the aggregation operator, and hence
> > > > it would cause the Counts table to return `null`, indicating it has
> > > > not received a record yet.
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Aug 24, 2017 at 3:17 AM, Ian Duffy <i...@ianduffy.ie> wrote:
> > > >
> > > > > Hi All,
> > > > >
> > > > > I'm building a streams application where I wish to take action on
> > > > > the input when a certain frequency of the input has been seen.
> > > > >
> > > > > At the moment the application roughly goes:
> > > > >
> > > > > frequency table = Input Stream -> groupByKey -> Count
> > > > >
> > > > > input stream with counts = leftJoin frequency table and input stream
> > > > >
> > > > > branch input stream with counts where count > some threshold send
> > > > > to some other topic, if <= some threshold send to delay processing
> > > > > topic.
> > > > >
> > > > > delay stream with counts = leftJoin frequency table and delay stream
> > > > >
> > > > > branch delay stream with counts where count > some threshold send
> > > > > to some other topic, if <= some threshold send to delay processing
> > > > > topic.
> > > > >
> > > > > I've seen intermittent failures where the frequency table is
> > > > > giving a null on join; no matter how long I wait, content coming
> > > > > back in on the delayed stream keeps getting nulls for the count
> > > > > when looking up the frequency table.
> > > > >
> > > > > Any ideas why this might be occurring?
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Ian.
> > > >
> > > > --
> > > > -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
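
The ordering problem discussed in this thread can be illustrated without
Kafka. The sketch below is a toy model, not Kafka Streams code: `CountTable`,
`joinThenAggregate`, and `aggregateThenJoin` are hypothetical names, and
`None` stands in for the `null` the join sees. It only shows why running the
aggregation before the join for each record (the effect of the `through`
workaround described above) removes the missing count.

```scala
object JoinOrderSketch {
  final case class Content(url: String, domain: String)

  // Stand-in for the count KTable's state: domain -> number of records seen.
  final class CountTable {
    private val counts = scala.collection.mutable.Map.empty[String, Long]

    def aggregate(key: String): Unit =
      counts.update(key, counts.getOrElse(key, 0L) + 1L)

    // None plays the role of the `null` returned by the left join.
    def lookup(key: String): Option[Long] = counts.get(key)
  }

  // The join sees each record before the aggregation has counted it.
  def joinThenAggregate(records: Seq[Content]): Seq[(String, Option[Long])] = {
    val table = new CountTable
    records.map { c =>
      val joined = (c.url, table.lookup(c.domain))
      table.aggregate(c.domain)
      joined
    }
  }

  // The aggregation counts each record first, then the join looks it up.
  def aggregateThenJoin(records: Seq[Content]): Seq[(String, Option[Long])] = {
    val table = new CountTable
    records.map { c =>
      table.aggregate(c.domain)
      (c.url, table.lookup(c.domain))
    }
  }
}
```

With two records for the same domain, `joinThenAggregate` yields no count for
the first record, while `aggregateThenJoin` yields a count for both.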