Here is my code stream.flatMap(new FlatMapFunction<byte[], Void>() {
@Override public void flatMap(byte[] value, Collector<Void> out) throws Exception { Parser.setInsert(true); CassandraConnection.connect(); Parser.setInsert(true); System.out.println("\n*********** New Message ***********\n"); System.out.println("Row Number : " + i ++ ); System.out.println("Message : " + HexUtiles.bytesToHex(value)); Parser.parse(ByteBuffer.wrap(value), ConfigHashMap); } }); On Thu, Apr 26, 2018 at 5:22 PM, Soheil Pourbafrani <soheil.i...@gmail.com> wrote: > I want to use Cassandra native connection (Not Flink Cassandra connection) > to insert some data into Cassandra. According to the design of the code, > the connection to Cassandra will open once at the start and all taskmanager > use it to write data. It's ok running in local mode. > > The problem is when I submit the code on YARN cluster, as each > taskmanager has it's own JVM, the connection to the Cassandra will not > share and I should open and close it for each taskmanager. Is there any way > to have a connection for all taskmanagers? >