Hi Sai,
If you use JdbcSink directly, you may not be able to catch this
exception.
However, you can write your own SinkFunction that wraps the JDBC sink:
call the wrapped sink's `invoke` method inside a try/catch and, if it
throws, pass the record to the DLQ sink instead.
For example:
```
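import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Wraps the JDBC sink and sends records that fail to write to the DLQ sink.
public class SinkWrapper<T> extends RichSinkFunction<T> {
    // JdbcSink.sink(...) is declared to return SinkFunction<T>; this sketch
    // assumes the returned instance can be cast to RichSinkFunction<T>.
    private final RichSinkFunction<T> jdbcSink;
    private final RichSinkFunction<T> dlqSink;
    public SinkWrapper(RichSinkFunction<T> jdbcSink, RichSinkFunction<T> dlqSink) {
        this.jdbcSink = jdbcSink;
        this.dlqSink = dlqSink;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        jdbcSink.setRuntimeContext(getRuntimeContext());
        dlqSink.setRuntimeContext(getRuntimeContext());
        jdbcSink.open(parameters);
        dlqSink.open(parameters);
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        try {
            jdbcSink.invoke(value, context);
        } catch (Exception e) {
            // The JDBC write failed, so route the record to the DLQ sink.
            dlqSink.invoke(value, context);
        }
    }
}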
```
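To make the routing logic easier to try out without a Flink cluster, here is
a minimal, Flink-free sketch of the same idea. Everything in it is
hypothetical and for illustration only: `Writer`, `DeadLetter`,
`DlqRoutingWriter` and `boundedColumnWriter` are made-up names, and the
"column" is just an in-memory list that rejects values that are too long. It
also keeps the error text next to the original message, which is what you
described doing in your side output.

```java
import java.sql.BatchUpdateException;
import java.util.ArrayList;
import java.util.List;

public class DlqRoutingSketch {

    /** Hypothetical stand-in for a sink's invoke method. */
    interface Writer<T> {
        void write(T value) throws Exception;
    }

    /** Hypothetical DLQ record: the original message plus the failure reason. */
    static final class DeadLetter<T> {
        final T original;
        final String error;

        DeadLetter(T original, String error) {
            this.original = original;
            this.error = error;
        }
    }

    /** Tries the primary writer; on BatchUpdateException, hands the record to the DLQ writer. */
    static final class DlqRoutingWriter<T> implements Writer<T> {
        private final Writer<T> primary;
        private final Writer<DeadLetter<T>> dlq;

        DlqRoutingWriter(Writer<T> primary, Writer<DeadLetter<T>> dlq) {
            this.primary = primary;
            this.dlq = dlq;
        }

        @Override
        public void write(T value) throws Exception {
            try {
                primary.write(value);
            } catch (BatchUpdateException e) {
                dlq.write(new DeadLetter<>(value, e.getMessage()));
            }
        }
    }

    /** Simulates a length-limited column: rejects values longer than maxLen. */
    static Writer<String> boundedColumnWriter(List<String> table, int maxLen) {
        return value -> {
            if (value.length() > maxLen) {
                throw new BatchUpdateException("value too long (max " + maxLen + ")", new int[0]);
            }
            table.add(value);
        };
    }

    public static void main(String[] args) throws Exception {
        List<String> table = new ArrayList<>();
        List<DeadLetter<String>> deadLetters = new ArrayList<>();
        Writer<String> sink =
                new DlqRoutingWriter<>(boundedColumnWriter(table, 5), deadLetters::add);
        for (String v : new String[] {"ok", "too-long-value", "fine"}) {
            sink.write(v);
        }
        System.out.println("written=" + table + ", deadLettered=" + deadLetters.size());
    }
}
```

One caveat when carrying this over to the real sink: this sketch writes one
record at a time, so a failure always belongs to the record being written.
The JDBC sink buffers records and flushes them in batches, so the exception
you catch in `invoke` may have been caused by a record buffered earlier.
Pairing each failure with its record probably requires a batch size of 1,
and I have not verified how that behaves in your setup.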
Best,
Feng
On Thu, Oct 19, 2023 at 4:12 PM Sai Vishnu wrote:
> Hi team,
>
>
>
> We are using the JdbcSink from flink-connector-jdbc artifact, version
> 3.1.0-1.17.
>
> I want to know if it’s possible to catch the BatchUpdateException that is
> thrown and send that message to a DLQ.
>
>
>
> Below is the use case:
>
> Flink job reads a packet from Kafka and writes it to Postgres using the
> JdbcSink.
>
> For any missing field, we catch it in the data transform layer and write
> it to a side output that sends the exception along with the original
> message to a DLQ sink.
>
> When a field has more characters than the Postgres column allows, we
> currently get a BatchUpdateException while writing to Postgres.
>
>
>
> Is it possible to catch this exception and write the message to a dlq sink?
>
>
>
> Thanks,
>
> Sai Vishnu Soudri
>