Hi Kushan,
I've blasted production Cassandra rings from Storm in the past, and I have
not had much trouble with too many open connections, or anything else, as
long as I clean up. Please see the Maven dependencies and the
CassandraWriterBolt.java code below (for CQL prepared statements).
Hope this helps, -Dan

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>2.1.2</version>
    <scope>compile</scope>
    <exclusions>
        <exclusion>
            <artifactId>guava</artifactId>
            <groupId>com.google.guava</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.xerial.snappy</groupId>
    <artifactId>snappy-java</artifactId>
    <version>1.0.5</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>net.jpountz.lz4</groupId>
    <artifactId>lz4</artifactId>
    <version>1.2.0</version>
    <scope>compile</scope>
</dependency>
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
    <scope>compile</scope>
</dependency>

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.QueryTimeoutException;
import com.datastax.driver.core.policies.*;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraWriterBolt extends BaseRichBolt {
    private static final Logger logger =
        LoggerFactory.getLogger(CassandraWriterBolt.class);

    // Set in the constructor and serialized with the topology.
    private String node;
    private String key_space;
    private String cql;

    // Created on the worker in prepare(); transient so they are never serialized.
    private transient OutputCollector collector;
    private transient TopologyContext context;
    private transient Cluster cluster;
    private transient Session session;
    private transient PreparedStatement prepared_statement;
    private transient int prepared_statement_value_size;

    public CassandraWriterBolt(String node, String key_space, String cql) {
        this.node = node.trim();
        this.key_space = key_space.trim();
        this.cql = cql.trim();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // we don't emit anything from here
    }

    // Opens one Cluster and one Session per bolt instance and prepares the statement.
    private void initialize() {
        this.cluster = Cluster.builder()
            .withoutJMXReporting()
            .withoutMetrics()
            .addContactPoint(this.node)
            .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
            .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 5000L))
            .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()))
            .build();
        this.session = this.cluster.connect(this.key_space);
        this.prepared_statement = this.session.prepare(this.cql)
            .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
        // Number of bind markers, counted as '?' characters in the CQL text.
        // This assumes no '?' appears inside a string literal in the statement.
        this.prepared_statement_value_size =
            this.cql.length() - this.cql.replace("?", "").length();
        logger.debug("Initialization of Writer Cassandra Cluster Object ["
            + this.cluster.getClusterName() + "]");
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context,
                        OutputCollector collector) {
        this.context = context;
        this.collector = collector;
        initialize();
    }

    @Override
    public void execute(Tuple input) {
        try {
            // Bind the first N tuple values, where N is the number of '?' markers.
            Object[] values = input.getValues()
                .subList(0, this.prepared_statement_value_size).toArray();
            this.session.execute(this.prepared_statement.bind(values));
            this.collector.ack(input);
        } catch (QueryTimeoutException qtex) {
            logger.warn(
                "QueryTimeoutException: " + qtex.getMessage() +
                "\ttime = " + System.currentTimeMillis() +
                "\ttuple = " + input +
                "\t Reinitializing Connection. "
            );
            // Fail the tuple first so it is replayed even if reconnecting throws.
            this.collector.reportError(qtex);
            this.collector.fail(input);
            // Close the old connection before opening a new one, so sockets are not leaked.
            cleanup();
            initialize();
        } catch (Exception ex) {
            logger.error("Exception: " + ex.getMessage() + ";\t tuple = " + input, ex);
            this.collector.reportError(ex);
            this.collector.fail(input);
        }
    }

    @Override
    public void cleanup() {
        // Null checks make this safe to call more than once, or after a failed initialize().
        if (this.cluster != null) {
            logger.debug("Closing Cassandra Writer Cluster Object [" +
                this.cluster.getClusterName() + "]");
        }
        if (this.session != null) {
            this.session.close();
            this.session = null;
        }
        if (this.cluster != null) {
            this.cluster.close();
            this.cluster = null;
        }
    }
}
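
Because cleanup() above is called both from execute() (on a timeout) and by
Storm at shutdown, it helps if closing is safe to repeat and always releases
resources in reverse order of creation. Below is a minimal, driver-free sketch
of that idea, not the way the bolt above is written. ResourceStack and demo()
are hypothetical names for illustration only; they are not part of Storm or
the DataStax driver, and the lambdas stand in for calls such as
session.close() and cluster.close().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper: tracks open resources and closes each one exactly once,
// most recently registered first.
final class ResourceStack implements AutoCloseable {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    // Register a resource right after it has been opened.
    synchronized void register(AutoCloseable resource) {
        resources.push(resource);
    }

    // Closes everything registered so far. A second call finds an empty stack
    // and does nothing, so calling close() repeatedly is harmless.
    @Override
    public synchronized void close() {
        while (!resources.isEmpty()) {
            AutoCloseable resource = resources.pop();
            try {
                resource.close();
            } catch (Exception e) {
                // Keep going so one failed close does not leak the remaining resources.
                System.err.println("close failed: " + e);
            }
        }
    }

    // Demo with stand-in resources that only record the order in which they are closed.
    static List<String> demo() {
        List<String> closed = new ArrayList<>();
        ResourceStack stack = new ResourceStack();
        stack.register(() -> closed.add("cluster")); // opened first
        stack.register(() -> closed.add("session")); // opened second
        stack.close();
        stack.close(); // second call is a no-op
        return closed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [session, cluster]
    }
}
```

In a bolt, the same stack could be emptied from cleanup() and refilled on each
reconnect. Note that Storm's IBolt javadoc says cleanup() is not guaranteed to
run on a cluster, because the supervisor may kill worker processes with
kill -9, so closing before every reconnect matters more than closing at
shutdown.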
On Tue, Mar 3, 2015 at 4:12 PM, Kushan Maskey <[email protected]> wrote:
> I have a bolt that inserts data into a Cassandra database. When I kill the
> topology on the test and production servers, how can I make sure that the
> Cassandra session is closed? The reason I am asking is that I sometimes get a
> "too many open files" exception, which tells me that there are a number of
> open connections. Please let me know if I am not correct.
>
> I tried adding code in cleanup() to close the sessions, but it didn't help.
>
> Thanks.
> --
> Kushan Maskey
> 817.403.7500
> M. Miller & Associates <http://mmillerassociates.com/>
> [email protected]
>