...and are you sure your server does not hold a (name, value) hotspot?

Just an idea,

Jens



On Tue, Nov 11, 2014 at 9:38 PM, Gabriel Menegatti
<gabr...@s1mbi0se.com.br> wrote:

> Hello,
> We are facing a "difficult" problem when trying to use the TokenAware
> policy for writes, as our write requests are always unbalanced for some
> unknown reason. Maybe we have something wrong in our data model?
> We are trying to use Token Aware to send the writes because, as far as we
> understand, the objective of the Token Aware policy is to send the write
> request directly to the node that owns the data according to the
> token/routing_key, so the request doesn't need to be received by a
> coordinator and then re-routed to the right node. Is our understanding
> right?
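> Roughly, this is how we understand the wiring on the driver side (a minimal
> sketch using the standard cassandra-driver classes; the contact points and
> sample values are just placeholders):
>
> from cassandra.cluster import Cluster
> from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
>
> # TokenAwarePolicy wraps a child policy and, when a statement carries a
> # routing key, tries the replica(s) that own that key first.
> cluster = Cluster(
>     ['10.0.0.1', '10.0.0.2'],  # placeholder contact points
>     load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy('DC1')))
> session = cluster.connect('identificationmr')
>
> # The driver only knows the routing key for prepared statements (or when
> # routing_key is set explicitly), so the inserts are prepared up front.
> insert = session.prepare(
>     "INSERT INTO user_data (i_name, i_value, type, creation_key, data) "
>     "VALUES (?, ?, ?, ?, ?)")
> session.execute(insert, ('email', 'someone@example.com', 1, 123, '{}'))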
> We have a cluster with 2 DCs, where DC1 has 30 servers and DC2 has 10
> servers (but with the same storage capacity as DC1). We are using DSE
> Community 2.1.1 and the DataStax Python Driver 2.1.2.
> Our configurations are:
> - Murmur3Partitioner
> - vnodes (num_tokens: 256)
> - NetworkTopologyStrategy
> The keyspace is called "identificationmr" and the replication
> factor/strategy used is 1 for DC1 and 2 for DC2.
> We are performing all the writes using consistency level ONE (not
> LOCAL_ONE), using the DC1 nodes as seeds.
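> (For reference, the keyspace should have been created with something along
> these lines; the exact statement is from memory and may differ:)
>
> session.execute("""
>     CREATE KEYSPACE IF NOT EXISTS identificationmr
>     WITH replication = {'class': 'NetworkTopologyStrategy',
>                         'DC1': 1, 'DC2': 2}""")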
> If we take a look at the data distribution, everything seems to be working
> well and everything is balanced, see image below:
> [image: Inline image 1]
> *So the problem is:*
> We tried writing data using several load balancing policies to see what is
> going on, but so far there are no conclusions and all the write requests
> remain unbalanced.
> 1) When we try to write data to the column family "user_data" or
> "user_data_idx" using the token aware policy, for some reason one specific
> node receives most of the write requests (maybe because it is somehow acting
> as coordinator, even though it should not), see image below:
> [image: Inline image 3]
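> A quick way to rule out a single hot (i_name, i_value) partition would be to
> ask the driver which node actually owns a given key, roughly like this
> (assuming we read the driver docs correctly and Metadata.get_replicas() maps
> a routing key to its replicas; the sample values are made up):
>
> # Bind a sample key to the prepared insert (defined further below) so the
> # driver computes the composite routing key, then map it to its replicas.
> bound = self.PREPARED_INSERT_USER_DATA.bind(
>     ('email', 'someone@example.com', 123456789, 1, '{}',
>      None, None, None, None, None))
> replicas = cluster.metadata.get_replicas('identificationmr', bound.routing_key)
> logging.info("replicas: %s", [h.address for h in replicas])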
> 2) If we try to make the writes using the DC Aware Round Robin policy,
> everything is "equally distributed", but a few minutes later the
> cluster/nodes start to give timeouts, probably because all the nodes are
> acting as coordinators and routing all the write requests to other nodes
> all the time. Since our replication factor is 1 and we have 30 servers, for
> every 100 write requests we send to a node, around 3 of them remain on the
> node that received the request and the rest need to be re-routed.
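> (The "around 3" above is simply the replication factor divided by the number
> of DC1 nodes:)
>
> nodes_in_dc1 = 30
> replication_factor = 1
> # fraction of requests whose coordinator is also a replica for the data
> local_pct = 100.0 * replication_factor / nodes_in_dc1  # ~3.3 out of 100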
> *Our schema:*
> *- identificationmr.user_data:*
> CREATE TABLE identificationmr.user_data (
>     i_name text,
>     i_value text,
>     type int,
>     creation_key bigint,
>     data text,
>     identifiers text,
>     on_boarded boolean,
>     origin text,
>     prefix text,
>     sub_prefix text,
>     PRIMARY KEY ((i_name, i_value), type, creation_key, data)
> ) WITH CLUSTERING ORDER BY (type ASC, creation_key ASC, data ASC)
>     AND bloom_filter_fp_chance = 0.01
>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>     AND comment = ''
>     AND compaction = {'min_threshold': '4', 'class':
> 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy',
> 'max_threshold': '32'}
>     AND compression = {'sstable_compression':
> 'org.apache.cassandra.io.compress.LZ4Compressor'}
>     AND dclocal_read_repair_chance = 0.1
>     AND default_time_to_live = 0
>     AND gc_grace_seconds = 864000
>     AND max_index_interval = 2048
>     AND memtable_flush_period_in_ms = 0
>     AND min_index_interval = 128
>     AND read_repair_chance = 0.0
>     AND speculative_retry = '99.0PERCENTILE';
> *- identificationmr.user_data_idx:*
> CREATE TABLE identificationmr.user_data_idx (
>     creation_key bigint,
>     page_number int,
>     i_name text,
>     i_value text,
>     PRIMARY KEY ((creation_key, page_number), i_name, i_value)
> ) WITH CLUSTERING ORDER BY (i_name ASC, i_value ASC)
>     AND bloom_filter_fp_chance = 0.01
>     AND caching = '{"keys":"ALL", "rows_per_partition":"NONE"}'
>     AND comment = ''
>     AND compaction = {'min_threshold': '4', 'class':
> 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy',
> 'max_threshold': '32'}
>     AND compression = {'sstable_compression':
> 'org.apache.cassandra.io.compress.LZ4Compressor'}
>     AND dclocal_read_repair_chance = 0.1
>     AND default_time_to_live = 0
>     AND gc_grace_seconds = 864000
>     AND max_index_interval = 2048
>     AND memtable_flush_period_in_ms = 0
>     AND min_index_interval = 128
>     AND read_repair_chance = 0.0
>     AND speculative_retry = '99.0PERCENTILE';
> *Python code used to perform the write:*
>         self.INSERT_USER_DATA = (
>             "INSERT INTO " + self.USER_DATA +
>             " (i_name, i_value, creation_key, type, data, prefix, sub_prefix,"
>             " origin, identifiers, on_boarded)"
>             " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ;")
>         self.INSERT_USER_QUEUE_REDUCER = (
>             "INSERT INTO " + self.USER_QUEUE_REDUCER + "{jn}"
>             " (i_name, i_value, start_key, end_key) VALUES (?, ?, ?, ?) ;")
>         self.INSERT_USER_QUEUE_INDEXER = (
>             "INSERT INTO " + self.USER_QUEUE_INDEXER + "{jn}"
>             " (i_name, i_value, start_key, end_key) VALUES (?, ?, ?, ?) ;")
>         self.INSERT_USER_DATA_IDX = (
>             "INSERT INTO " + self.USER_DATA_IDX +
>             " (creation_key, page_number, i_name, i_value)"
>             " VALUES (?, ?, ?, ?) ;")
>         keyspace = cfg.identification_keyspace
>         self.default_consistency_level = ConsistencyLevel.ONE
>         global cluster, session
>         if not cluster or (session and session.is_shutdown):
>             logging.info("Connecting to cluster %s/%s" %
>                          (cfg.identification_hosts, cfg.identification_keyspace))
>             if "localhost" in cfg.identification_hosts:
>                 load_balancing_policy = TokenAwarePolicy(RoundRobinPolicy())
>                 self.default_consistency_level = ConsistencyLevel.QUORUM
>                 logging.info("Using load_balancing_policy = "
>                              "TokenAwarePolicy(RoundRobinPolicy())")
>             else:
>                 self.default_consistency_level = ConsistencyLevel.ONE
>                 load_balancing_policy = TokenAwarePolicy(
>                     DCAwareRoundRobinPolicy('DC1'))
>                 logging.info("Using load_balancing_policy = "
>                              "TokenAwarePolicy(DCAwareRoundRobinPolicy('DC1'))")
>                 # load_balancing_policy = DCAwareRoundRobinPolicy('DC1')
>                 # logging.info("Using load_balancing_policy = "
>                 #              "DCAwareRoundRobinPolicy('DC1')")
>             cluster = get_cluster(
>                 json.loads(cfg.identification_hosts),
>                 port=int(cfg.identification_port),
>                 control_connection_timeout=100000,
>                 reconnection_policy=Cluster.reconnection_policy,
>                 load_balancing_policy=load_balancing_policy)
>         if not session or session.is_shutdown:
>             logging.info("Connecting to keyspace %s/%s" %
>                          (cfg.identification_hosts, cfg.identification_keyspace))
>             session = cluster.connect(keyspace)
>         self.session = session
>         self.session.default_consistency_level = self.default_consistency_level
>         self.session.default_timeout = 100000
>         self.write_on_queues = write_on_queues
>         self.max_batch_size = max_batch_size
>         self.PREPARED_INSERT_USER_DATA = self.session.prepare(self.INSERT_USER_DATA)
>         self.PREPARED_INSERT_USER_DATA_IDX = self.session.prepare(self.INSERT_USER_DATA_IDX)
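> To double check whether that hot node is really acting as coordinator, we
> could also trace a few of the writes, along these lines (a sketch; if we
> understand the driver API correctly, execute_async(..., trace=True) plus
> get_query_trace() reports the coordinator node; sample values are made up):
>
> future = self.session.execute_async(
>     self.PREPARED_INSERT_USER_DATA,
>     ('email', 'someone@example.com', 123456789, 1, '{}',
>      None, None, None, None, None),
>     trace=True)
> future.result()                   # wait for the write to complete
> trace = future.get_query_trace()  # fetched from system_traces
> logging.info("coordinated by %s", trace.coordinator)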
> *Please, on the link below it's possible to find the results for:*
> - nodetool info
> - nodetool cfstats identificationmr.user_data
> - nodetool cfstats identificationmr.user_data_idx
> - nodetool proxyhistograms
> - nodetool cfhistograms identificationmr.user_data
> - nodetool cfhistograms identificationmr.user_data_idx
> Link:
> https://www.dropbox.com/sh/x1xb53aomzit0ov/AAAVOOAhyDFfpA3zr8AtOCTWa?dl=0
> Please, any help or hints?
> Sorry for the long/detailed email, and thanks in advance.
> Regards,
> Gabriel.
