I'm a simple guy. My first step would be to see if the expected data is in the database and, if not, what's missing.
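For a quick spot check, something like this should read one value back over Thrift (an untested sketch: the keyspace, column family, row key, and EAN below are placeholders; note that the reducer in the code further down appears to use the configured column name as the row key and each EAN as a column, so that is the row to look up):

    import java.nio.ByteBuffer;

    import org.apache.cassandra.thrift.Cassandra;
    import org.apache.cassandra.thrift.ColumnOrSuperColumn;
    import org.apache.cassandra.thrift.ColumnPath;
    import org.apache.cassandra.thrift.ConsistencyLevel;
    import org.apache.cassandra.utils.ByteBufferUtil;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;

    public class SpotCheck
    {
        public static void main(String[] args) throws Exception
        {
            // Host and RPC port taken from the job config quoted below.
            TTransport transport = new TFramedTransport(new TSocket("dnjsrcha01", 9260));
            transport.open();
            Cassandra.Client client = new Cassandra.Client(new TBinaryProtocol(transport));
            client.set_keyspace("MyKeyspace");                   // placeholder: the job's KEYSPACE

            ColumnPath path = new ColumnPath("MyColumnFamily");  // placeholder: the job's columnFamily
            path.setColumn(ByteBufferUtil.bytes("0123456789"));  // placeholder: an EAN from the input file

            // Row key = the CONF_COLUMN_NAME value, per the reducer below (placeholder here).
            ColumnOrSuperColumn cosc =
                client.get(ByteBufferUtil.bytes("sales_rank"), path, ConsistencyLevel.ONE);
            System.out.println("stored rank = " + ByteBufferUtil.string(cosc.column.value));

            transport.close();
        }
    }

If get() throws NotFoundException, that mutation never landed.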
2.5M updates / 3 nodes = 833,333 per node
833,333 / 53 seconds = 15,723 per second
1 / 15,723 = 0.00006 seconds, or about 0.06 milliseconds, per mutation

Sounds reasonable to me. Check the Write Latency in nodetool cfstats and the row count estimates.

Cheers

-----------------
Aaron Morton
Freelance Cassandra Developer
@aaronmorton
http://www.thelastpickle.com

On 11 Aug 2011, at 14:50, Jian Fang wrote:

> There is data, and each Cassandra cluster node holds about 100G. From the
> application point of view, if I run the job twice with the same input file,
> i.e., the sales rank update file, then I should see a much smaller number of
> products whose rank change exceeds the threshold in the output file for the
> second run, because the sales ranks would have been updated to match the
> ranks in the input file during the first run. But actually, the output files
> stayed the same for the two runs. One explanation is that the ranks were not
> actually updated during the first run. Also, 53 seconds to run the whole
> Hadoop job with 2.5 million Cassandra updates on three nodes: do you think
> that is possible? Each node is a regular Linux box with 8 CPUs.
>
> Thanks,
>
> John
>
> On Wed, Aug 10, 2011 at 5:40 PM, aaron morton <aa...@thelastpickle.com> wrote:
>> Seems the data are not actually written to Cassandra.
>
> Before jumping into the Hadoop side of things, are you saying there is no
> data in Cassandra? Can you retrieve any using the CLI? Take a look at
> cfstats on each node to see the estimated record count.
>
> Cheers
>
> -----------------
> Aaron Morton
> Freelance Cassandra Developer
> @aaronmorton
> http://www.thelastpickle.com
>
> On 11 Aug 2011, at 08:20, Jian Fang wrote:
>
>> Hi,
>>
>> I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application reads a file
>> and then writes about 2.5 million records to Cassandra, using
>> ColumnFamilyOutputFormat. My Cassandra cluster has three nodes with one
>> Hadoop task tracker on each node. The weird problem is that I only saw one
>> map task and one reduce task, and the job took only 53 seconds to finish.
>> Seems the data are not actually written to Cassandra.
>>
>> Here is status from Hadoop web admin:
>>
>> User: hadoop
>> Job Name: SalesRankWriter
>> Job File: hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
>> Job Setup: Successful
>> Status: Succeeded
>> Started at: Wed Aug 10 15:24:43 EDT 2011
>> Finished at: Wed Aug 10 15:25:36 EDT 2011
>> Finished in: 52sec
>> Job Cleanup: Successful
>>
>> Kind     % Complete  Num Tasks  Pending  Running  Complete  Killed  Failed/Killed Task Attempts
>> map      100.00%     1          0        0        1         0       0 / 0
>> reduce   100.00%     1          0        0        1         0       0 / 0
>>
>> Counter                                        Map          Reduce      Total
>> Job Counters / Launched reduce tasks           0            0           1
>> Job Counters / Launched map tasks              0            0           1
>> Job Counters / Data-local map tasks            0            0           1
>> FileSystemCounters / FILE_BYTES_READ           50,698,700   50,698,646  101,397,346
>> FileSystemCounters / HDFS_BYTES_READ           56,149,360   0           56,149,360
>> FileSystemCounters / FILE_BYTES_WRITTEN        101,397,378  50,698,646  152,096,024
>> Map-Reduce Framework / Reduce input groups     0            2,534,932   2,534,932
>> Map-Reduce Framework / Combine output records  0            0           0
>> Map-Reduce Framework / Map input records       2,534,932    0           2,534,932
>> Map-Reduce Framework / Reduce shuffle bytes    0            0           0
>> Map-Reduce Framework / Reduce output records   0            2,534,932   2,534,932
>> Map-Reduce Framework / Spilled Records         5,069,864    2,534,932   7,604,796
>> Map-Reduce Framework / Map output bytes        45,628,776   0           45,628,776
>> Map-Reduce Framework / Map output records      2,534,932    0           2,534,932
>> Map-Reduce Framework / Combine input records   0            0           0
>> Map-Reduce Framework / Reduce input records    0            2,534,932   2,534,932
>>
>> and the log for the mapper
>>
>> 2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=MAP, sessionId=
>> 2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb = 100
>> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data buffer = 79691776/99614720
>> 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680
>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 4718592; bufvoid = 99614720
>> 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: kvstart = 0; kvend = 262144; length = 327680
>> 2011-08-10 15:24:50,364 INFO org.apache.hadoop.mapred.MapTask: Finished spill 0
>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart = 4718592; bufend = 9437166; bufvoid = 99614720
>> 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262144; kvend = 196607; length = 327680
>> 2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask: Finished spill 1
>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart = 9437166; bufend = 14155740; bufvoid = 99614720
>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196607; kvend = 131070; length = 327680
>> 2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask: Finished spill 2
>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart = 14155740; bufend = 18874314; bufvoid = 99614720
>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131070; kvend = 65533; length = 327680
>> 2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask: Finished spill 3
>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart = 18874314; bufend = 23592906; bufvoid = 99614720
>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: kvstart = 65533; kvend = 327677; length = 327680
>> 2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask: Finished spill 4
>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart = 23592906; bufend = 28311480; bufvoid = 99614720
>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: kvstart = 327677; kvend = 262140; length = 327680
>> 2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask: Finished spill 5
>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart = 28311480; bufend = 33030054; bufvoid = 99614720
>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: kvstart = 262140; kvend = 196603; length = 327680
>> 2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask: Finished spill 6
>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart = 33030054; bufend = 37748628; bufvoid = 99614720
>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: kvstart = 196603; kvend = 131066; length = 327680
>> 2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask: Finished spill 7
>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart = 37748628; bufend = 42467202; bufvoid = 99614720
>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: kvstart = 131066; kvend = 65529; length = 327680
>> 2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask: Finished spill 8
>> 2011-08-10 15:24:57,051 INFO org.apache.hadoop.mapred.MapTask: Starting flush of map output
>> 2011-08-10 15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished spill 9
>> 2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10 sorted segments
>> 2011-08-10 15:24:57,297 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 10 segments left of total size: 50698660 bytes
>> 2011-08-10 15:24:59,552 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_m_000000_0 is done. And is in the process of commiting
>> 2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_m_000000_0' done.
>>
>> and the log for the reducer
>>
>> 2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics: Initializing JVM Metrics with processName=SHUFFLE, sessionId=
>> 2011-08-10 15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager: MemoryLimit=140699232, MaxSingleShuffleLimit=35174808
>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging on-disk files
>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging in memory files
>> 2011-08-10 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
>> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Need another 1 map output(s) where 0 is already in progress
>> 2011-08-10 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread started: Thread for polling Map Completion Events
>> 2011-08-10 15:25:01,037 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 0 outputs (0 slow hosts and0 dup hosts)
>> 2011-08-10 15:25:01,038 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0: Got 1 new map-outputs
>> 2011-08-10 15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Scheduled 1 outputs (0 slow hosts and0 dup hosts)
>> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: header: attempt_201108051329_0060_m_000000_0, compressed len: 50698646, decompressed len: 50698642
>> 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask: Shuffling 50698642 bytes (50698646 raw bytes) into Local-FS from attempt_201108051329_0060_m_000000_0
>> 2011-08-10 15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 50698646 bytes from map-output for attempt_201108051329_0060_m_000000_0
>> 2011-08-10 15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging on-disk files
>> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting
>> 2011-08-10 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread joined.
>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Closed ram manager
>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 1 files left.
>> 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 0 files left.
>> 2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1 files, 50698646 bytes from disk
>> 2011-08-10 15:25:07,062 INFO org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory into reduce
>> 2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger: Merging 1 sorted segments
>> 2011-08-10 15:25:07,072 INFO org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 50698642 bytes
>> 2011-08-10 15:25:30,126 INFO org.apache.hadoop.mapred.TaskRunner: Task:attempt_201108051329_0060_r_000000_0 is done. And is in the process of commiting
>> 2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner: Task 'attempt_201108051329_0060_r_000000_0' done.
>>
>> My code is similar to the word count example:
>>
>> public int run(String[] args) throws Exception {
>>     ...
>>
>>     getConf().set(CONF_COLUMN_NAME, columnName);
>>
>>     Job job4 = new Job(getConf(), "SalesRankWriter");
>>     job4.setJarByClass(SalesRankLoader.class);
>>     job4.setMapperClass(RankUpdateMapper.class);
>>     job4.setReducerClass(RankUpdateReducer.class);
>>     job4.setMapOutputKeyClass(Text.class);
>>     job4.setMapOutputValueClass(IntWritable.class);
>>     job4.setOutputKeyClass(ByteBuffer.class);
>>     job4.setOutputValueClass(List.class);
>>     job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
>>     job4.setInputFormatClass(TextInputFormat.class);
>>     FileInputFormat.addInputPath(job4, new Path(prePath));
>>
>>     ConfigHelper.setOutputColumnFamily(job4.getConfiguration(), KEYSPACE, columnFamily);
>>     ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
>>     ConfigHelper.setInitialAddress(job4.getConfiguration(), "dnjsrcha01");
>>     ConfigHelper.setPartitioner(job4.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
>>
>>     job4.waitForCompletion(true);
>>     ...
>> }
>>
>> where the mapper and reducer are defined as:
>>
>> public static class RankUpdateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
>>     public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
>>         String line = value.toString();
>>         StringTokenizer tokenizer = new StringTokenizer(line);
>>         String ean = tokenizer.nextToken();
>>         int rank = Integer.parseInt(tokenizer.nextToken());
>>
>>         context.write(new Text(ean), new IntWritable(rank));
>>     }
>> }
>>
>> public static class RankUpdateReducer extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
>> {
>>     private ByteBuffer outputKey;
>>
>>     protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException
>>     {
>>         outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
>>     }
>>
>>     public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
>>     {
>>         context.write(outputKey, Collections.singletonList(getMutation(key, values.iterator().next().get())));
>>     }
>>
>>     private static Mutation getMutation(Text key, int value)
>>     {
>>         Column c = new Column();
>>         c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength())));
>>         c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
>>         c.setTimestamp(System.currentTimeMillis() * 1000);
>>
>>         Mutation m = new Mutation();
>>         m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
>>         m.column_or_supercolumn.setColumn(c);
>>         return m;
>>     }
>> }
>>
>> Anything wrong here?
>>
>> Thanks,
>>
>> John
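Two notes on the job above. On the task counts: the counters show roughly 56 MB of HDFS input, which fits in a single split under Hadoop's default 64 MB block size, so one map task is expected, and Hadoop 0.20 defaults mapred.reduce.tasks to 1 unless the job sets it, for example:

    job4.setNumReduceTasks(6); // illustrative count; tune to the cluster's reduce slots

Separately, the reducer, like the stock word_count example it follows, uses the configured column name as the row key and the EAN as the column name, so all 2.5 million updates land in a single wide row. If the intended schema is instead one row per product with the rank stored under a fixed column name, the key and the column name would swap roles. A sketch of that variant as a drop-in replacement for the reduce/getMutation pair above (hypothetical; whether it matches the intended schema is an assumption, not a fix confirmed in this thread):

    // Hypothetical variant of the reduce step: row key = EAN,
    // column name = the configured name (e.g. "sales_rank").
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException
    {
        int rank = values.iterator().next().get();

        Column c = new Column();
        c.setName(outputKey.duplicate()); // reuse the ByteBuffer built in setup()
        c.setValue(ByteBufferUtil.bytes(String.valueOf(rank)));
        c.setTimestamp(System.currentTimeMillis() * 1000); // microseconds, as above

        Mutation m = new Mutation();
        m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
        m.column_or_supercolumn.setColumn(c);

        // The row key is now the product's EAN itself.
        ByteBuffer rowKey = ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength()));
        context.write(rowKey, Collections.singletonList(m));
    }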