Hi, 
I’m using datastax driver to use Cassandra as sink for some data streams with 
Apache Flink:
I have a problem executing my application raising an error about the full 
queue. I discovered that the default value is 256, probably too low for my 
load, so I have raised it using poolingOptions setting maxRequestsPerConnection 
as suggested here: 
http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/ 
<http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/>.

Unfortunately with the following code I obtain the following error when I 
launch it:

The implementation of the ClusterBuilder is not serializable. 
The object probably contains or references non serializable fields.


My code:


PoolingOptions poolingOptions = new PoolingOptions();
                poolingOptions
                  .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
                  .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);

                
ClusterBuilder cassandraBuilder = new ClusterBuilder() {
        private static final long serialVersionUID = 1L;

        @Override
        public Cluster buildCluster(Cluster.Builder builder) {
                return 
builder.addContactPoint(CASSANDRA_ADDRESS).withPort(CASSANDRA_PORT)..withPoolingOptions(poolingOptions).build();
        }
};


sinkBuilderNormalStream
        .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
                + " (user, sensor, timestamp, rdf_stream, observed_value, 
value)"
                + " VALUES (?, ?, ?, ?, ?, ?);")
        .setClusterBuilder(cassandraBuilder)
        .build();


How can I deal with it?

Reply via email to