Hello Aliaksandr, IMO both serve different purposes. AsyncSinkWriter is majorly for sinks but with inbuilt capabilities of Batching, Retryability and rate limiting. But as of now it doesn't come up with side outputs. With RichAsyncFunction(+ side output to Kafka) : Lets you branch before any sink: emit failures to a side output (DLQ) and keep successes on the mainstream. Side outputs are a DataStream feature and easy to wire with OutputTag. I think it comes up with tunable concurrency & ordering (unorderedWait for throughput, orderedWait to preserve order). Built-in async-IO retry helpers exist. Both guarantee at least once. You are looking for Option#3 i.e pre-sink branching, use RichAsyncFunction + side output → KafkaSink and keep the HTTP call only in the async function, but here you trade away the sink’s built-ins (batching/backoff/partial retries/rate limiting) and may need to re-implement whatever you need.
On Thu, Nov 6, 2025 at 6:16 AM Aliaksandr Sazonenka < [email protected]> wrote: > Hi Poorvank, > Thank you for the options! This is very insightful! > > While I see that option #2 is better than option #1, I still don't want to > use raw KafkaProducer inside AsyncSinkWriter, but rather delegate the work > to KafkaSink. > So I am currently looking into option #3 and have a question - what are > pros/cons of switching from AsyncSink/AsyncSinkWriter to RichAsyncFunction? > Do I lose anything? > > -- Alex > > On Tue, Nov 4, 2025 at 10:46 PM Poorvank Bhatia <[email protected]> > wrote: > >> Hey Aliaksandr, >> >> You can’t attach a side output to AsyncSinkWriter. Side outputs are a >> DataStream concept (via OutputTag in Process/Async functions) and must >> happen before the sink. >> If your goal is to capture non-200 request/response pairs for review, IMO >> you’ve got a few options: >> >> *Try External Kafka Producer: *Override submitRequestEntries and >> maintain your own Kafka producer: >> Something like: >> >> @Override >>> >>> protected void submitRequestEntries( >>> List<RequestEntryT> requestEntries, >>> ResultHandler<RequestEntryT> resultHandler) { >>> httpClient.sendAsync(requestEntries).whenComplete((response, >>> error) -> { >>> if (response.statusCode() != 200) { >>> // Send to Kafka for human review >>> for (RequestEntryT entry : requestEntries) { >>> kafkaProducer.send(new ProducerRecord<>( >>> "failed-requests-topic", >>> createFailureRecord(entry, response) >>> )); >>> } >>> // Don't retry these in Flink - they're in Kafka now >>> resultHandler.complete(); >>> } >>> }); >>> } >>> >>> If you want you can even add a retry count i.e it should be retired x >> times and then should be pushed to external kafka for human review. The >> issue here is that sending to Kafka inside submitRequestEntries (or any >> other external data source and then calling resultHandler.complete()) is >> risky: once you call complete(), Flink considers the batch done for >> checkpointing. If the job crashes before the Kafka (or any other External >> Store's) send() is durably acknowledged, you’ll lose those failure records >> and violate the sink’s at-least-once intent. >> >> *Now if you have Flink 1.19+ you can try to:* >> >> Keep AsyncSinkWriter and use a SupportsCommitter >> <https://nightlies.apache.org/flink/flink-docs-release-1.19/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html> >> interface as a “human-review” path: >> >> Keep AsyncSinkWriter but also implement CommittingSinkWriter >> <https://nightlies.apache.org/flink/flink-docs-release-1.19/api/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.html> >> as >> a "human-review" or "Dead-letter" path. You can surface failed entries >> (non-200) as committables from the writer and have a Committer publish them >> to Kafka. Committer would run in a checkpoint-aligned stage; commits must >> be idempotent and may be retried, which is exactly what you want for >> at-least-once (or better) semantics of your failure stream. >> >> Implement SupportsCommitter<YourCommittableObject> on the sink so Flink >> hands those committables to a Committer, which does the actual publish to >> Kafka after a successful checkpoint (and will replay on >> recovery) AsyncSinkWriter already ensures all in-flight requests are >> finished before a commit, which is why the sink has at-least-once >> semantics. Your dead-letter path piggybacks on that. >> >> Or finally you can try HTTP call before the sink using *AsyncDataStream >> + side outputs,* something like: >> >> OutputTag<FailedRequest> failedTag = new OutputTag<>("failed"){}; >>> SingleOutputStreamOperator<MyEvent> ok = >>> AsyncDataStream.unorderedWait( >>> stream, new AsyncHttpFunction(failedTag), timeout, >>> TimeUnit.MILLISECONDS); >>> ok.getSideOutput(failedTag).sinkTo(kafkaSink); // dead-letter topic >>> ok.sinkTo(yourHttpAsycnSink); >> >> >> Let me know it it makes sense. >> >> Thanks, >> Poorvank >> >>>
