Hi Sai,

If you use JdbcSink directly, you may not be able to catch this
exception.

But you can wrap the JdbcSink in your own SinkFunction: delegate to
JdbcSink's `invoke` method, catch the exception there, and invoke the DLQ
sink instead.

A sketch is shown below. It assumes both sinks are passed in as
SinkFunction instances and uses Flink's FunctionUtils to propagate the
runtime context to the wrapped functions:
```
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class SinkWrapper<T> extends RichSinkFunction<T> {

    private final SinkFunction<T> jdbcSink;
    private final SinkFunction<T> dlqSink;

    public SinkWrapper(SinkFunction<T> jdbcSink, SinkFunction<T> dlqSink) {
        this.jdbcSink = jdbcSink;
        this.dlqSink = dlqSink;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // Propagate the runtime context so the wrapped rich functions open correctly.
        FunctionUtils.setFunctionRuntimeContext(jdbcSink, getRuntimeContext());
        FunctionUtils.setFunctionRuntimeContext(dlqSink, getRuntimeContext());
        FunctionUtils.openFunction(jdbcSink, parameters);
        FunctionUtils.openFunction(dlqSink, parameters);
    }

    @Override
    public void invoke(T value, Context context) throws Exception {
        try {
            jdbcSink.invoke(value, context);
        } catch (Exception e) {
            // Route the failing record to the DLQ sink instead of failing the job.
            dlqSink.invoke(value, context);
        }
    }

    @Override
    public void close() throws Exception {
        FunctionUtils.closeFunction(jdbcSink);
        FunctionUtils.closeFunction(dlqSink);
    }
}
```
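For completeness, a rough sketch of how this could be wired together. The
Order type, stream, SQL, connection options, and dlqSink below are
placeholders, not from your job. Note that JdbcSink batches writes, so with
a batch size greater than 1 the BatchUpdateException may only surface at
flush time rather than on the failing record's `invoke`:

```
// Hypothetical wiring; Order, the table, and dlqSink are placeholders.
SinkFunction<Order> jdbcSink = JdbcSink.sink(
        "INSERT INTO orders (id, payload) VALUES (?, ?)",
        (ps, order) -> {
            ps.setLong(1, order.getId());
            ps.setString(2, order.getPayload());
        },
        // Batch size 1 makes the BatchUpdateException surface in invoke()
        // for the failing record; larger batches fail at flush time.
        JdbcExecutionOptions.builder().withBatchSize(1).build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:postgresql://localhost:5432/mydb")
                .withDriverName("org.postgresql.Driver")
                .build());

ordersStream.addSink(new SinkWrapper<>(jdbcSink, dlqSink));
```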


Best,
Feng


On Thu, Oct 19, 2023 at 4:12 PM Sai Vishnu <soudrisaivis...@gmail.com>
wrote:

> Hi team,
>
>
>
> We are using the JdbcSink from flink-connector-jdbc artifact, version
> 3.1.0-1.17.
>
> I want to know if it’s possible to catch the BatchUpdateException that is
> thrown and write that message to a DLQ.
>
>
>
> Below is the use case:
>
> The Flink job reads a packet from Kafka and writes it to Postgres using the
> JdbcSink.
>
> For any missing field, we catch it in the data transform layer and write
> it to a side output that sends the exception along with the original
> message to a DLQ sink.
>
> In scenarios where a field contains more characters than the column
> definition in Postgres allows, we currently receive a BatchUpdateException
> during the write to Postgres.
>
>
>
> Is it possible to catch this exception and write the message to a DLQ sink?
>
>
>
> Thanks,
>
> Sai Vishnu Soudri
>
