Hi Mahesh, why do you need to iterate over the elements?
I wonder if you can't just stream the data from kafka1-kafka4 through your topology?
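If each message records which queue it came from, a plain keyed window over the union already tells you, when the window fires, whether the key passed through all n queues, so there is no need for a feedback edge at all. Rough, untested sketch (AuditEvent, AuditResult, expectedSources and windowSize are names I made up; kafka1-kafka4, terminalSink and expireSink are the streams and sinks from your code):

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Made-up event/result types: `key` is your unique audit key and
// `source` names the Kafka queue an event came from.
case class AuditEvent(key: String, source: String)
case class AuditResult(key: String, passed: Boolean)

val expectedSources = 4         // your "n"
val windowSize = Time.hours(6)  // your "m"

// kafka1..kafka4 are your existing Kafka source streams, assumed
// here to carry AuditEvent elements.
val unionInputStream: DataStream[AuditEvent] =
  kafka1.union(kafka2, kafka3, kafka4)

val results: DataStream[AuditResult] = unionInputStream
  .keyBy(_.key)
  .window(TumblingProcessingTimeWindows.of(windowSize))
  .apply { (key: String, w: TimeWindow, events: Iterable[AuditEvent],
            out: Collector[AuditResult]) =>
    // A key succeeds only if it showed up on all n queues inside the
    // window; everything else has expired when the window fires.
    val seen = events.map(_.source).toSet
    out.collect(AuditResult(key, passed = seen.size == expectedSources))
  }

// Route the two outcomes to your sinks, as in your split/select code.
val splitStreams = results.split(r => Seq(if (r.passed) "success" else "expire"))
splitStreams.select("success").addSink(terminalSink)
splitStreams.select("expire").addSink(expireSink)

Because nothing is fed back, a flood of not-yet-complete keys just sits in window state instead of circulating through the job and back-pressuring the sources.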
On Fri, May 26, 2017 at 7:21 PM, MAHESH KUMAR <r.mahesh.kumar....@gmail.com> wrote:

> Hi Team,
>
> I am trying to build an audit-like system where I read messages from "n"
> Kafka queues, key by a unique key, and then reduce them to a single
> message. If a message has passed through all "n" Kafka queues within a
> window of "m" hours/days, it has succeeded; otherwise it has expired.
>
> I can get this working in my test case, but not when there are millions
> of messages: very few messages reach the success stage in the iteration,
> and a huge number are sent back into it. This creates back pressure, so
> no more messages are read from the Kafka queues. Since no new messages
> are read, the messages inside the window can no longer succeed; they
> keep going through the iterator forever and expire, although they should
> succeed.
>
> I read about the buffer which, when full, creates back pressure and
> stops further reads. The system is supposed to be a lightweight audit
> system, and the audit messages it creates are very small. Is it possible
> to increase the size of the buffer to avoid back pressure? Is there an
> alternative solution to this issue?
>
> The code looks like this:
>
> val unionInputStream = kafka1.union(kafka2, kafka3, kafka4)
>
> def audit(stream: DataStream[Msg]) = {  // Msg stands for the message type
>   val reducedStream = stream
>     .keyBy(keyFunction)
>     .window(TumblingProcessingTimeWindows.of(windowSize))
>     .reduce(reduceFunction)
>   val splitStreams = reducedStream.split(splitFunction)
>   splitStreams.select(success).addSink(terminalSink)
>   splitStreams.select(expire).addSink(expireSink)
>   (splitStreams.select(replay), splitStreams.select(success))
> }
>
> unionInputStream.iterate(audit(_))
>
> Thanks and Regards,
> Mahesh
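P.S. One caveat with the window sketch above: a tumbling window can split a key's events across two windows when they arrive near a boundary, making such keys expire spuriously. If that matters, a keyed ProcessFunction with per-key state and an m-hour timer tracks each key on its own clock instead. Again only an untested sketch, reusing the made-up AuditEvent/AuditResult types from above; depending on your Flink version you may need RichProcessFunction instead of ProcessFunction to get at the keyed state:

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

class AuditFunction(expected: Int, timeoutMs: Long)
    extends ProcessFunction[AuditEvent, AuditResult] {

  // Per-key state: the key itself plus the set of queues seen so far.
  private var seen: ValueState[(String, Set[String])] = _

  override def open(parameters: Configuration): Unit = {
    seen = getRuntimeContext.getState(new ValueStateDescriptor(
      "seen", classOf[(String, Set[String])]))
  }

  override def processElement(
      event: AuditEvent,
      ctx: ProcessFunction[AuditEvent, AuditResult]#Context,
      out: Collector[AuditResult]): Unit = {
    val sources = Option(seen.value()) match {
      case Some((_, s)) => s
      case None =>
        // First event for this key: arm the m-hour expiry timer.
        ctx.timerService().registerProcessingTimeTimer(
          ctx.timerService().currentProcessingTime() + timeoutMs)
        Set.empty[String]
    }
    val updated = sources + event.source
    if (updated.size == expected) {
      // All n queues seen: emit success and drop the state.
      out.collect(AuditResult(event.key, passed = true))
      seen.clear()
    } else {
      seen.update((event.key, updated))
    }
  }

  override def onTimer(
      timestamp: Long,
      ctx: ProcessFunction[AuditEvent, AuditResult]#OnTimerContext,
      out: Collector[AuditResult]): Unit = {
    // State still present when the timer fires means the key never
    // collected all n queues: report it as expired.
    Option(seen.value()).foreach { case (key, _) =>
      out.collect(AuditResult(key, passed = false))
      seen.clear()
    }
  }
}

// Wiring: one AuditResult per key, success as soon as all n queues
// are seen, or expiry when the timer fires.
val audited = unionInputStream
  .keyBy(_.key)
  .process(new AuditFunction(expected = 4, timeoutMs = 6 * 60 * 60 * 1000L))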