Here is my code:

stream.flatMap(new FlatMapFunction<byte[], Void>() {

    @Override
    public void flatMap(byte[] value, Collector<Void> out) throws Exception {
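        Parser.setInsert(true);
        // flatMap is invoked for every record, so connect() is called once per message.
        CassandraConnection.connect();
        System.out.println("\n*********** New Message ***********\n");
        // i and ConfigHashMap are fields defined elsewhere in the job.
        System.out.println("Row Number : " + i++);
        System.out.println("Message    : " + HexUtiles.bytesToHex(value));
        // Parse the raw bytes; nothing is emitted to the collector.
        Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);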
    }
});
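
For reference, a minimal sketch of one way to hold a single connection per
TaskManager JVM instead of connecting for every message. A live connection
object cannot be shared between separate JVMs, so one per JVM is the most
that can be shared. Everything here is an assumption for illustration:
SharedConnection and FakeSession are hypothetical names, and FakeSession
only stands in for whatever session object the Cassandra driver returns.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Reference-counted holder for one shared connection per JVM.
 * Hypothetical helper, not part of Flink or the Cassandra driver.
 */
final class SharedConnection {

    /** Hypothetical stand-in for a Cassandra session; counts how often one is opened. */
    public static final class FakeSession implements AutoCloseable {
        public static final AtomicInteger OPENED = new AtomicInteger();
        private volatile boolean closed;

        FakeSession() {
            OPENED.incrementAndGet();
        }

        public boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    private static FakeSession session; // guarded by the class lock
    private static int users;           // tasks currently holding the session

    private SharedConnection() {}

    /** Call once per task at startup; the first caller in this JVM opens the connection. */
    public static synchronized FakeSession acquire() {
        if (session == null) {
            session = new FakeSession();
        }
        users++;
        return session;
    }

    /** Call once per task at shutdown; the last caller in this JVM closes the connection. */
    public static synchronized void release() {
        if (users > 0 && --users == 0) {
            session.close();
            session = null;
        }
    }
}
```

In a Flink job, acquire() could be called from the open() method of a
RichFlatMapFunction and release() from its close() method, so that
flatMap() itself only uses the session that is already open.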



On Thu, Apr 26, 2018 at 5:22 PM, Soheil Pourbafrani <soheil.i...@gmail.com>
wrote:

> I want to use a native Cassandra connection (not the Flink Cassandra
> connector) to insert some data into Cassandra. According to the design of
> the code, the connection to Cassandra is opened once at startup and all
> TaskManagers use it to write data. This works when running in local mode.
>
> The problem is that when I submit the job to a YARN cluster, each
> TaskManager has its own JVM, so the Cassandra connection is not shared
> and I have to open and close it for each TaskManager. Is there any way
> to share one connection across all TaskManagers?
>
