Hi Feng,

Thank you for your response and the suggestion. I was able to cast the SinkFunction to GenericJdbcSinkFunction, which let me override the open and close methods.

Proceeding further, I have observed that if a batch hits a BatchUpdateException because of one packet inside the batch, the driver does not go on to write the remaining entries of that batch to the DB.
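For reference, here is a minimal sketch (plain JDK, no Flink classes) of how a BatchUpdateException could be located when it is wrapped in other exceptions, and how its update counts could be read to decide which batch entries need follow-up. The class and method names (BatchFailureInspector, findBatchUpdateException, indicesToRetry) are hypothetical. It relies on the documented JDBC behaviour that a driver which continues after a failure reports Statement.EXECUTE_FAILED for the failing commands, while a driver which stops reports counts only for the commands that ran before the failure; which of the two a given driver does has to be checked.

```java
import java.sql.BatchUpdateException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of flink-connector-jdbc.
final class BatchFailureInspector {

    /** Walks the cause chain and returns the first BatchUpdateException, if any. */
    static Optional<BatchUpdateException> findBatchUpdateException(Throwable error) {
        Throwable current = error;
        // Bound the walk so a cyclic cause chain cannot loop forever.
        for (int depth = 0; current != null && depth < 64; depth++) {
            if (current instanceof BatchUpdateException) {
                return Optional.of((BatchUpdateException) current);
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    /**
     * Returns the positions in the batch that were not written successfully:
     * entries reported as EXECUTE_FAILED, plus every entry the driver did not
     * report on at all (the failing one and anything after it, if it stopped).
     */
    static List<Integer> indicesToRetry(BatchUpdateException e, int batchSize) {
        int[] counts = e.getUpdateCounts();
        int reported = (counts == null) ? 0 : Math.min(counts.length, batchSize);
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < reported; i++) {
            if (counts[i] == Statement.EXECUTE_FAILED) {
                result.add(i);
            }
        }
        for (int i = reported; i < batchSize; i++) {
            result.add(i);
        }
        return result;
    }
}
```

The exception carries only counts, not the records themselves, so the caller would need to keep its own copy of the records in the current batch in order to retry the returned positions one by one and send the ones that still fail to a DLQ or a log.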
Is there any way to programmatically recover from this state and let the execution move on to the next packet in the batch, or to the next batch?

Some homework I did that could be helpful: I see that flink-connector-jdbc uses com.oracle.database.jdbc.ojdbc8 [source: https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/pom.xml#L69-L73], and the Java SE API documentation on docs.oracle.com only says this: "After a command in a batch update fails to execute properly and a BatchUpdateException is thrown, the driver may or may not continue to process the remaining commands in the batch". [source: https://docs.oracle.com/javase/8/docs/api/java/sql/BatchUpdateException.html]

Is there a way to force this driver to continue processing even after it encounters a failure? I am looking for a way to recover programmatically from the BatchUpdateException and let the driver continue writing to the DB. Any failing packet should be added to a DLQ, or at least be logged with the help of try-catch blocks. Any insights would be much appreciated.

One more observation: the BatchUpdateException is nested inside multiple RuntimeExceptions and IOExceptions. Is there any consistency or pattern to this nesting?

Thanks,
Sai Vishnu Soudri

On Tue, 28 Nov 2023 at 20:29, Feng Jin <jinfeng1...@gmail.com> wrote:

> Hi Sai
>
> I think you can directly cast SinkFunction to GenericJdbcSinkFunction.
>
> https://github.com/apache/flink-connector-jdbc/blob/b477d452ba3aac38d53d1f5d4c4820bdad3ad9cd/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/JdbcSink.java#L63C41-L63C41
>
> ```
> public static <T> SinkFunction<T> sink(
>         String sql,
>         JdbcStatementBuilder<T> statementBuilder,
>         JdbcExecutionOptions executionOptions,
>         JdbcConnectionOptions connectionOptions) {
>     return new GenericJdbcSinkFunction<>(
>             new JdbcOutputFormat<>(
>                     new SimpleJdbcConnectionProvider(connectionOptions),
>                     executionOptions,
>                     () -> JdbcBatchStatementExecutor.simple(sql, statementBuilder)));
> }
> ```
>
> Best,
> Feng
>
> On Tue, Nov 28, 2023 at 5:49 PM Sai Vishnu <soudrisaivis...@gmail.com> wrote:
>
>> Hi team,
>>
>> I am using the JdbcSink from flink-connector-jdbc artifact, version
>> 3.1.0-1.17. I am trying to write a Sink wrapper that will internally call
>> the invoke method and open method of jdbc sink. While implementing, I see
>> that JdbcSink.sink() returns a SinkFunction which only exposes the
>> invoke method and not the open method.
>>
>> Would appreciate any suggestions on how I can implement this. To add to
>> the requirement, the use case is to try and enclose the invoke operation in
>> a try catch block so that any exception during the db write process can be
>> caught and handled properly.
>>
>> Thanks,
>>
>> Sai Vishnu Soudri