Hi Andrea,

Do you still get the error if you use the builder directly?

PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
    .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
    .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);

Builder builder = Cluster.builder();
builder.addContactPoint(CASSANDRA_ADDRESS);
builder.withPort(CASSANDRA_PORT);
builder.withPoolingOptions(poolingOptions);

sinkBuilderNormalStream
    .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
        + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
        + " VALUES (?, ?, ?, ?, ?, ?);")
    .setClusterBuilder(builder)
    .build();

On 4 November 2017 at 19:27, Andrea Giordano <andrea.giordano....@gmail.com> wrote:

> Hi,
> I’m using datastax driver to use Cassandra as sink for some data streams
> with Apache Flink:
> I have a problem executing my application raising an error about the full
> queue. I discovered that the default value is 256, probably too low for my
> load, so I have raised it using poolingOptions setting
> maxRequestsPerConnection as suggested here:
> http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/.
>
> Unfortunately with the following code I obtain the following error when I
> launch it:
>
> The implementation of the ClusterBuilder is not serializable.
> The object probably contains or references non serializable fields.
>
> My code:
>
> PoolingOptions poolingOptions = new PoolingOptions();
> poolingOptions
>     .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
>     .setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);
>
> ClusterBuilder cassandraBuilder = new ClusterBuilder() {
>     private static final long serialVersionUID = 1L;
>
>     @Override
>     public Cluster buildCluster(Cluster.Builder builder) {
>         return builder.addContactPoint(CASSANDRA_ADDRESS)
>             .withPort(CASSANDRA_PORT)
>             .withPoolingOptions(poolingOptions)
>             .build();
>     }
> };
>
> sinkBuilderNormalStream
>     .setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
>         + " (user, sensor, timestamp, rdf_stream, observed_value, value)"
>         + " VALUES (?, ?, ?, ?, ?, ?);")
>     .setClusterBuilder(cassandraBuilder)
>     .build();
>
> How can I deal with it?
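
A likely cause of the serialization error in the quoted code is that the anonymous ClusterBuilder refers to the poolingOptions variable declared outside it, so the object is stored as a hidden field of the builder and has to be serialized with it. The sketch below reproduces that effect with the JDK only. StandInPoolingOptions and StandInClusterBuilder are hypothetical stand-ins, not the real driver or Flink classes, and it assumes PoolingOptions itself is not serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {

    // Hypothetical stand-in for a settings object that is NOT Serializable.
    static class StandInPoolingOptions {
        int maxRequests = 32768;
    }

    // Hypothetical stand-in for a builder type that must be Serializable.
    abstract static class StandInClusterBuilder implements Serializable {
        private static final long serialVersionUID = 1L;

        abstract int build();
    }

    // Returns true if Java serialization accepts the object, false otherwise.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // The options are created outside and captured by the anonymous class,
    // so they become a hidden field that serialization must also write.
    static StandInClusterBuilder capturing() {
        final StandInPoolingOptions options = new StandInPoolingOptions();
        return new StandInClusterBuilder() {
            @Override
            int build() {
                return options.maxRequests;
            }
        };
    }

    // The options are created inside the method, so nothing is captured.
    static StandInClusterBuilder selfContained() {
        return new StandInClusterBuilder() {
            @Override
            int build() {
                StandInPoolingOptions options = new StandInPoolingOptions();
                return options.maxRequests;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("capturing:      " + isSerializable(capturing()));
        System.out.println("self-contained: " + isSerializable(selfContained()));
    }
}
```

If the same thing is happening in the quoted code, creating the PoolingOptions inside buildCluster(...) instead of outside the anonymous class may be enough to make the ClusterBuilder serializable.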