I can't see what getPolicy() returns, but you want to use TokenAwarePolicy
with the shuffle option set to false in the constructor. It defaults to
shuffle true so that load is spread across replicas when people have
horribly fat partitions.
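
To make the primary-versus-any-replica point concrete, here is a toy model of
the 4 node, RF 2 case from this thread. It is only a sketch and does not use
the driver: the class RingSketch and its methods are hypothetical names, each
node is assumed to own exactly one token range, and replicas are assumed to
sit on consecutive nodes (roughly what SimpleStrategy does). In the 3.x Java
driver, to my knowledge, the real setting is the boolean second argument of
the TokenAwarePolicy constructor, e.g. new TokenAwarePolicy(childPolicy, false).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: node indexes stand in for hosts, there is one token range
// per node, and replicas are placed on consecutive nodes (an assumption).
class RingSketch {

    // Replicas for the range whose primary owner is node `primary`:
    // the primary first, then the next (rf - 1) nodes clockwise.
    static List<Integer> replicas(int primary, int nodeCount, int rf) {
        List<Integer> result = new ArrayList<>();
        for (int i = 0; i < rf; i++) {
            result.add((primary + i) % nodeCount);
        }
        return result;
    }

    // Ranges that `node` holds as any replica (primary or not).
    static List<Integer> rangesHeldBy(int node, int nodeCount, int rf) {
        List<Integer> held = new ArrayList<>();
        for (int range = 0; range < nodeCount; range++) {
            if (replicas(range, nodeCount, rf).contains(node)) {
                held.add(range);
            }
        }
        return held;
    }

    public static void main(String[] args) {
        int nodeCount = 4;
        int rf = 2;
        for (int node = 0; node < nodeCount; node++) {
            System.out.println("node " + node
                    + " is primary for range " + node
                    + " and a replica for ranges " + rangesHeldBy(node, nodeCount, rf));
        }
    }
}
```

In this model every node is primary for one range and a replica for two, so
each node holds no copy at all of the other two ranges. With shuffling on, a
read may be routed to either replica of a range; with shuffling off, the first
replica in the list (the primary, in this model) is the one tried first.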

On Wed, Mar 15, 2017 at 9:41 AM, Frank Hughes <frankhughes...@gmail.com>
wrote:

> Thanks for the reply. Much appreciated.
>
> I should have included more detail. I am using replication factor 2,
> and the code uses a token-aware method of distributing the work so that
> only data that is primarily owned by the node is read on that local
> machine. So I guess this points to the logic I'm using to determine what is
> primarily owned by a node. I guess this is verging into something that
> should be posted to the Java driver list, but I'll post here in case it's
> useful or there's an obvious problem:
>
> PoolingOptions poolingOpts = new PoolingOptions();
> poolingOpts.setCoreConnectionsPerHost(HostDistance.REMOTE, this.coreConn);
> poolingOpts.setMaxConnectionsPerHost(HostDistance.REMOTE, this.maxConn);
> poolingOpts.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768);
> poolingOpts.setMaxRequestsPerConnection(HostDistance.REMOTE, 2000);
>
> SocketOptions socketOptions = new SocketOptions();
> socketOptions.setReadTimeoutMillis(15000);
>
> Cluster.Builder builder = Cluster.builder();
> for (String contactPoint : contactPoints) {
>     builder.addContactPoint(contactPoint.trim());
> }
> // Pooling and socket options only need to be set once, not per contact point.
> builder.withPoolingOptions(poolingOpts);
> builder.withSocketOptions(socketOptions);
>
> builder.withLoadBalancingPolicy(getPolicy())
>         .withQueryOptions(new QueryOptions()
>                 .setPrepareOnAllHosts(true)
>                 .setMetadataEnabled(true)
>         );
>
> Cluster cluster = builder.build();
> Metadata metadata = cluster.getMetadata();
> Session session = cluster.connect(keyspaceName);
> Set<Host> allHosts = metadata.getAllHosts();
>
> Host localHost = null;
> for (Host host : allHosts) {
>     if(host.getAddress().getHostAddress().equalsIgnoreCase(local))
>         localHost = host;
> }
>
> Map<Host, List<TokenRange>> replicaCount = new HashMap<Host, List<TokenRange>>();
> TokenRange[] tokenRanges =
>         unwrapTokenRanges(metadata.getTokenRanges()).toArray(new TokenRange[0]);
>
> List<TokenRange> tokenRangeList = Arrays.asList(tokenRanges);
> tokenRangeList.sort(new Comparator<TokenRange>() {
>     @Override
>     public int compare(TokenRange o1, TokenRange o2) {
>         return o1.getStart().compareTo(o2.getStart());
>     }
> });
>
> int numberOfHost = metadata.getAllHosts().size();
> int rangesPerHost = tokenRanges.length / numberOfHost;
>
> for(TokenRange tokenRange : tokenRangeList){
>
>     Set<Host> hosts = metadata.getReplicas(keyspaceName, tokenRange);
>
>     String rangeHosts = "";
>     Iterator<Host> iter = hosts.iterator();
>     while(iter.hasNext()){
>         Host host = iter.next();
>
>         List<TokenRange> tokenRangesForHost = replicaCount.get(host);
>         if(tokenRangesForHost == null){
>             tokenRangesForHost = new ArrayList<TokenRange>();
>         }
>
>         if(tokenRangesForHost.size() < rangesPerHost || !iter.hasNext()){
>             tokenRangesForHost.add(tokenRange);
>             replicaCount.put(host, tokenRangesForHost);
>             break;
>         }
>
>         rangeHosts += host.getAddress().toString();
>     }
> }
>
> for(Host replica : replicaCount.keySet()){
>     List<TokenRange> allocatedRanges = replicaCount.get(replica);
>     for(TokenRange tr : allocatedRanges){
>         System.out.println(tr.getStart() + " to " + tr.getEnd());
>     }
> }
>
> //get a list of token ranges for this host
> List<TokenRange> tokenRangesForHost = replicaCount.get(localHost);
>
> Again, any thoughts are much appreciated.
>
> Thanks
>
> Frank
>
>
> On 2017-03-15 12:38 (-0000), Ryan Svihla <r...@foundev.pro> wrote:
> > LOCAL_ONE just means local to the datacenter. By default the token-aware
> > policy will go to a replica that owns that data (primary or any replica
> > depends on the driver), and that may or may not be the node the driver
> > process is running on.
> >
> > So to put this more concretely: if you have RF 2 with that 4 node
> > cluster, 2 nodes will be responsible for that data, and if your local
> > process is not running on one of those 2 nodes it will definitely HAVE to
> > go to another node.
> >
> > Therefore, if you wanted to pin behavior to a local replica you'd have to
> > send your work out in a token-aware fashion where said work only goes to
> > the primary token owner of that data, and remove any shuffling of
> > replicas in the process (shuffling is only on by default in the Java
> > driver, to my knowledge).
> >
> > On Wed, Mar 15, 2017 at 6:38 AM, Frank Hughes <frankhughes...@gmail.com>
> > wrote:
> >
> > > Hi there,
> > >
> > > I'm running a Java process on a 4 node Cassandra 3.9 cluster on EC2
> > > (instance type t2.2xlarge), with the process running separately on each
> > > of the nodes (i.e. 4 running JVMs).
> > > The process is just doing reads from Cassandra and building a SOLR index,
> > > using the Java driver with consistency level LOCAL_ONE.
> > > However, the following exception is thrown:
> > >
> > > com.datastax.driver.core.exceptions.TransportException: [/10.0.0.2:9042] Connection has been closed
> > >         at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:38)
> > >         at com.datastax.driver.core.exceptions.TransportException.copy(TransportException.java:24)
> > >         at com.datastax.driver.core.DriverThrowables.propagateCause(DriverThrowables.java:37)
> > >         at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.prepareNextRow(ArrayBackedResultSet.java:313)
> > >         at com.datastax.driver.core.ArrayBackedResultSet$MultiPage.isExhausted(ArrayBackedResultSet.java:269)
> > >         at com.datastax.driver.core.ArrayBackedResultSet$1.hasNext(ArrayBackedResultSet.java:143)
> > >
> > > where 10.0.0.2 is not the local machine. So my questions:
> > >
> > > - Should this happen when I'm using consistency level LOCAL_ONE and just
> > > doing reads?
> > > - Does this suggest non-local reads are happening?
> > >
> > > Many thanks for any help/ideas.
> > >
> > > Frank
> > >
> > >
> > >
> >
> >
> > --
> >
> > Thanks,
> > Ryan Svihla
> >
>



-- 

Thanks,
Ryan Svihla
