Hi Poorvank,
Thank you for the options! This is very insightful!

While I see that option #2 is better than option #1, I still don't want to
use a raw KafkaProducer inside AsyncSinkWriter, but would rather delegate the
work to KafkaSink.
So I am currently looking into option #3 and have a question - what are the
pros/cons of switching from AsyncSink/AsyncSinkWriter to RichAsyncFunction?
Do I lose anything?

-- Alex

On Tue, Nov 4, 2025 at 10:46 PM Poorvank Bhatia <[email protected]>
wrote:

> Hey Aliaksandr,
>
> You can’t attach a side output to AsyncSinkWriter. Side outputs are a
> DataStream concept (via OutputTag in Process/Async functions) and must
> happen before the sink.
> If your goal is to capture non-200 request/response pairs for review, IMO
> you’ve got a few options:
>
> *Try External Kafka Producer:* Override submitRequestEntries and maintain
> your own Kafka producer. Something like:
>> @Override
>> protected void submitRequestEntries(
>>         List<RequestEntryT> requestEntries,
>>         ResultHandler<RequestEntryT> resultHandler) {
>>     httpClient.sendAsync(requestEntries).whenComplete((response, error) -> {
>>         if (error != null) {
>>             // Transport-level failure: let the AsyncSinkWriter retry machinery handle it.
>>             resultHandler.completeExceptionally(new IOException(error));
>>             return;
>>         }
>>         if (response.statusCode() != 200) {
>>             // Send to Kafka for human review.
>>             for (RequestEntryT entry : requestEntries) {
>>                 kafkaProducer.send(new ProducerRecord<>(
>>                     "failed-requests-topic",
>>                     createFailureRecord(entry, response)));
>>             }
>>         }
>>         // Don't retry these in Flink - any failures are in Kafka now.
>>         resultHandler.complete();
>>     });
>> }
>>
> If you want, you can even add a retry count, i.e. an entry is retried x
> times and only then pushed to the external Kafka topic for human review
> (see the sketch below). The issue here is that sending to Kafka (or any
> other external store) inside submitRequestEntries and then calling
> resultHandler.complete() is risky: once you call complete(), Flink
> considers the batch done for checkpointing. If the job crashes before the
> Kafka send() is durably acknowledged, you'll lose those failure records
> and violate the sink's at-least-once intent.
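>
> A minimal sketch of that retry-count idea (purely illustrative: maxAttempts
> and createFailureRecord are assumed helpers, and the retry here simply
> re-invokes the HTTP call inside the writer rather than going through the
> sink's own retry queue):
>
>> // Called from submitRequestEntries as: submitWithRetry(requestEntries, resultHandler, 1);
>> private void submitWithRetry(
>>         List<RequestEntryT> requestEntries,
>>         ResultHandler<RequestEntryT> resultHandler,
>>         int attempt) {
>>     httpClient.sendAsync(requestEntries).whenComplete((response, error) -> {
>>         boolean failed = error != null || response.statusCode() != 200;
>>         if (failed && attempt < maxAttempts) {
>>             // Try the whole batch again, up to maxAttempts times.
>>             submitWithRetry(requestEntries, resultHandler, attempt + 1);
>>         } else if (failed) {
>>             // Retries exhausted: hand the batch to Kafka for human review
>>             // (response may be null if the call itself failed).
>>             for (RequestEntryT entry : requestEntries) {
>>                 kafkaProducer.send(new ProducerRecord<>(
>>                     "failed-requests-topic", createFailureRecord(entry, response)));
>>             }
>>             resultHandler.complete();
>>         } else {
>>             resultHandler.complete();
>>         }
>>     });
>> }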
>
> *Now if you have Flink 1.19+ you can try to:*
>
> Keep AsyncSinkWriter, but also implement CommittingSinkWriter
> <https://nightlies.apache.org/flink/flink-docs-release-1.19/api/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.html>
> and the SupportsCommitter
> <https://nightlies.apache.org/flink/flink-docs-release-1.19/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html>
> interface as a "human-review" or "dead-letter" path. You can surface failed
> entries (non-200) as committables from the writer and have a Committer
> publish them to Kafka. The Committer runs in a checkpoint-aligned stage;
> commits must be idempotent and may be retried, which is exactly what you
> want for at-least-once (or better) semantics of your failure stream.
>
> Implement SupportsCommitter<YourCommittableObject> on the sink so Flink
> hands those committables to a Committer, which does the actual publish to
> Kafka after a successful checkpoint (and will replay them on recovery).
> AsyncSinkWriter already ensures all in-flight requests are finished before
> a commit, which is why the sink has at-least-once semantics. Your
> dead-letter path piggybacks on that.
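>
> A rough skeleton of how those pieces could fit together (class, field and
> topic names are made up, constructor/serializer plumbing is elided, and you
> should double-check the exact Sink V2 signatures against your Flink
> version):
>
>> // Committable describing one failed HTTP request.
>> public class FailedRequest implements java.io.Serializable {
>>     public final String payload;
>>     public final int statusCode;
>>     public FailedRequest(String payload, int statusCode) {
>>         this.payload = payload;
>>         this.statusCode = statusCode;
>>     }
>> }
>>
>> // Writer: normal AsyncSinkWriter behaviour, plus it buffers failed entries
>> // and hands them over as committables at checkpoint time.
>> public class MyHttpWriter extends AsyncSinkWriter<MyEvent, HttpRequestEntry>
>>         implements CommittingSinkWriter<MyEvent, FailedRequest> {
>>
>>     private final List<FailedRequest> pendingFailures = new ArrayList<>();
>>
>>     // ... constructor and element-converter plumbing as in your existing writer ...
>>
>>     @Override
>>     protected void submitRequestEntries(
>>             List<HttpRequestEntry> requestEntries,
>>             ResultHandler<HttpRequestEntry> resultHandler) {
>>         httpClient.sendAsync(requestEntries).whenComplete((response, error) -> {
>>             if (error != null) {
>>                 resultHandler.completeExceptionally(new IOException(error));
>>                 return;
>>             }
>>             if (response.statusCode() != 200) {
>>                 // Callback runs on the HTTP client's thread, hence the lock.
>>                 synchronized (pendingFailures) {
>>                     for (HttpRequestEntry entry : requestEntries) {
>>                         pendingFailures.add(
>>                             new FailedRequest(String.valueOf(entry), response.statusCode()));
>>                     }
>>                 }
>>             }
>>             resultHandler.complete();
>>         });
>>     }
>>
>>     // Called on checkpoint: everything returned here is handed to the Committer.
>>     @Override
>>     public Collection<FailedRequest> prepareCommit() {
>>         synchronized (pendingFailures) {
>>             List<FailedRequest> out = new ArrayList<>(pendingFailures);
>>             pendingFailures.clear();
>>             return out;
>>         }
>>     }
>> }
>>
>> // Committer: publishes the dead-letter records to Kafka after the checkpoint.
>> public class FailedRequestCommitter implements Committer<FailedRequest> {
>>     private final KafkaProducer<String, String> producer;
>>
>>     public FailedRequestCommitter(Properties kafkaProps) {
>>         this.producer = new KafkaProducer<>(kafkaProps);
>>     }
>>
>>     @Override
>>     public void commit(Collection<CommitRequest<FailedRequest>> requests) {
>>         for (CommitRequest<FailedRequest> request : requests) {
>>             FailedRequest failed = request.getCommittable();
>>             producer.send(new ProducerRecord<>("failed-requests-topic", failed.payload));
>>         }
>>         producer.flush();   // make the dead-letter records durable before returning
>>     }
>>
>>     @Override
>>     public void close() {
>>         producer.close();
>>     }
>> }
>>
>> // Sink: your existing AsyncSinkBase subclass, plus the committer wiring.
>> public class MyHttpSink extends AsyncSinkBase<MyEvent, HttpRequestEntry>
>>         implements SupportsCommitter<FailedRequest> {
>>
>>     // ... existing constructor / createWriter / state serializers unchanged ...
>>
>>     @Override
>>     public Committer<FailedRequest> createCommitter(CommitterInitContext context) {
>>         return new FailedRequestCommitter(kafkaProps);   // your producer config
>>     }
>>
>>     @Override
>>     public SimpleVersionedSerializer<FailedRequest> getCommittableSerializer() {
>>         return new FailedRequestSerializer();            // your SimpleVersionedSerializer
>>     }
>> }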
>
> Or finally, you can try the HTTP call before the sink using *AsyncDataStream
> + side outputs*, something like:
>
>> OutputTag<FailedRequest> failedTag = new OutputTag<>("failed"){};
>> SingleOutputStreamOperator<MyEvent> ok =
>>     AsyncDataStream.unorderedWait(
>>         stream, new AsyncHttpFunction(failedTag), timeout, TimeUnit.MILLISECONDS);
>> ok.getSideOutput(failedTag).sinkTo(kafkaSink);   // dead-letter topic
>> ok.sinkTo(yourHttpAsyncSink);
>
> Let me know if it makes sense.
>
> Thanks,
> Poorvank
>
