...and you are sure your server does not hold a (name, value) hotspot?
Just an idea,
Jens

—
Sent from Mailbox

On Tue, Nov 11, 2014 at 9:38 PM, Gabriel Menegatti <gabr...@s1mbi0se.com.br> wrote:

> Hello,
>
> We are facing a "difficult" problem when trying to use the TokenAware policy for writes: our write requests are always unbalanced for some unknown reason. Maybe we have something wrong in our data model?
>
> We are trying to use TokenAware for the writes because, as far as we understand, the objective of the TokenAware policy is to send each write request directly to the node that owns the data according to the token/routing_key, so the request does not need to be received by a coordinator and then re-routed to the right node. Is our understanding correct?
>
> We have a cluster with 2 DCs, where DC1 has 30 servers and DC2 has 10 servers (but the same storage capacity as DC1). We are using DSE Community 2.1.1 and the DataStax Python Driver 2.1.2.
>
> Our configuration is:
> - Murmur3Partitioner
> - vnodes (num_tokens: 256)
> - NetworkTopologyStrategy
>
> The keyspace is called "identificationmr" and the replication factor/strategy used is 1 for DC1 and 2 for DC2. We are performing all the writes using consistency level ONE (not LOCAL_ONE), using the DC1 nodes as seeds.
>
> If we take a look at the data distribution (image below), everything seems to be working well and everything is balanced:
>
> [image: Inline image 1]
>
> *So the problem is:*
>
> We tried writing data using several load balancing policies to see what is going on, but so far we have no conclusions and all the write requests remain unbalanced.
>
> 1) When we try to write data to the column family "user_data" or "user_data_idx" using the token-aware policy, for some reason one specific node receives most of the write requests (maybe because it is somehow acting as coordinator, even though it should not), see image below:
>
> [image: Inline image 3]
>
> 2) If we try to make the writes using the DCAwareRoundRobin policy, everything is "equally distributed", but a few minutes later the cluster/nodes start to give timeouts, probably because all the nodes are acting as coordinators and routing the write requests to other nodes all the time. Since our replication factor is 1 and we have 30 servers, for every 100 write requests we send to a node, only around 3 of them remain on the node that received the request and the rest need to be re-routed.
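One way to tell whether this is the policy misbehaving or simply the keys all landing on one owner is to ask the driver's metadata which node owns each key. Here is a rough, untested sketch against the 2.1.x Python driver; the contact point and the sample (i_name, i_value) pairs are made up, so substitute real ones taken from your write stream:

from collections import Counter

from cassandra.cluster import Cluster
from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy

# Hypothetical contact point; use one of your DC1 nodes.
cluster = Cluster(['10.0.0.1'],
                  load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy('DC1')))
session = cluster.connect('identificationmr')

# The routing key is derived from the bound partition-key columns (i_name, i_value).
insert = session.prepare(
    "INSERT INTO user_data (i_name, i_value, type, creation_key, data) "
    "VALUES (?, ?, ?, ?, ?)")

sample_keys = [('email', 'a@example.com'), ('email', 'b@example.com')]  # hypothetical

owners = Counter()
for i_name, i_value in sample_keys:
    bound = insert.bind((i_name, i_value, 0, 0, ''))
    for host in cluster.metadata.get_replicas('identificationmr', bound.routing_key):
        if host.datacenter == 'DC1':  # RF=1 in DC1, so exactly one owner per key there
            owners[host.address] += 1

for address, count in owners.most_common():
    print(address, count)

cluster.shutdown()

If one DC1 address dominates the tally, token-aware routing is doing exactly what it should and the imbalance comes from the keys themselves.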
> *Our schema:*
>
> *- identificationmr.user_data:*
>
> CREATE TABLE identificationmr.user_data (
>     i_name text,
>     i_value text,
>     type int,
>     creation_key bigint,
>     data text,
>     identifiers text,
>     on_boarded boolean,
>     origin text,
>     prefix text,
>     sub_prefix text,
>     PRIMARY KEY ((i_name, i_value), type, creation_key, data)
> ) WITH CLUSTERING ORDER BY (type ASC, creation_key ASC, data ASC)
>     AND bloom_filter_fp_chance = 0.01
>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>     AND comment = ''
>     AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>     AND dclocal_read_repair_chance = 0.1
>     AND default_time_to_live = 0
>     AND gc_grace_seconds = 864000
>     AND max_index_interval = 2048
>     AND memtable_flush_period_in_ms = 0
>     AND min_index_interval = 128
>     AND read_repair_chance = 0.0
>     AND speculative_retry = '99.0PERCENTILE';
>
> *- identificationmr.user_data_idx:*
>
> CREATE TABLE identificationmr.user_data_idx (
>     creation_key bigint,
>     page_number int,
>     i_name text,
>     i_value text,
>     PRIMARY KEY ((creation_key, page_number), i_name, i_value)
> ) WITH CLUSTERING ORDER BY (i_name ASC, i_value ASC)
>     AND bloom_filter_fp_chance = 0.01
>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>     AND comment = ''
>     AND compaction = {'min_threshold': '4', 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32'}
>     AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
>     AND dclocal_read_repair_chance = 0.1
>     AND default_time_to_live = 0
>     AND gc_grace_seconds = 864000
>     AND max_index_interval = 2048
>     AND memtable_flush_period_in_ms = 0
>     AND min_index_interval = 128
>     AND read_repair_chance = 0.0
>     AND speculative_retry = '99.0PERCENTILE';
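Since the user_data partition key is the (i_name, i_value) pair, a quick application-side check for the hotspot I asked about is to count queued writes per pair before they are sent. A minimal sketch, with made-up rows standing in for whatever feeds your writer:

from collections import Counter

def partition_key_histogram(rows, top=10):
    """Count pending user_data writes per (i_name, i_value) partition key."""
    return Counter((r['i_name'], r['i_value']) for r in rows).most_common(top)

# Hypothetical stand-in for the rows handed to the writer.
rows = [
    {'i_name': 'email', 'i_value': 'a@example.com'},
    {'i_name': 'email', 'i_value': 'a@example.com'},
    {'i_name': 'cookie', 'i_value': 'b1946ac9'},
]
print(partition_key_histogram(rows))

One pair dominating the top of that list would explain a single node receiving most of the token-aware writes even though the data on disk looks balanced; the same check on (creation_key, page_number) covers user_data_idx.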
;" > keyspace = cfg.identification_keyspace > self.default_consistency_level = ConsistencyLevel.ONE > global cluster, session > if not cluster or (session and session.is_shutdown): > logging.info("Connecting to cluster %s/%s" % > (cfg.identification_hosts, cfg.identification_keyspace)) > if "localhost" in cfg.identification_hosts: > load_balancing_policy = TokenAwarePolicy(RoundRobinPolicy()) > self.default_consistency_level = ConsistencyLevel.QUORUM > logging.info("Using load_balancing_policy = > TokenAwarePolicy(RoundRobinPolicy())") > else: > self.default_consistency_level = ConsistencyLevel.ONE > load_balancing_policy = > TokenAwarePolicy(DCAwareRoundRobinPolicy('DC1')) > logging.info("Using load_balancing_policy = > TokenAwarePolicy(DCAwareRoundRobinPolicy('DC1'))") > # load_balancing_policy = DCAwareRoundRobinPolicy('DC1') > # logging.info("Using load_balancing_policy = > DCAwareRoundRobinPolicy('DC1')") > cluster = get_cluster(json.loads(cfg.identification_hosts), > port=int(cfg.identification_port), > control_connection_timeout=100000, > reconnection_policy=Cluster.reconnection_policy, > load_balancing_policy=load_balancing_policy > ) > if not session or session.is_shutdown: > logging.info("Connecting to keyspace %s/%s" % > (cfg.identification_hosts, cfg.identification_keyspace)) > session = cluster.connect(keyspace) > self.session = session > self.session.default_consistency_level = > self.default_consistency_level > self.session.default_timeout = 100000 > self.write_on_queues = write_on_queues > self.max_batch_size = max_batch_size > self.PREPARED_INSERT_USER_DATA = > self.session.prepare(self.INSERT_USER_DATA) > self.PREPARED_INSERT_USER_DATA_IDX = > self.session.prepare(self.INSERT_USER_DATA_IDX) > *Please, on this link below its possible to find the results for:* > - nodetool info > - nodetool cfstats identificationmr.user_data > - nodetool cfstats identificationmr.user_data_idx > - nodetool proxyhistograms > - nodetool cfhistograms identificationmr.user_data > - nodetool cfhistograms identificationmr.user_data_idx > Link: > https://www.dropbox.com/sh/x1xb53aomzit0ov/AAAVOOAhyDFfpA3zr8AtOCTWa?dl=0 > <https://www.google.com/url?q=https%3A%2F%2Fwww.dropbox.com%2Fsh%2Fx1xb53aomzit0ov%2FAAAVOOAhyDFfpA3zr8AtOCTWa%3Fdl%3D0&sa=D&sntz=1&usg=AFQjCNH1IlN-9rrVqZHFdcvbiyiFpfpmdA> > Please, any help or hint? > So sorry for the long/detailed email and thanks in advance. > Regards, > Gabriel.