you'll have to change your sinkfunction to extend RichSinkFunction, and then create your JDBC connection within the open method.

On 07.03.2016 14:08, tole...@toletum.org wrote:
Hi!
I'm doing a process which reads from kafka, makes some things... and after writes on Database (NEO4J). I can read from kafka, and make some things.... But... I have problems with write on Database (JDBC). I tried use a SinkFunction.... It works, but it create a connection each invoke method is called.


--------
DataStream<String> messageStream = this.env.addSource(new FlinkKafkaConsumer082<>(properties.getProperty("topic"), new SimpleStringSchema(), properties));


messageStream.map(new StreamingCrimeSplitter
.filter(new filterFunction())
.keyBy(1);
.addSink(new sinkFunction());


--------
--------
public class sinkFunction
implements SinkFunction<Tuple7<String, String, String, String, String, String,String>> {
        private static final long serialVersionUID = 2859601213304525959L;
        @Override
public void invoke(Tuple7<String, String, String, String, String, String, String> crime) throws Exception {
                System.out.println(crime.f0);
//JDBC connection
        }
}
--------


Somebody knows how I could do just one connection? I tried to do in the Constructor but the JDBC is not serializable.


Thanks
Toletum



Reply via email to