I use PHP, with phpCassa to talk to Cassandra from within my app. I'm using the structure of the script below to run a local mutation on each of my nodes:
=======
<?php
require_once('PATH/TO/phpcassa-1.0.a.6/lib/autoload.php');

use phpcassa\ColumnFamily;
use phpcassa\Connection\ConnectionPool;
use phpcassa\SystemManager;

try {
    // I'm sure there is a cleaner way of doing this, but I can't connect as
    // localhost; I need to use the AWS private IP.
    $ip = exec("/sbin/ifconfig | grep 'inet addr:' | grep -v '127.0.0.1' | cut -d: -f2 | awk '{ print $1}'");
    $localCassandra = "$ip:9160";

    $keyspace = "";
    $cf = "";

    // Ask the local node for the full ring, then find the token range that
    // this node owns.
    $systemManager = new SystemManager($localCassandra);
    $ring = $systemManager->describe_ring($keyspace);

    $startToken = null;
    $endToken = null;
    foreach ($ring as $ringDetails) {
        // There is an endpoint per RF, with the 1st == "owner", so only the
        // first endpoint of each range is checked.
        foreach ($ringDetails->endpoints as $endpoint) {
            if ($endpoint == $ip) {
                $startToken = $ringDetails->start_token;
                $endToken = $ringDetails->end_token;
            }
            break;
        }
        if ($startToken != null && $endToken != null) {
            break;
        }
    }
    if ($startToken == null || $endToken == null) {
        fwrite(STDERR, "My ip[$ip] not in ring = " . print_r($ring, true));
        exit(1);
    }

    $pool = new ConnectionPool($keyspace, array($localCassandra));
    $column_family = new ColumnFamily($pool, $cf);

    // I patched my local phpCassa to support null == iterate over all rows.
    // If my pull request is accepted it will be in the git repo; otherwise put
    // a large int as arg 3, something hopefully larger than all rows on a node...
    foreach ($column_family->get_range_by_token($startToken, $endToken, null) as $key => $columns) {
        foreach ($columns as $cn => $cv) {
            // I track information here
        }
        // and do an optional mutation here based on the row
    }
} catch (Exception $e) {
    fwrite(STDERR, $e);
}
============
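For the "optional mutation" placeholder above, here is a minimal sketch of what the body of that range loop could look like for the delete use case. keepRow() is a hypothetical predicate standing in for the real row-keeping criteria; phpcassa's ColumnFamily::remove() deletes the whole row by key:

// Minimal sketch of the mutation step, reusing $column_family, $startToken,
// and $endToken from the script above. keepRow() is hypothetical.
function keepRow(array $columns) {
    return isset($columns['some_column']);  // hypothetical criteria, replace with real logic
}

foreach ($column_family->get_range_by_token($startToken, $endToken, null) as $key => $columns) {
    if (!keepRow($columns)) {
        // ColumnFamily::remove($key) issues a row-level delete (tombstone).
        $column_family->remove($key);
    }
}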
On Fri, Apr 4, 2014 at 1:40 PM, William Oberman <ober...@civicscience.com> wrote:

> Looking at the code, cassandra.input.split.size == Pig URL split_size,
> right? But in cassandra 1.2.15 I'm wondering if there is a bug that would
> make the hadoop conf setting cassandra.input.split.size not be used unless
> you manually set the URI to splitSize=0 (because the abstract class
> defaults the splitSize to 64k instead of 0)? Long story short, I've messed
> with that setting in the direction you suggested (decreasing), and I'm
> confident hadoop/pig was picking it up (I eventually decreased it too far,
> which caused a server-side error of too much memory used).
>
> I'm stuck between a rock and a hard place on the mappers. At 20 tasks,
> based on the delete rate before the timeout failures happen, it was going
> to take 1-2 days to run the deletes (I was seeing ~10k deletes/sec across
> all 20 task threads). But this is going to be my plan at this point: fewer
> tasks at once, even if it takes a week (of hopefully unsupervised time).
>
> Thanks for the feedback!
>
> On Fri, Apr 4, 2014 at 12:57 PM, Paulo Ricardo Motta Gomes <
> paulo.mo...@chaordicsystems.com> wrote:
>
>> You said you have tried the Pig URL split_size, but have you actually
>> tried decreasing the value of the cassandra.input.split.size hadoop
>> property? The default is 65536, so you may want to decrease that to see
>> if the number of mappers increases. But at some point, even if you lower
>> that value, it will stop increasing the number of mappers; I don't know
>> exactly why, probably because it hits the minimum number of rows per
>> token.
>>
>> Another suggestion is to decrease the number of simultaneous mappers of
>> your job, so it doesn't hit cassandra too hard and you get fewer
>> TimedOutExceptions, but your job will take longer to complete.
>>
>> On Fri, Apr 4, 2014 at 1:24 PM, William Oberman <ober...@civicscience.com> wrote:
>>
>>> Hi,
>>>
>>> I have some history with cassandra + hadoop:
>>> 1.) Single DC + integrated hadoop = Was "ok" until I needed steady
>>> performance (the single DC was used in a production environment)
>>> 2.) Two DCs + integrated hadoop on 1 of 2 DCs = Was "ok" until my data
>>> grew, and in AWS compute is expensive compared to data storage... e.g.
>>> running a 24x7 DC was a lot more expensive than the following solution...
>>> 3.) Single DC + a constant "ETL" to S3 = Is still ok; I can spawn an
>>> "arbitrarily large" EMR cluster, and 24x7 data storage + transient EMR
>>> is cost effective.
>>>
>>> But one of my CFs has had a change of usage pattern, making a large %,
>>> but not all, of the data fairly pointless to store. I thought I'd write
>>> a Pig UDF that could peek at a row of data and delete it if it fails my
>>> criteria. And it "works" in terms of logic, but not in terms of
>>> practical execution. The CF in question has O(billion) keys, and
>>> afterwards it will have at most ~10% of that.
>>>
>>> I basically keep losing the jobs due to too many task failures, all
>>> rooted in:
>>> Caused by: TimedOutException()
>>> at org.apache.cassandra.thrift.Cassandra$get_range_slices_result.read(Cassandra.java:13020)
>>>
>>> And yes, I've messed around with:
>>> - Number of failures for map/reduce/tracker (in the hadoop confs)
>>> - split_size (on the URL)
>>> - cassandra.range.batch.size
>>>
>>> But it hasn't helped. My failsafe is to roll my own distributed
>>> process, rather than falling into a pit of internal hadoop settings.
>>> But I feel like I'm close.
>>>
>>> The problem, in my opinion, watching how things are going, is the
>>> correlation of splits <-> tasks. I'm obviously using Pig, so this part
>>> of the process is fairly opaque to me at the moment. But "something
>>> somewhere" is picking 20 tasks for my job, and this is fairly
>>> independent of the # of task slots (I've booted EMR clusters with
>>> different #'s and always get 20). Why does this matter? When a task
>>> fails, it retries from the start, which is a killer for me because I
>>> "delete as I go", making that pointless work and massively increasing
>>> the odds of an overall job failure. If hadoop/pig chose a larger number
>>> of tasks, the retries would be much less of a burden. But I don't see
>>> where/what lets me mess with that logic.
>>>
>>> Pig gives the ability to mess with reducers (PARALLEL), but I'm in the
>>> load path, which is all mappers. I've never jumped down to the lower,
>>> raw hadoop level before, and I'm worried that would be the "falling
>>> into a pit" issue...
>>>
>>> I'm using Cassandra 1.2.15.
>>>
>>> will
>>
>> --
>> *Paulo Motta*
>>
>> Chaordic | *Platform*
>> *www.chaordic.com.br <http://www.chaordic.com.br/>*
>> +55 48 3232.3200
>> +55 83 9690-1314
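P.S. Since the fallback discussed in the thread above is rolling a distributed process by hand, here is a small sanity-check sketch built on the same describe_ring() call as my script. "HOST" is a placeholder for any reachable node; it just prints each token range with its first ("owner") endpoint, so you can confirm the per-node runs collectively cover the whole ring:

<?php
require_once('PATH/TO/phpcassa-1.0.a.6/lib/autoload.php');

use phpcassa\SystemManager;

// Sketch: print every token range and its first ("owner") endpoint, to verify
// that running the per-node script on each node covers the whole ring.
$keyspace = "";  // same placeholder as the main script
$systemManager = new SystemManager("HOST:9160");  // any node can answer describe_ring
foreach ($systemManager->describe_ring($keyspace) as $ringDetails) {
    $owner = $ringDetails->endpoints[0];
    echo "$owner owns ({$ringDetails->start_token}, {$ringDetails->end_token}]\n";
}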