Hey Aliaksandr,
You can’t attach a side output to AsyncSinkWriter. Side outputs are a
DataStream concept (via OutputTag in Process/Async functions) and must
happen before the sink.
If your goal is to capture non-200 request/response pairs for review, IMO
you’ve got a few options:
*Try an external Kafka producer:* Override submitRequestEntries and maintain
your own Kafka producer. Something like:
@Override
protected void submitRequestEntries(
        List<RequestEntryT> requestEntries,
        ResultHandler<RequestEntryT> resultHandler) {
    httpClient.sendAsync(requestEntries).whenComplete((response, error) -> {
        if (error != null) {
            // Transport-level failure: let the sink's own failure handling kick in
            resultHandler.completeExceptionally(new IOException(error));
            return;
        }
        if (response.statusCode() != 200) {
            // Send to Kafka for human review
            for (RequestEntryT entry : requestEntries) {
                kafkaProducer.send(new ProducerRecord<>(
                        "failed-requests-topic",
                        createFailureRecord(entry, response)));
            }
            // Don't retry these in Flink - they're in Kafka now
        }
        // Mark the batch as done either way, otherwise it never completes
        resultHandler.complete();
    });
}
If you want, you can even add a retry count, i.e. an entry is retried x
times and only then pushed to the external Kafka topic for human review
(rough sketch below). The issue here is that sending to Kafka (or any other
external store) inside submitRequestEntries and then calling
resultHandler.complete() is risky: once you call complete(), Flink considers
the batch done for checkpointing. If the job crashes before the Kafka (or
other external store's) send() is durably acknowledged, you'll lose those
failure records and violate the sink's at-least-once intent.
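A rough sketch of the retry-count idea, as a fragment of the same writer. It
assumes the ResultHandler in your Flink version exposes retryForEntries(...),
and MAX_ATTEMPTS / dlqProducer / createFailureRecord are placeholders you
would supply yourself:

// Sketch only: attempt counting keyed by the entry itself; a stable id is safer.
private final Map<RequestEntryT, Integer> attempts = new ConcurrentHashMap<>();
private static final int MAX_ATTEMPTS = 3;

private void handleNon200(
        List<RequestEntryT> failedEntries,
        HttpResponse<?> response,
        ResultHandler<RequestEntryT> resultHandler) {
    List<RequestEntryT> toRetry = new ArrayList<>();
    for (RequestEntryT entry : failedEntries) {
        int n = attempts.merge(entry, 1, Integer::sum);
        if (n < MAX_ATTEMPTS) {
            toRetry.add(entry);
        } else {
            attempts.remove(entry);
            // Give up: hand it to humans via the dead-letter topic
            dlqProducer.send(new ProducerRecord<>(
                    "failed-requests-topic", createFailureRecord(entry, response)));
        }
    }
    if (toRetry.isEmpty()) {
        resultHandler.complete();               // nothing left in flight for this batch
    } else {
        resultHandler.retryForEntries(toRetry); // re-queue only the entries worth retrying
    }
}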
*Now, if you are on Flink 1.19+, you can instead:*
Keep AsyncSinkWriter, but have the writer also implement CommittingSinkWriter
<https://nightlies.apache.org/flink/flink-docs-release-1.19/api/java/org/apache/flink/api/connector/sink2/CommittingSinkWriter.html>
and the sink implement SupportsCommitter
<https://nightlies.apache.org/flink/flink-docs-release-1.19/api/java/org/apache/flink/api/connector/sink2/SupportsCommitter.html>
as a "human-review" or dead-letter path. The writer surfaces the failed
(non-200) entries as committables, and SupportsCommitter<YourCommittableObject>
makes Flink hand those committables to a Committer, which does the actual
publish to Kafka after a successful checkpoint (and replays them on recovery).
The Committer runs in a checkpoint-aligned stage; commits must be idempotent
and may be retried, which is exactly what you want for at-least-once (or
better) semantics on your failure stream. AsyncSinkWriter already ensures
that all in-flight requests are finished before a commit, which is why the
sink has at-least-once semantics; your dead-letter path piggybacks on that.
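A rough sketch of what that could look like, assuming a hypothetical
FailureRecord committable plus your own producer and serializer wiring (the
class and method names below are placeholders, only the Sink V2 interfaces
are real):

// Writer side: besides AsyncSinkWriter, also implement CommittingSinkWriter
// and hand over the failures collected since the last checkpoint.
public class MyHttpWriter extends AsyncSinkWriter<MyEvent, MyRequestEntry>
        implements CommittingSinkWriter<MyEvent, FailureRecord> {

    // filled from submitRequestEntries whenever a non-200 response comes back
    private final List<FailureRecord> pendingFailures = new ArrayList<>();

    @Override
    public Collection<FailureRecord> prepareCommit() {
        List<FailureRecord> committables = new ArrayList<>(pendingFailures);
        pendingFailures.clear();
        return committables;   // these become the committables for this checkpoint
    }

    // ... constructor, submitRequestEntries, getSizeInBytes, etc. as before
}

// Committer side: runs checkpoint-aligned and may be retried, so keep it idempotent.
public class DeadLetterCommitter implements Committer<FailureRecord> {

    private final Producer<String, byte[]> producer;

    public DeadLetterCommitter(Producer<String, byte[]> producer) {
        this.producer = producer;
    }

    @Override
    public void commit(Collection<CommitRequest<FailureRecord>> requests) {
        for (CommitRequest<FailureRecord> request : requests) {
            FailureRecord record = request.getCommittable();
            // serialize() is a placeholder for however you encode the record
            producer.send(new ProducerRecord<>("failed-requests-topic", record.serialize()));
        }
        producer.flush();   // the commit only succeeds once the DLQ writes are durable
    }

    @Override
    public void close() {
        producer.close();
    }
}

The sink itself would then implement SupportsCommitter<FailureRecord>, return
the DeadLetterCommitter from createCommitter(...), and provide a
SimpleVersionedSerializer for FailureRecord so the committables can be
checkpointed.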
Or, finally, you can make the HTTP call before the sink using *AsyncDataStream
+ side outputs*, something like:
OutputTag<FailedRequest> failedTag = new OutputTag<FailedRequest>("failed"){};
SingleOutputStreamOperator<MyEvent> ok =
    AsyncDataStream.unorderedWait(
        stream, new AsyncHttpFunction(failedTag), timeout, TimeUnit.MILLISECONDS);
ok.getSideOutput(failedTag).sinkTo(kafkaSink); // dead-letter topic
ok.sinkTo(yourHttpAsyncSink);
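One wrinkle with that wiring: an AsyncFunction's ResultFuture only exposes
complete()/completeExceptionally(), so AsyncHttpFunction cannot write to
failedTag by itself. The usual workaround is to have it emit a small wrapper
and do the side-output split in a ProcessFunction right after the async
stage, roughly like this (HttpResult, SplitByStatus and their fields are just
placeholders):

// Wrapper emitted by the async HTTP call: the original event plus an optional failure.
public class HttpResult {
    public final MyEvent event;
    public final FailedRequest failure;   // null when the call returned 200

    public HttpResult(MyEvent event, FailedRequest failure) {
        this.event = event;
        this.failure = failure;
    }
}

// Splits the async results: failures go to the side output, successes continue downstream.
public class SplitByStatus extends ProcessFunction<HttpResult, MyEvent> {
    private final OutputTag<FailedRequest> failedTag;

    public SplitByStatus(OutputTag<FailedRequest> failedTag) {
        this.failedTag = failedTag;
    }

    @Override
    public void processElement(HttpResult result, Context ctx, Collector<MyEvent> out) {
        if (result.failure != null) {
            ctx.output(failedTag, result.failure);   // -> dead-letter Kafka topic
        } else {
            out.collect(result.event);               // -> normal path
        }
    }
}

The async stage would then return a stream of HttpResult, you call
.process(new SplitByStatus(failedTag)) on it, and both getSideOutput(failedTag)
and the normal sink hang off that process() operator.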
Let me know if it makes sense.
Thanks,
Poorvank