Hi Robert/Team,

Is there any recommended solution, or any other insight into how I should be doing this?
Thanks and Regards,
Mahesh

On Thu, Jun 1, 2017 at 10:32 AM, MAHESH KUMAR <r.mahesh.kumar....@gmail.com> wrote:

> Hi Robert,
>
> The Message Auditor System must monitor all four Kafka queues and gather
> information about the messages that made it through all of them, or more
> specifically, about which queue a particular message did not make it
> through.
>
> We want the window time to be equal to our SLA time, so that any message
> that does not make it through all four stages is deemed failed (expired).
> If we make the window time equal to our SLA time, the buffers may fill up
> faster, since messages are only categorized as successful/failed at the
> end of the window. Iteration lets us keep a smaller window: if a message
> has passed through all the stages within a very short interval (very
> small compared to the SLA), we can categorize it as successful
> immediately and keep iterating only on the messages that have not yet
> reached the final stage (failed/expired). This is the reason we use
> iteration.
>
> We could probably avoid iteration and create a larger time window equal
> to the SLA time. The system may still face the same issue: the back
> pressure won't allow new messages to come through, and the messages
> inside the window may expire even though they have actually passed
> through all four stages. Is there any recommended way to go about it?
>
> Thanks and Regards,
> Mahesh
>
> On Thu, Jun 1, 2017 at 9:49 AM, Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi Mahesh,
>>
>> Why do you need to iterate over the elements?
>>
>> I wonder if you can't just stream the data from kafka1-kafka4 through
>> your topology?
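[Editor's note: the non-iterative variant Robert hints at, i.e. a single tumbling window sized to the SLA with no feedback edge, could look roughly as follows. This is only a sketch against the Flink Scala DataStream API as it stood in 2017; `keyFunction`, `reduceFunction`, `splitFunction`, `slaHours`, the Kafka sources, and the sinks are placeholders matching the names in the code further down the thread, not tested code.]

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Union the four per-stage audit streams into one.
val unionInputStream = kafka1.union(kafka2, kafka3, kafka4)

// One tumbling window covering the whole SLA; the reduce merges the
// per-stage records that arrive for the same message key.
val audited = unionInputStream
  .keyBy(keyFunction)
  .window(TumblingProcessingTimeWindows.of(Time.hours(slaHours)))
  .reduce(reduceFunction)

// At window end a message either carries all four stage markers (success)
// or is missing at least one (expired) -- no "replay" path is needed,
// so nothing is ever fed back against the direction of the stream.
val splitStreams = audited.split(splitFunction)
splitStreams.select("success").addSink(terminalSink)
splitStreams.select("expire").addSink(expireSink)
```

The trade-off is the one Mahesh describes above: state is held for the full SLA before a verdict is emitted, but there is no feedback edge to create back pressure against the sources.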
>>
>> On Fri, May 26, 2017 at 7:21 PM, MAHESH KUMAR <r.mahesh.kumar....@gmail.com> wrote:
>>
>>> Hi Team,
>>>
>>> I am trying to build an audit-like system where I read messages from
>>> "n" Kafka queues, key by a unique key, and then reduce them to a single
>>> message. If a message has passed through all "n" Kafka queues within a
>>> window time of "m" hours/days, it has succeeded; otherwise it has
>>> expired.
>>>
>>> I can get this working in my test case, but not when there are millions
>>> of messages: very few messages reach the success stage of the
>>> iteration, and a huge number are sent back into it. This creates back
>>> pressure, so no more messages are read from the Kafka queues. Since no
>>> new messages are read, the messages inside the window can no longer
>>> succeed; they keep going through the iterator forever and eventually
>>> expire, although they should have succeeded.
>>>
>>> I read about the network buffers, which create back pressure when full
>>> and stop further reads. The system is supposed to be a lightweight
>>> audit system, and the audit messages it creates are very small. Is it
>>> possible to increase the size of the buffers to avoid back pressure?
>>> Is there an alternative solution to this issue?
>>>
>>> The code looks like this:
>>>
>>> val unionInputStream = kafka1.union(kafka2, kafka3, kafka4)
>>>
>>> def audit(stream: DataStream[AuditMessage]) = {
>>>   val reducedStream = stream
>>>     .keyBy(keyFunction)
>>>     .window(TumblingProcessingTimeWindows.of(windowTime))
>>>     .reduce(reduceFunction)
>>>   val splitStreams = reducedStream.split(splitFunction)
>>>   splitStreams.select("success").addSink(terminalSink)
>>>   splitStreams.select("expire").addSink(expireSink)
>>>   (splitStreams.select("replay"), splitStreams.select("success"))
>>> }
>>>
>>> unionInputStream.iterate(audit(_))
>>>
>>> Thanks and Regards,
>>> Mahesh
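[Editor's note: on the question of enlarging the buffers, back pressure in Flink of that era (1.2/1.3) was governed by the TaskManager's pool of network buffers, configurable in flink-conf.yaml. A hedged example with the keys that existed at the time follows; the values are purely illustrative, and raising them delays back pressure rather than removing it, since an iteration's feedback edge can still saturate any finite pool. Check the configuration reference for your Flink version before using these.]

```
# flink-conf.yaml (Flink 1.2/1.3-era keys; values are illustrative)

# Total number of network buffers available per TaskManager.
taskmanager.network.numberOfBuffers: 8192

# Size of each memory segment (and hence each network buffer) in bytes.
taskmanager.memory.segment-size: 32768
```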