I'm a simple guy. My first step would be see if the expected data is in the 
data base, if not what's missing. 

2.5M updates / 3 nodes = 833,333 per node 
833,333 / 53 seconds = 15,723 per second 
1 / 15,723  = 0.00006 seconds / 0.06 milliseconds per mutation 
 
sounds reasonable to me. 

check the Write Latency in nodetool cfstats and the row count estimates. 

Cheers

-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

On 11 Aug 2011, at 14:50, Jian Fang wrote:

> There are data and each Cassandra cluster node holds about 100G. From the 
> application point of view, if I run the job twice with the same input file, 
> i.e., the sales rank update file, then I should see a much smaller number of 
> products, whose rank change exceeds the threshold, in the output file for the 
> second run because the sales ranks have been updated to be the same as the 
> ranks in the input file during the first run. But actually, I saw the output 
> files stay the same for the two runs. One explanation is that the ranks were 
> not actually updated for the first run. Also, 53 seconds to run the whole 
> hadoop job with 2.5 million Cassandra updates on three nodes, do you think 
> that is possible? Each node is a regular Linux box with 8 CPUs. 
> 
> Thanks,
> 
> John  
> 
> On Wed, Aug 10, 2011 at 5:40 PM, aaron morton <aa...@thelastpickle.com> wrote:
>>  Seems the data are not actually written to Cassandra.
> 
> Before jumping into the Hadoop side of things are you saying there is no data 
> in Cassandra ? Can you retrieve any using the CLI  ? Take a look at cfstats 
> on each node to see the estimated record count.
> 
> Cheers
>  
> -----------------
> Aaron Morton
> Freelance Cassandra Developer
> @aaronmorton
> http://www.thelastpickle.com
> 
> On 11 Aug 2011, at 08:20, Jian Fang wrote:
> 
>> Hi,
>> 
>> I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application read a file 
>> and then write about 2.5 million records
>> to Cassandra. I used ColumnFamilyOutputFormat to write to Cassandra. My 
>> Cassandra cluster has three nodes with
>> one Hadoop task tracker on each node. The wired problem is that I only saw 
>> one map and one reducer tasks and job only took
>> 53 seconds to finish. Seems the data are not actually written to Cassandra.
>> 
>> Here is status from Hadoop web admin:
>> 
>> User: hadoop
>> Job Name: SalesRankWriter
>> Job File: 
>> hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
>> Job Setup: Successful
>> Status: Succeeded
>> Started at: Wed Aug 10 15:24:43 EDT 2011
>> Finished at: Wed Aug 10 15:25:36 EDT 2011
>> Finished in: 52sec
>> Job Cleanup: Successful
>> Kind % Complete      Num Tasks       Pending Running Complete        Killed  
>> Failed/Killed
>> Task Attempts
>> map  100.00%
>> 1    0       0       1       0       0 / 0
>> reduce       100.00%
>> 1    0       0       1       0       0 / 0
>> 
>> Counter      Map     Reduce  Total
>> Job Counters Launched reduce tasks   0       0       1
>> Launched map tasks   0       0       1
>> Data-local map tasks 0       0       1
>> FileSystemCounters   FILE_BYTES_READ 50,698,700      50,698,646      
>> 101,397,346
>> HDFS_BYTES_READ      56,149,360      0       56,149,360
>> FILE_BYTES_WRITTEN   101,397,378     50,698,646      152,096,024
>> Map-Reduce Framework Reduce input groups     0       2,534,932       
>> 2,534,932
>> Combine output records       0       0       0
>> Map input records    2,534,932       0       2,534,932
>> Reduce shuffle bytes 0       0       0
>> Reduce output records        0       2,534,932       2,534,932
>> Spilled Records      5,069,864       2,534,932       7,604,796
>> Map output bytes     45,628,776      0       45,628,776
>> Map output records   2,534,932       0       2,534,932
>> Combine input records        0       0       0
>> Reduce input records 0       2,534,932       2,534,932
>> 
>> and the log for the mapper
>> 
>>  2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: 
>> Initializing JVM Metrics with processName=MAP, sessionId=
>> 2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 
>> 100
>> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data buffer = 
>> 79691776/99614720
>> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: record buffer 
>> = 262144/327680
>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling map 
>> output: record full = true
>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; 
>> bufend = 4718592; bufvoid = 99614720
>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; 
>> kvend = 262144; length = 327680
>> 2011-08-10 15:24:50,364 INFO org.apache.hadoop.mapred.MapTask: Finished 
>> spill 0
>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: Spilling map 
>> output: record full = true
>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>> 4718592; bufend = 9437166; bufvoid = 99614720
>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>> 262144; kvend = 196607; length = 327680
>> 2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask: Finished 
>> spill 1
>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: Spilling map 
>> output: record full = true
>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>> 9437166; bufend = 14155740; bufvoid = 99614720
>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>> 196607; kvend = 131070; length = 327680
>> 2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask: Finished 
>> spill 2
>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: Spilling map 
>> output: record full = true
>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>> 14155740; bufend = 18874314; bufvoid = 99614720
>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>> 131070; kvend = 65533; length = 327680
>> 2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask: Finished 
>> spill 3
>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: Spilling map 
>> output: record full = true
>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>> 18874314; bufend = 23592906; bufvoid = 99614720
>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>> 65533; kvend = 327677; length = 327680
>> 2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask: Finished 
>> spill 4
>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: Spilling map 
>> output: record full = true
>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>> 23592906; bufend = 28311480; bufvoid = 99614720
>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>> 327677; kvend = 262140; length = 327680
>> 2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask: Finished 
>> spill 5
>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: Spilling map 
>> output: record full = true
>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>> 28311480; bufend = 33030054; bufvoid = 99614720
>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>> 262140; kvend = 196603; length = 327680
>> 2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask: Finished 
>> spill 6
>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: Spilling map 
>> output: record full = true
>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>> 33030054; bufend = 37748628; bufvoid = 99614720
>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>> 196603; kvend = 131066; length = 327680
>> 2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask: Finished 
>> spill 7
>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: Spilling map 
>> output: record full = true
>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart = 
>> 37748628; bufend = 42467202; bufvoid = 99614720
>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: kvstart = 
>> 131066; kvend = 65529; length = 327680
>> 2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask: Finished 
>> spill 8
>> 2011-08-10 15:24:57,051 INFO org.apache.hadoop.mapred.MapTask: Starting 
>> flush of map output
>> 2011-08-10 15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished 
>> spill 9
>> 2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10 
>> sorted segments
>> 2011-08-10 15:24:57,297 INFO org.apache.hadoop.mapred.Merger: Down to the 
>> last merge-pass, with 10 segments left of total size: 50698660 bytes
>> 2011-08-10 15:24:59,552 INFO org.apache.hadoop.mapred.TaskRunner: 
>> Task:attempt_201108051329_0060_m_000000_0 is done. And is in the process of 
>> commiting
>> 2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner: Task 
>> 'attempt_201108051329_0060_m_000000_0' done.
>> 
>> and the log for the reducer
>> 
>> 2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: 
>> Initializing JVM Metrics with processName=SHUFFLE, sessionId=
>> 2011-08-10 15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: 
>> ShuffleRamManager: MemoryLimit=140699232, MaxSingleShuffleLimit=35174808
>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: 
>> attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging 
>> on-disk files
>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: 
>> attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging in 
>> memory files
>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: 
>> attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging 
>> on-disk files
>> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: 
>> attempt_201108051329_0060_r_000000_0 Need another 1 map output(s) where 0 is 
>> already in progress
>> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: 
>> attempt_201108051329_0060_r_000000_0 Thread started: Thread for polling Map 
>> Completion Events
>> 2011-08-10 15:25:01,037 INFO org.apache.hadoop.mapred.ReduceTask: 
>> attempt_201108051329_0060_r_000000_0 Scheduled 0 outputs (0 slow hosts and0 
>> dup hosts)
>> 2011-08-10 15:25:01,038 INFO org.apache.hadoop.mapred.ReduceTask: 
>> attempt_201108051329_0060_r_000000_0: Got 1 new map-outputs
>> 2011-08-10 15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask: 
>> attempt_201108051329_0060_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 
>> dup hosts)
>> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: header: 
>> attempt_201108051329_0060_m_000000_0, compressed len: 50698646, decompressed 
>> len: 50698642
>> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 
>> 50698642 bytes (50698646 raw bytes) into Local-FS from 
>> attempt_201108051329_0060_m_000000_0
>> 2011-08-10 15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 
>> 50698646 bytes from map-output for attempt_201108051329_0060_m_000000_0
>> 2011-08-10 15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask: 
>> attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging 
>> on-disk files
>> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: 
>> GetMapEventsThread exiting
>> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: 
>> getMapsEventsThread joined.
>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram 
>> manager
>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: 
>> Interleaved on-disk merge complete: 1 files left.
>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: In-memory 
>> merge complete: 0 files left.
>> 2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 
>> files, 50698646 bytes from disk
>> 2011-08-10 15:25:07,062 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 
>> segments, 0 bytes from memory into reduce
>> 2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger: Merging 1 
>> sorted segments
>> 2011-08-10 15:25:07,072 INFO org.apache.hadoop.mapred.Merger: Down to the 
>> last merge-pass, with 1 segments left of total size: 50698642 bytes
>> 2011-08-10 15:25:30,126 INFO org.apache.hadoop.mapred.TaskRunner: 
>> Task:attempt_201108051329_0060_r_000000_0 is done. And is in the process of 
>> commiting
>> 2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner: Task 
>> 'attempt_201108051329_0060_r_000000_0' done.
>> 
>> My code is similar to the word count example:
>> 
>>     public int run(String[] args) throws Exception {
>>         ...
>> 
>>         getConf().set(CONF_COLUMN_NAME, columnName);
>> 
>>         Job job4 = new Job(getConf(), "SalesRankWriter");
>>         job4.setJarByClass(SalesRankLoader.class);
>>         job4.setMapperClass(RankUpdateMapper.class);
>>         job4.setReducerClass(RankUpdateReducer.class);
>>         job4.setMapOutputKeyClass(Text.class);
>>         job4.setMapOutputValueClass(IntWritable.class);
>>         job4.setOutputKeyClass(ByteBuffer.class);
>>         job4.setOutputValueClass(List.class);
>>         job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
>>         job4.setInputFormatClass(TextInputFormat.class);
>>         FileInputFormat.addInputPath(job4, new Path(prePath));
>> 
>>         ConfigHelper.setOutputColumnFamily(job4.getConfiguration(), 
>> KEYSPACE, columnFamily);
>>         ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
>>         ConfigHelper.setInitialAddress(job4.getConfiguration(), 
>> "dnjsrcha01");
>>         ConfigHelper.setPartitioner(job4.getConfiguration(), 
>> "org.apache.cassandra.dht.RandomPartitioner");
>> 
>> 
>>         job4.waitForCompletion(true);
>>         ...
>>     }
>> 
>> 
>> 
>> where the mapper and reducer are defined as:
>> 
>>     public static class RankUpdateMapper extends Mapper<LongWritable, Text, 
>> Text, IntWritable> {
>>         public void map(LongWritable key, Text value, Context context) 
>> throws IOException, InterruptedException {
>>             String line = value.toString();
>>             StringTokenizer tokenizer = new StringTokenizer(line);
>>             String ean = tokenizer.nextToken();
>>             int rank = Integer.parseInt(tokenizer.nextToken());
>> 
>>             context.write(new Text(ean), new IntWritable(rank));
>>         }
>>     }
>> 
>>     public static class RankUpdateReducer extends Reducer<Text, IntWritable, 
>> ByteBuffer, List<Mutation>>
>>     {
>>         private ByteBuffer outputKey;
>> 
>>         protected void setup(org.apache.hadoop.mapreduce.Reducer.Context 
>> context) throws IOException, InterruptedException
>>         {
>>             outputKey = 
>> ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
>>         }
>> 
>>         public void reduce(Text key, Iterable<IntWritable> values, Context 
>> context) throws IOException, InterruptedException
>>         {
>>             context.write(outputKey, 
>> Collections.singletonList(getMutation(key, values.iterator().next().get())));
>>         }
>> 
>>         private static Mutation getMutation(Text key, int value)
>>         {
>>             Column c = new Column();
>>             c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), 
>> key.getLength())));
>>             c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
>>             c.setTimestamp(System.currentTimeMillis() * 1000);
>> 
>>             Mutation m = new Mutation();
>>             m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
>>             m.column_or_supercolumn.setColumn(c);
>>             return m;
>>         }
>>     }
>> 
>> Any thing wrong here?
>> 
>> Thanks,
>> 
>> John
> 
> 

Reply via email to