Hi Poorvank,

Thank you for the options! This is very insightful!

While I see that option #2 is better than option #1, I still don't want to use a raw
KafkaProducer inside the AsyncSinkWriter, but would rather delegate that work to
KafkaSink. So I am currently looking into option #3 and have a question: what are the
pros and cons of switching from AsyncSink/AsyncSinkWriter to a RichAsyncFunction? Do I
lose anything?
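In case it helps to make the question concrete, this is roughly the topology I am
picturing for option #3, with the HTTP call moved out of the sink and into the async
operator entirely. Everything below is just a sketch with made-up placeholders (String
payloads, the HttpResult holder, the endpoint, the timeout/capacity values, the
dead-letter KafkaSink), not code from my job:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Collections;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Option3Sketch {

    // Holder for one request/response pair; a placeholder POJO, not from my job.
    public static class HttpResult {
        public String payload;
        public int statusCode;      // -1 when the call itself failed
        public String responseBody; // or the exception message on failure

        public HttpResult() {}

        public HttpResult(String payload, int statusCode, String responseBody) {
            this.payload = payload;
            this.statusCode = statusCode;
            this.responseBody = responseBody;
        }
    }

    // Async HTTP call; non-200s and exceptions are wrapped into HttpResult instead
    // of failing the operator, so they can be routed to the dead-letter topic later.
    public static class AsyncHttpFunction extends RichAsyncFunction<String, HttpResult> {

        private transient HttpClient httpClient;

        @Override
        public void open(Configuration parameters) {
            httpClient = HttpClient.newHttpClient();
        }

        @Override
        public void asyncInvoke(String payload, ResultFuture<HttpResult> resultFuture) {
            HttpRequest request = HttpRequest
                    .newBuilder(URI.create("https://example.com/api")) // placeholder endpoint
                    .POST(HttpRequest.BodyPublishers.ofString(payload))
                    .build();
            httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString())
                    .whenComplete((response, error) -> {
                        HttpResult result = (error != null)
                                ? new HttpResult(payload, -1, error.toString())
                                : new HttpResult(payload, response.statusCode(), response.body());
                        resultFuture.complete(Collections.singleton(result));
                    });
        }
    }

    // The async operator itself can't emit to a side output, so a small
    // ProcessFunction after it routes non-200 results to the failed tag.
    public static SingleOutputStreamOperator<HttpResult> callAndSplit(
            DataStream<String> input, OutputTag<HttpResult> failedTag) {
        DataStream<HttpResult> results = AsyncDataStream.unorderedWait(
                input, new AsyncHttpFunction(), 30, TimeUnit.SECONDS, 100); // placeholder timeout/capacity
        return results.process(new ProcessFunction<HttpResult, HttpResult>() {
            @Override
            public void processElement(HttpResult value, Context ctx, Collector<HttpResult> out) {
                if (value.statusCode == 200) {
                    out.collect(value);
                } else {
                    ctx.output(failedTag, value); // dead-letter path
                }
            }
        });
    }
}

and then the wiring would be roughly:

OutputTag<HttpResult> failedTag = new OutputTag<HttpResult>("failed-http") {};
SingleOutputStreamOperator<HttpResult> ok = Option3Sketch.callAndSplit(requestStream, failedTag);
ok.getSideOutput(failedTag).sinkTo(deadLetterKafkaSink); // KafkaSink<HttpResult>, placeholder
// successful responses continue on "ok" if anything downstream still needs them

One thing I noticed while sketching this: as far as I can tell, a RichAsyncFunction
cannot emit to a side output itself (ResultFuture only exposes complete and
completeExceptionally), so I would need the small ProcessFunction after it to do the
routing, unless I am missing something. That is partly what prompted the pros/cons
question above.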
--
Alex

On Tue, Nov 4, 2025 at 10:46 PM Poorvank Bhatia <[email protected]> wrote:

> Hey Aliaksandr,
>
> You can’t attach a side output to AsyncSinkWriter. Side outputs are a
> DataStream concept (via OutputTag in Process/Async functions) and must
> happen before the sink.
> If your goal is to capture non-200 request/response pairs for review, IMO
> you’ve got a few options:
>
> *Try an external Kafka producer:* Override submitRequestEntries and
> maintain your own Kafka producer. Something like:
>
>> @Override
>> protected void submitRequestEntries(
>>         List<RequestEntryT> requestEntries,
>>         ResultHandler<RequestEntryT> resultHandler) {
>>     httpClient.sendAsync(requestEntries).whenComplete((response, error) -> {
>>         if (response.statusCode() != 200) {
>>             // Send to Kafka for human review
>>             for (RequestEntryT entry : requestEntries) {
>>                 kafkaProducer.send(new ProducerRecord<>(
>>                         "failed-requests-topic",
>>                         createFailureRecord(entry, response)));
>>             }
>>             // Don't retry these in Flink - they're in Kafka now
>>             resultHandler.complete();
>>         }
>>     });
>> }
>
> If you want, you can even add a retry count, i.e. an entry should be retried
> x times and then pushed to the external Kafka topic for human review. The
> issue here is that sending to Kafka (or any other external store) inside
> submitRequestEntries and then calling resultHandler.complete() is risky:
> once you call complete(), Flink considers the batch done for checkpointing.
> If the job crashes before the Kafka (or any other external store's) send()
> is durably acknowledged, you’ll lose those failure records and violate the
> sink’s at-least-once intent.
>
> *Now if you have Flink 1.19+ you can try to:*
>
> Keep AsyncSinkWriter and use the SupportsCommitter
> <https://nightlies.apache.org/flink/flink-docs-release-1.19/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html>
> interface as a “human-review” path:
>
> Keep AsyncSinkWriter but also implement CommittingSinkWriter
> <https://nightlies.apache.org/flink/flink-docs-release-1.19/api/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.html>
> as a "human-review" or "dead-letter" path. You can surface failed entries
> (non-200) as committables from the writer and have a Committer publish
> them to Kafka. The Committer runs in a checkpoint-aligned stage; commits
> must be idempotent and may be retried, which is exactly what you want for
> at-least-once (or better) semantics of your failure stream.
>
> Implement SupportsCommitter<YourCommittableObject> on the sink so Flink
> hands those committables to a Committer, which does the actual publish to
> Kafka after a successful checkpoint (and will replay on recovery).
> AsyncSinkWriter already ensures all in-flight requests are finished before
> a commit, which is why the sink has at-least-once semantics. Your
> dead-letter path piggybacks on that.
>
> Or finally, you can try making the HTTP call before the sink using
> *AsyncDataStream + side outputs*, something like:
>
>> OutputTag<FailedRequest> failedTag = new OutputTag<>("failed"){};
>> SingleOutputStreamOperator<MyEvent> ok =
>>     AsyncDataStream.unorderedWait(
>>         stream, new AsyncHttpFunction(failedTag), timeout, TimeUnit.MILLISECONDS);
>> ok.getSideOutput(failedTag).sinkTo(kafkaSink); // dead-letter topic
>> ok.sinkTo(yourHttpAsyncSink);
>
> Let me know if it makes sense.
>
> Thanks,
> Poorvank
