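You'll have to change your sink function to extend RichSinkFunction and
then create your JDBC connection in its open() method. open() is called
once per parallel sink instance before the first invoke(), and close()
is the place to release the connection again.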
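To illustrate why open() helps where the constructor does not, here is a minimal plain-Java sketch with no Flink dependency. It assumes the function object is shipped to the workers with Java serialization, so a non-serializable connection has to be a transient field that is created after the object arrives. FakeConnection, LifecycleSink, OpenLifecycleDemo and the URL are hypothetical stand-ins, not Flink or JDBC classes; in a real job the same three methods would be the open(), invoke() and close() of your RichSinkFunction subclass.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a JDBC connection: deliberately NOT Serializable. */
class FakeConnection {
    final List<String> written = new ArrayList<>();
    boolean closed = false;

    void write(String record) { written.add(record); }
    void close() { closed = true; }
}

/** Hypothetical sink that mirrors the open/invoke/close lifecycle. */
class LifecycleSink implements Serializable {
    private static final long serialVersionUID = 1L;

    // Only serializable configuration travels with the object.
    private final String url;
    // transient: skipped by serialization, so it is null on arrival.
    private transient FakeConnection connection;
    private transient int connectionsOpened;

    LifecycleSink(String url) { this.url = url; }

    /** Called once on the worker, before any record is processed. */
    void open() {
        connection = new FakeConnection(); // real code would connect to url here
        connectionsOpened++;
    }

    /** Called once per record; reuses the connection made in open(). */
    void invoke(String record) { connection.write(record); }

    /** Called once at shutdown. */
    void close() { if (connection != null) connection.close(); }

    boolean isOpen() { return connection != null; }
    int connectionsOpened() { return connectionsOpened; }
    List<String> written() { return connection.written; }
}

class OpenLifecycleDemo {
    /** Serializes and deserializes an object, imitating shipping it to a worker. */
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        LifecycleSink shipped = roundTrip(new LifecycleSink("jdbc:hypothetical://host"));
        shipped.open();            // one connection for this instance
        shipped.invoke("crime-1"); // both records reuse it
        shipped.invoke("crime-2");
        System.out.println(shipped.connectionsOpened() + " connection(s), "
                + shipped.written().size() + " record(s)");
        shipped.close();
    }
}
```

If the connection field were not transient and were assigned in the constructor, writeObject would fail with a NotSerializableException, which matches the problem described in the question below.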
I'm building a job that reads from Kafka, does some processing, and
then writes to a database (Neo4j). Reading from Kafka and the
processing work fine, but I have problems writing to the database
over JDBC.
I tried using a SinkFunction. It works, but it creates a new connection
each time the invoke method is called.
DataStream<String> messageStream = this.env.addSource(
    new FlinkKafkaConsumer082<>(properties.getProperty("topic"),
        new SimpleStringSchema(), properties));
StreamingCrimeSplitter
    .filter(new filterFunction())
    .addSink(new sinkFunction());
public class sinkFunction
        implements SinkFunction<Tuple7<String, String, String, String, String,
                String, String>> {
    private static final long serialVersionUID = 2859601213304525959L;

    @Override
    public void invoke(Tuple7<String, String, String, String,
            String, String, String> crime) throws Exception {
        // JDBC connection (currently opened on every call)
    }
}
Does anybody know how I could create just one connection? I tried doing
it in the constructor, but the JDBC connection is not serializable.