Your program does not seem to contain a loop, so it's unclear to me at the moment what the issue could be.
Does the application commit offsets on the input topic? That would be an indicator that it actually does make progress.

Do you change the key between joins, i.e., are there multiple sub-topologies that are connected via repartition topics, or is it one single sub-topology?

-Matthias

On 1/30/20 10:29 AM, Sachin Mittal wrote:
> Hi,
> In one case I have found that when I define my topology using the streams
> DSL, it tends to go into an infinite loop.
> This usually happens if I start my stream, shut it down, and restart it
> again after a while (by that time the source topic has moved ahead).
>
> Stream processing seems to be stuck in a loop and does not seem to
> progress.
>
> My topology is something like this:
>
> source.branch(
>     (k, v) -> ...,
>     (k, v) -> ...,
>     (k, v) -> ...,
>     (k, v) -> ...,
>     (k, v) -> ...,
>     (k, v) -> ...,
>     (k, v) -> ...,
>     (k, v) -> ...
> )
> stream12 = stream1.join(stream2, ...).peek((k, v12) -> log(v12))
> stream34 = stream3.join(stream4, ...).peek((k, v34) -> log(v34))
> stream56 = stream5.join(stream6, ...).peek((k, v56) -> log(v56))
> stream78 = stream7.join(stream8, ...).peek((k, v78) -> log(v78))
> stream1234 = stream12.join(stream34, ...).peek((k, v1234) -> log(v1234))
> stream123478 = stream1234.join(stream78, ...).peek((k, v123478) -> log(v123478))
> stream567 = stream56.join(stream7, ...).peek((k, v567) -> log(v567))
> final = stream123478.join(stream567, ...).foreach((k, vFinal) -> log(vFinal))
>
> What I observe is that it keeps printing log(vFinal) with the same value
> again and again and does not seem to progress.
> Any idea why this may be happening? What can I check to understand what
> could be going wrong?
>
> If I stop Kafka, delete all the data, and then restart everything, it
> all works fine from the next set of data.
> But then I lose the source stream data that had not been processed before
> I restarted it (when it went into this infinite loop).
>
> Thanks
> Sachin