Thanks, Dan,

I implemented my code as per your suggestions, but I still need to wait and
see whether it works as expected.

If anyone has found any other approach that works, it would be highly
appreciated.
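One detail in Dan's bolt worth calling out is how it decides how many leading
tuple fields to bind: it counts the `?` placeholders in the CQL string via a
length difference. A standalone sketch of just that trick (the CQL string and
class name here are made up for illustration):

```java
// Standalone demo of the placeholder-counting trick from the bolt below:
// the number of '?' markers in the CQL determines how many leading
// tuple values get bound to the prepared statement.
public class PlaceholderCountDemo {

    static int countPlaceholders(String cql) {
        // Stripping all '?' characters and comparing lengths
        // yields the number of bind markers.
        return cql.length() - cql.replace("?", "").length();
    }

    public static void main(String[] args) {
        String cql = "INSERT INTO ks.events (id, ts, payload) VALUES (?, ?, ?)";
        System.out.println(countPlaceholders(cql));  // 3
    }
}
```

Note this count would be thrown off if a literal `?` ever appeared elsewhere in
the CQL text, but for typical prepared INSERT statements it is fine.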


--
Kushan Maskey
817.403.7500
M. Miller & Associates <http://mmillerassociates.com/>
[email protected]

On Tue, Mar 3, 2015 at 3:25 PM, Dan DeCapria, CivicScience <
[email protected]> wrote:

> Hi Kushan,
>
> I've blasted production Cassandra rings in the past from Storm, and I have
> not had much difficulty with too many open connections or anything else as
> long as I clean up properly.  Please see the CassandraWriterBolt.java code
> below (for CQL prepared statements).  Hope this helps, -Dan
>
>         <dependency>
>             <groupId>com.datastax.cassandra</groupId>
>             <artifactId>cassandra-driver-core</artifactId>
>             <version>2.1.2</version>
>             <scope>compile</scope>
>             <exclusions>
>                 <exclusion>
>                     <artifactId>guava</artifactId>
>                     <groupId>com.google.guava</groupId>
>                 </exclusion>
>             </exclusions>
>         </dependency>
>         <dependency>
>             <groupId>org.xerial.snappy</groupId>
>             <artifactId>snappy-java</artifactId>
>             <version>1.0.5</version>
>             <scope>compile</scope>
>         </dependency>
>         <dependency>
>             <groupId>net.jpountz.lz4</groupId>
>             <artifactId>lz4</artifactId>
>             <version>1.2.0</version>
>             <scope>compile</scope>
>         </dependency>
>         <dependency>
>             <groupId>com.google.guava</groupId>
>             <artifactId>guava</artifactId>
>             <version>18.0</version>
>             <scope>compile</scope>
>         </dependency>
>
> import backtype.storm.task.OutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichBolt;
> import backtype.storm.tuple.Tuple;
> import com.datastax.driver.core.*;
> import com.datastax.driver.core.exceptions.QueryTimeoutException;
> import com.datastax.driver.core.policies.*;
> import java.util.Map;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class CassandraWriterBolt extends BaseRichBolt {
>
>     private OutputCollector collector;
>     private static final Logger logger =
>             LoggerFactory.getLogger(CassandraWriterBolt.class);
>     private TopologyContext context;
>
>     private String node;
>     private String key_space;
>     private String cql;
>
>     private Cluster cluster;
>     private Session session;
>     private PreparedStatement prepared_statement;
>     private int prepared_statement_value_size;
>
>     public CassandraWriterBolt(String node, String key_space, String cql) {
>         this.node = node.trim();
>         this.key_space = key_space.trim();
>         this.cql = cql.trim();
>     }
>
>     @Override
>     public void declareOutputFields(OutputFieldsDeclarer declarer) {
>         //  we don't emit anything from here
>     }
>
>     private void initialize() {
>         this.cluster = Cluster.builder()
>                 .withoutJMXReporting()
>                 .withoutMetrics()
>                 .addContactPoint(this.node)
>                 .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
>                 .withReconnectionPolicy(new ExponentialReconnectionPolicy(100L, 5000L))
>                 .withLoadBalancingPolicy(new TokenAwarePolicy(new RoundRobinPolicy()))
>                 .build();
>
>         this.session = this.cluster.connect(this.key_space);
>         this.prepared_statement = this.session.prepare(this.cql)
>                 .setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
>         this.prepared_statement_value_size =
>                 this.cql.length() - this.cql.replace("?", "").length();
>         logger.debug("Initialization of Writer Cassandra Cluster Object ["
>                 + this.cluster.getClusterName() + "]");
>     }
>
>     @Override
>     public void prepare(Map stormConf, TopologyContext context,
> OutputCollector collector) {
>         this.context = context;
>         this.collector = collector;
>         initialize();
>     }
>
>     @Override
>     public void execute(Tuple input) {
>         try {
>             this.session.execute(this.prepared_statement.bind(
>                     input.getValues().subList(0, this.prepared_statement_value_size).toArray()));
>             this.collector.ack(input);
>         } catch (QueryTimeoutException qtex) {
>             logger.warn(
>                     "QueryTimeoutException: " + qtex.getMessage() +
>                     "\ttime = " + System.currentTimeMillis() +
>                     "\ttuple = " + input +
>                     "\t Reinitializing Connection. "
>             );
>             cleanup();
>             initialize();
>             this.collector.reportError(qtex);
>             this.collector.fail(input);
>         } catch (Exception ex) {
>             logger.error("Exception: " + ex.getMessage() + ";\t tuple = " + input);
>             this.collector.reportError(ex);
>             this.collector.fail(input);
>         }
>     }
>
>     @Override
>     public void cleanup() {
>         logger.debug("Closing Cassandra Writer Cluster Object ["
>                 + this.cluster.getClusterName() + "]");
>         this.session.close();
>         this.cluster.close();
>     }
>
> }
>
>
>
>
> On Tue, Mar 3, 2015 at 4:12 PM, Kushan Maskey <
> [email protected]> wrote:
>
>> I have a bolt that inserts data into a Cassandra database. When I kill the
>> topology on the test and production servers, how can I make sure that the
>> Cassandra session is closed? The reason I am asking is that sometimes I get
>> a "too many open files" exception thrown, which tells me that there are a
>> number of open connections. Please let me know if I am not correct.
>>
>> I tried adding code in cleanup() to close the sessions, but it didn't
>> help.
>>
>> Thanks.
>> --
>> Kushan Maskey
>> 817.403.7500
>> M. Miller & Associates <http://mmillerassociates.com/>
>> [email protected]
>>
>
>
>
