Hi Alessandro,

Sorry if I'm missing some of the context, but could you just keep retrying
the API call inside a loop? This would block any other processing by the
same thread, but it would allow Streams to stay up in the face of transient
failures.

Otherwise, I'm afraid that throwing an exception is the right thing to do.
Streams would re-process the record in question when it starts back up, but
you'd have to restart it. You can do that programmatically, but it's a bit
heavyweight as a response to a transient API call failure.
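For illustration, here's a minimal sketch of what that blocking retry could
look like inside your processor. ApiClient, ApiResult, and
TransientApiException are placeholders for your own client code, not a real
API:

    // A minimal blocking-retry helper: loops until the external call
    // succeeds, pausing briefly between attempts. While it spins, the
    // stream thread makes no other progress.
    private ApiResult callUntilSuccess(final ApiClient client, final String value) {
        while (true) {
            try {
                return client.call(value);        // the external API call
            } catch (final TransientApiException e) {
                try {
                    Thread.sleep(1_000L);          // back off, then retry
                } catch (final InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(ie);
                }
            }
        }
    }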
For reference, this is one of several problems that come up when you need
to call out to external services during processing. Streams currently lacks
full support for making this a really pleasant experience, but it's a
perennial topic of discussion. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-311%3A+Async+processing+with+dynamic+scheduling+in+Kafka+Streams
and
https://cwiki.apache.org/confluence/display/KAFKA/KIP-408%3A+Add+Asynchronous+Processing+To+Kafka+Streams
for a couple of attempts to wrestle with the domain.

To answer your latter question, the store should be returned to its prior
state when you restart, but if you want to be absolutely sure this happens,
you need to enable EOS. That will have the side effect of discarding any
local state after a crash, though, which makes the "crash and recover"
strategy even more heavyweight.

I'd recommend wrapping the API call in a retry loop that's as long as you
can tolerate and then crashing if you still don't get through. Be sure to
also look through the docs and find any heartbeat configs you need to set.
Off the top of my head, I think "max.poll.interval.ms" at least needs to be
set bigger than your maximum expected pause; probably 2x the total
retry-loop time would be a good choice. (There's a config sketch after the
quoted message below.)

I hope this helps,
-John

On Fri, Jul 5, 2019 at 6:30 PM Alessandro Tagliapietra
<tagliapietra.alessan...@gmail.com> wrote:
>
> Hello everyone,
>
> I'm looking into a way to reprocess messages in case of soft errors (not
> exceptions). For example, we have a topology that does this:
>
> input stream -> filtering/flatmap -> window and aggregate
>
> In our aggregate step (maybe it should be moved into an additional step)
> we make an API call to one of our services.
>
> What I would like to do is to reprocess that message when the API call
> fails, or even better, if possible, just redo the window computation.
>
> Reading
> https://docs.confluent.io/current/streams/concepts.html#streams-concepts-processing-guarantees
> if I'm not mistaken, with the default at-least-once semantics, if I throw
> an exception the topology will reprocess the messages after the last
> commit. Is it possible instead to just soft-retry the last message,
> without throwing an exception and without also reprocessing older,
> correctly processed messages?
>
> Also, if my topology starts from a stream and uses multiple stores before
> windowing, and there's an error in the windowing step, what happens to
> the store changes? When the message is reprocessed, will the store be in
> the state it was in after it processed the message on the first try?
>
> Thank you in advance
>
> --
> Alessandro Tagliapietra
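P.S. Here's the config sketch mentioned above, to make the EOS and
"max.poll.interval.ms" points concrete. The application id, bootstrap
servers, and the assumed 5-minute retry budget are all made up; adjust them
to your deployment:

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.streams.StreamsConfig;

    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

    // Optional: EOS rolls the stores back to their last committed state
    // after a crash, at the cost of discarding local state and rebuilding
    // it from the changelog topics.
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

    // Give the consumer more headroom than your worst-case pause, e.g.
    // 2x an assumed 5-minute retry budget:
    props.put(
        StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
        (int) Duration.ofMinutes(10).toMillis());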