Retry logic and per-request timeouts should be set up within asyncInvoke(), with all error handling done via plain CompletableFuture logic. timeout() then serves as an overall timeout for the element, covering all of its attempts, after which you want the job to fail (e.g., to guard against mistakes in the asyncInvoke() logic).

The reason you shouldn't use timeout() for retries is that it is only ever called once for an input element.
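A minimal sketch of the approach above, written against plain java.util.concurrent so it compiles on its own (Java 9+ for orTimeout and failedFuture). RetryingCall, Outcome and withRetries are hypothetical names, not Flink API. Each attempt gets its own timeout, every failure is recorded, and the returned future always completes normally. A permanently failing element therefore yields a failure Outcome carrying all the exceptions instead of failing the job.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper (not a Flink API): retries an async call, collecting every failure. */
final class RetryingCall {

    /** Either a value, or (after the last attempt failed) the errors of all attempts. */
    public static final class Outcome<T> {
        public final boolean success;
        public final T value;                // only meaningful when success is true
        public final List<Throwable> errors; // one entry per failed attempt, in order

        private Outcome(boolean success, T value, List<Throwable> errors) {
            this.success = success;
            this.value = value;
            // Copy, because the shared list keeps growing across attempts.
            this.errors = Collections.unmodifiableList(new ArrayList<>(errors));
        }
    }

    private RetryingCall() {}

    public static <T> CompletableFuture<Outcome<T>> withRetries(
            Supplier<CompletableFuture<T>> call, int maxAttempts, Duration perAttemptTimeout) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        return attempt(call, maxAttempts, perAttemptTimeout, new ArrayList<>());
    }

    private static <T> CompletableFuture<Outcome<T>> attempt(
            Supplier<CompletableFuture<T>> call, int attemptsLeft, Duration timeout,
            List<Throwable> errors) {
        CompletableFuture<T> f;
        try {
            f = call.get();
        } catch (RuntimeException e) {
            // Treat a synchronous throw like any other failed attempt.
            f = CompletableFuture.failedFuture(e);
        }
        // Per-attempt timeout: completes f with a TimeoutException if it takes too long.
        return f.orTimeout(timeout.toMillis(), TimeUnit.MILLISECONDS)
                .<CompletableFuture<Outcome<T>>>handle((value, error) -> {
                    if (error == null) {
                        return CompletableFuture.completedFuture(new Outcome<T>(true, value, errors));
                    }
                    errors.add(unwrap(error));
                    if (attemptsLeft <= 1) {
                        // Out of attempts: report failure together with all collected errors.
                        return CompletableFuture.completedFuture(new Outcome<T>(false, null, errors));
                    }
                    // Attempts run strictly one after another, so errors is never mutated concurrently.
                    return attempt(call, attemptsLeft - 1, timeout, errors);
                })
                .thenCompose(next -> next);
    }

    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```

Inside asyncInvoke(input, resultFuture) you would then call something like withRetries(() -> client.lookup(input), 3, Duration.ofSeconds(2)), where client.lookup is a hypothetical async client, and complete the ResultFuture with a success/failure wrapper record. As far as I know ResultFuture only offers complete/completeExceptionally, so one option is to route the failure records (e.g. with the errors converted to strings) to a side output in a downstream ProcessFunction and sink that to the DLQ. The timeout passed to AsyncDataStream should be larger than maxAttempts × perAttemptTimeout, so that timeout() only fires when something has genuinely gone wrong. Note that orTimeout does not cancel the underlying request, and this sketch has no backoff between attempts.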

On 04/01/2023 13:40, Yoni Gibbs wrote:
I'm following the documentation here <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api>, trying to get async IO to retry on failure. What I want is for the async action to be attempted, say, three times, and then to give up and continue processing further records. If it still fails after three attempts, I want to sink the record to a DLQ. I believe the way to do that is by overriding timeout() and, in there, outputting the record to a side output, which I then sink to a DLQ of some sort. (Correct me if I'm wrong and there's a better way of doing this.)

The record in the DLQ should contain error information about what went wrong (e.g. the exceptions that occurred on the three failed attempts). How can I get access to this in timeout()?

Thanks!
