Hi,
I have found that when I define my topology using the Streams DSL, it
sometimes goes into an infinite loop.
This usually happens if I start my streams application, shut it down, and
restart it after a while (by which time the source topic has moved ahead).

Stream processing seems to be stuck in a loop and does not make any
progress.

My topology is something like this (stream1 to stream8 are the eight
branches returned by branch):

source.branch(
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...,
        (k, v) -> ...
    )
stream12 = stream1.join(stream2, ...).peek((k, v12) -> log(v12))
stream34 = stream3.join(stream4, ...).peek((k, v34) -> log(v34))
stream56 = stream5.join(stream6, ...).peek((k, v56) -> log(v56))
stream78 = stream7.join(stream8, ...).peek((k, v78) -> log(v78))
stream1234 = stream12.join(stream34, ...).peek((k, v1234) -> log(v1234))
stream123478 = stream1234.join(stream78, ...).peek((k, v123478) -> log(v123478))
stream567 = stream56.join(stream7, ...).peek((k, v567) -> log(v567))
final = stream123478.join(stream567, ...).foreach((k, vFinal) -> log(vFinal))

What I observe is that it keeps printing log(vFinal) with the same value
again and again and does not move on to new records.
Any idea why this may be happening? What can I check to understand what
could be going wrong?
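
In case it helps with the diagnosis, below is a minimal sketch of a helper
I could call from the final foreach to count how many times in a row the
same key/value pair shows up. RepeatDetector and record are hypothetical
names of my own, written in plain Java; nothing here is part of the Kafka
Streams API.

```java
import java.util.Objects;

/** Hypothetical debugging helper; not part of the Kafka Streams API. */
final class RepeatDetector {
    private Object lastKey;
    private Object lastValue;
    private boolean seenAny = false;
    private long repeatCount = 0;

    /**
     * Records one key/value pair and returns how many times in a row
     * this exact pair has now been seen (1 means it is new).
     */
    synchronized long record(Object key, Object value) {
        boolean sameAsLast = seenAny
                && Objects.equals(key, lastKey)
                && Objects.equals(value, lastValue);
        if (sameAsLast) {
            repeatCount++;
        } else {
            // A different pair arrived, so start a new run.
            lastKey = key;
            lastValue = value;
            repeatCount = 1;
            seenAny = true;
        }
        return repeatCount;
    }
}
```

It would be wired in as foreach((k, vFinal) -> log(detector.record(k, vFinal)
+ " x " + vFinal)), so the log would show how long each run of identical
output is.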

If I stop Kafka, delete all the data, and then restart everything, it all
works fine from the next set of data.
But then I lose the source stream data that was waiting to be processed
before I restarted it (when it went into this infinite loop).

Thanks
Sachin
