53 seconds included the map phase to read and process the input file. The
records were updated at the end of the reduce phase.

I checked the sales ranks in the update file and the sales ranks in the
Cassandra, they are different and thus, the records
were not actually updated.

I remember I run the word count example for Cassandra 0.8.0 some time ago, I
saw the similar behavior, i.e., the results were not written to the
Cassandra column family. But If I changed hadoop to write to the file
system, I can see the results in the output file. I didn't try the word
count example for Cassandra 0.8.2 though.

Anyway to solve this problem?

Thanks,

John
On Thu, Aug 11, 2011 at 12:17 AM, aaron morton <aa...@thelastpickle.com>wrote:

> I'm a simple guy. My first step would be see if the expected data is in the
> data base, if not what's missing.
>
> 2.5M updates / 3 nodes = 833,333 per node
> 833,333 / 53 seconds = 15,723 per second
> 1 / 15,723  = 0.00006 seconds / 0.06 milliseconds per mutation
>
> sounds reasonable to me.
>
> check the Write Latency in nodetool cfstats and the row count estimates.
>
> Cheers
>
> -----------------
> Aaron Morton
> Freelance Cassandra Developer
> @aaronmorton
> http://www.thelastpickle.com
>
> On 11 Aug 2011, at 14:50, Jian Fang wrote:
>
> There are data and each Cassandra cluster node holds about 100G. From the
> application point of view, if I run the job twice with the same input file,
> i.e., the sales rank update file, then I should see a much smaller number of
> products, whose rank change exceeds the threshold, in the output file for
> the second run because the sales ranks have been updated to be the same as
> the ranks in the input file during the first run. But actually, I saw the
> output files stay the same for the two runs. One explanation is that the
> ranks were not actually updated for the first run. Also, 53 seconds to run
> the whole hadoop job with 2.5 million Cassandra updates on three nodes, do
> you think that is possible? Each node is a regular Linux box with 8 CPUs.
>
> Thanks,
>
> John
>
> On Wed, Aug 10, 2011 at 5:40 PM, aaron morton <aa...@thelastpickle.com>wrote:
>
>>  Seems the data are not actually written to Cassandra.
>>
>>
>> Before jumping into the Hadoop side of things are you saying there is no
>> data in Cassandra ? Can you retrieve any using the CLI  ? Take a look at
>> cfstats on each node to see the estimated record count.
>>
>> Cheers
>>
>>  -----------------
>> Aaron Morton
>> Freelance Cassandra Developer
>> @aaronmorton
>> http://www.thelastpickle.com
>>
>> On 11 Aug 2011, at 08:20, Jian Fang wrote:
>>
>> Hi,
>>
>> I am using Cassandra 0.8.2 with Hadoop 0.20.2. My application read a file
>> and then write about 2.5 million records
>> to Cassandra. I used ColumnFamilyOutputFormat to write to Cassandra. My
>> Cassandra cluster has three nodes with
>> one Hadoop task tracker on each node. The wired problem is that I only saw
>> one map and one reducer tasks and job only took
>> 53 seconds to finish. Seems the data are not actually written to
>> Cassandra.
>>
>> Here is status from Hadoop web admin:
>>
>> User: hadoop
>> Job Name: SalesRankWriter
>> Job File:
>> hdfs://xxxxx:54310/hadoop/tmp/mapred/system/job_201108051329_0060/job.xml
>> Job Setup: Successful
>> Status: Succeeded
>> Started at: Wed Aug 10 15:24:43 EDT 2011
>> Finished at: Wed Aug 10 15:25:36 EDT 2011
>> Finished in: 52sec
>> Job Cleanup: Successful
>> Kind % Complete Num Tasks Pending Running Complete Killed Failed/Killed
>> Task Attempts
>> map 100.00%
>> 1 0 0 1 0 0 / 0
>> reduce 100.00%
>> 1 0 0 1 0 0 / 0
>>
>> Counter Map Reduce Total
>> Job Counters Launched reduce tasks 0 0 1
>> Launched map tasks 0 0 1
>> Data-local map tasks 0 0 1
>> FileSystemCounters FILE_BYTES_READ 50,698,700 50,698,646 101,397,346
>> HDFS_BYTES_READ 56,149,360 0 56,149,360
>> FILE_BYTES_WRITTEN 101,397,378 50,698,646 152,096,024
>> Map-Reduce Framework Reduce input groups 0 2,534,932 2,534,932
>> Combine output records 0 0 0
>> Map input records 2,534,932 0 2,534,932
>> Reduce shuffle bytes 0 0 0
>> Reduce output records 0 2,534,932 2,534,932
>> Spilled Records 5,069,864 2,534,932 7,604,796
>> Map output bytes 45,628,776 0 45,628,776
>> Map output records 2,534,932 0 2,534,932
>> Combine input records 0 0 0
>> Reduce input records 0 2,534,932 2,534,932
>>
>> and the log for the mapper
>>
>>  2011-08-10 15:24:48,717 INFO org.apache.hadoop.metrics.jvm.JvmMetrics:
>> Initializing JVM Metrics with processName=MAP, sessionId=
>> 2011-08-10 15:24:48,857 INFO org.apache.hadoop.mapred.MapTask: io.sort.mb
>> = 100 2011-08-10 15:24:48,917 INFO org.apache.hadoop.mapred.MapTask: data
>> buffer = 79691776/99614720 2011-08-10 15:24:48,917 INFO
>> org.apache.hadoop.mapred.MapTask: record buffer = 262144/327680 2011-08-10
>> 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask: Spilling map output:
>> record full = true 2011-08-10 15:24:49,760 INFO
>> org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 4718592; bufvoid =
>> 99614720 2011-08-10 15:24:49,760 INFO org.apache.hadoop.mapred.MapTask:
>> kvstart = 0; kvend = 262144; length = 327680 2011-08-10 15:24:50,364 INFO
>> org.apache.hadoop.mapred.MapTask: Finished spill 0 2011-08-10 15:24:50,707
>> INFO org.apache.hadoop.mapred.MapTask: Spilling map output: record full =
>> true 2011-08-10 15:24:50,707 INFO org.apache.hadoop.mapred.MapTask: bufstart
>> = 4718592; bufend = 9437166; bufvoid = 99614720 2011-08-10 15:24:50,707 INFO
>> org.apache.hadoop.mapred.MapTask: kvstart = 262144; kvend = 196607; length =
>> 327680 2011-08-10 15:24:51,238 INFO org.apache.hadoop.mapred.MapTask:
>> Finished spill 1 2011-08-10 15:24:51,583 INFO
>> org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:51,583 INFO org.apache.hadoop.mapred.MapTask: bufstart =
>> 9437166; bufend = 14155740; bufvoid = 99614720 2011-08-10 15:24:51,583 INFO
>> org.apache.hadoop.mapred.MapTask: kvstart = 196607; kvend = 131070; length =
>> 327680 2011-08-10 15:24:52,084 INFO org.apache.hadoop.mapred.MapTask:
>> Finished spill 2 2011-08-10 15:24:52,433 INFO
>> org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:52,433 INFO org.apache.hadoop.mapred.MapTask: bufstart =
>> 14155740; bufend = 18874314; bufvoid = 99614720 2011-08-10 15:24:52,433 INFO
>> org.apache.hadoop.mapred.MapTask: kvstart = 131070; kvend = 65533; length =
>> 327680 2011-08-10 15:24:52,877 INFO org.apache.hadoop.mapred.MapTask:
>> Finished spill 3 2011-08-10 15:24:53,216 INFO
>> org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:53,216 INFO org.apache.hadoop.mapred.MapTask: bufstart =
>> 18874314; bufend = 23592906; bufvoid = 99614720 2011-08-10 15:24:53,216 INFO
>> org.apache.hadoop.mapred.MapTask: kvstart = 65533; kvend = 327677; length =
>> 327680 2011-08-10 15:24:53,660 INFO org.apache.hadoop.mapred.MapTask:
>> Finished spill 4 2011-08-10 15:24:54,010 INFO
>> org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:54,010 INFO org.apache.hadoop.mapred.MapTask: bufstart =
>> 23592906; bufend = 28311480; bufvoid = 99614720 2011-08-10 15:24:54,010 INFO
>> org.apache.hadoop.mapred.MapTask: kvstart = 327677; kvend = 262140; length =
>> 327680 2011-08-10 15:24:54,447 INFO org.apache.hadoop.mapred.MapTask:
>> Finished spill 5 2011-08-10 15:24:54,793 INFO
>> org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:54,793 INFO org.apache.hadoop.mapred.MapTask: bufstart =
>> 28311480; bufend = 33030054; bufvoid = 99614720 2011-08-10 15:24:54,793 INFO
>> org.apache.hadoop.mapred.MapTask: kvstart = 262140; kvend = 196603; length =
>> 327680 2011-08-10 15:24:55,237 INFO org.apache.hadoop.mapred.MapTask:
>> Finished spill 6 2011-08-10 15:24:55,564 INFO
>> org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:55,564 INFO org.apache.hadoop.mapred.MapTask: bufstart =
>> 33030054; bufend = 37748628; bufvoid = 99614720 2011-08-10 15:24:55,564 INFO
>> org.apache.hadoop.mapred.MapTask: kvstart = 196603; kvend = 131066; length =
>> 327680 2011-08-10 15:24:55,981 INFO org.apache.hadoop.mapred.MapTask:
>> Finished spill 7 2011-08-10 15:24:56,434 INFO
>> org.apache.hadoop.mapred.MapTask: Spilling map output: record full = true
>> 2011-08-10 15:24:56,434 INFO org.apache.hadoop.mapred.MapTask: bufstart =
>> 37748628; bufend = 42467202; bufvoid = 99614720 2011-08-10 15:24:56,434 INFO
>> org.apache.hadoop.mapred.MapTask: kvstart = 131066; kvend = 65529; length =
>> 327680 2011-08-10 15:24:56,848 INFO org.apache.hadoop.mapred.MapTask:
>> Finished spill 8 2011-08-10 15:24:57,051 INFO
>> org.apache.hadoop.mapred.MapTask: Starting flush of map output 2011-08-10
>> 15:24:57,282 INFO org.apache.hadoop.mapred.MapTask: Finished spill 9
>> 2011-08-10 15:24:57,291 INFO org.apache.hadoop.mapred.Merger: Merging 10
>> sorted segments 2011-08-10 15:24:57,297 INFO
>> org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 10
>> segments left of total size: 50698660 bytes 2011-08-10 15:24:59,552 INFO
>> org.apache.hadoop.mapred.TaskRunner:
>> Task:attempt_201108051329_0060_m_000000_0 is done. And is in the process of
>> commiting 2011-08-10 15:24:59,555 INFO org.apache.hadoop.mapred.TaskRunner:
>> Task 'attempt_201108051329_0060_m_000000_0' done.
>>
>> and the log for the reducer
>>
>> 2011-08-10 15:25:00,835 INFO org.apache.hadoop.metrics.jvm.JvmMetrics:
>> Initializing JVM Metrics with processName=SHUFFLE, sessionId= 2011-08-10
>> 15:25:01,005 INFO org.apache.hadoop.mapred.ReduceTask: ShuffleRamManager:
>> MemoryLimit=140699232, MaxSingleShuffleLimit=35174808 2011-08-10
>> 15:25:01,022 INFO org.apache.hadoop.mapred.ReduceTask:
>> attempt_201108051329_0060_r_000000_0 Thread started: Thread for merging
>> on-disk files 2011-08-10 15:25:01,022 INFO
>> org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
>> Thread started: Thread for merging in memory files 2011-08-10 15:25:01,022
>> INFO org.apache.hadoop.mapred.ReduceTask:
>> attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging
>> on-disk files 2011-08-10 15:25:01,024 INFO
>> org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
>> Need another 1 map output(s) where 0 is already in progress 2011-08-10
>> 15:25:01,024 INFO org.apache.hadoop.mapred.ReduceTask:
>> attempt_201108051329_0060_r_000000_0 Thread started: Thread for polling Map
>> Completion Events 2011-08-10 15:25:01,037 INFO
>> org.apache.hadoop.mapred.ReduceTask: attempt_201108051329_0060_r_000000_0
>> Scheduled 0 outputs (0 slow hosts and0 dup hosts) 2011-08-10 15:25:01,038
>> INFO org.apache.hadoop.mapred.ReduceTask:
>> attempt_201108051329_0060_r_000000_0: Got 1 new map-outputs 2011-08-10
>> 15:25:06,039 INFO org.apache.hadoop.mapred.ReduceTask:
>> attempt_201108051329_0060_r_000000_0 Scheduled 1 outputs (0 slow hosts and0
>> dup hosts) 2011-08-10 15:25:06,121 INFO org.apache.hadoop.mapred.ReduceTask:
>> header: attempt_201108051329_0060_m_000000_0, compressed len: 50698646,
>> decompressed len: 50698642 2011-08-10 15:25:06,121 INFO
>> org.apache.hadoop.mapred.ReduceTask: Shuffling 50698642 bytes (50698646 raw
>> bytes) into Local-FS from attempt_201108051329_0060_m_000000_0 2011-08-10
>> 15:25:06,314 INFO org.apache.hadoop.mapred.ReduceTask: Read 50698646 bytes
>> from map-output for attempt_201108051329_0060_m_000000_0 2011-08-10
>> 15:25:06,315 INFO org.apache.hadoop.mapred.ReduceTask:
>> attempt_201108051329_0060_r_000000_0 Thread waiting: Thread for merging
>> on-disk files 2011-08-10 15:25:07,055 INFO
>> org.apache.hadoop.mapred.ReduceTask: GetMapEventsThread exiting 2011-08-10
>> 15:25:07,055 INFO org.apache.hadoop.mapred.ReduceTask: getMapsEventsThread
>> joined. 2011-08-10 15:25:07,056 INFO org.apache.hadoop.mapred.ReduceTask:
>> Closed ram manager 2011-08-10 15:25:07,056 INFO
>> org.apache.hadoop.mapred.ReduceTask: Interleaved on-disk merge complete: 1
>> files left. 2011-08-10 15:25:07,056 INFO
>> org.apache.hadoop.mapred.ReduceTask: In-memory merge complete: 0 files left.
>> 2011-08-10 15:25:07,061 INFO org.apache.hadoop.mapred.ReduceTask: Merging 1
>> files, 50698646 bytes from disk 2011-08-10 15:25:07,062 INFO
>> org.apache.hadoop.mapred.ReduceTask: Merging 0 segments, 0 bytes from memory
>> into reduce 2011-08-10 15:25:07,065 INFO org.apache.hadoop.mapred.Merger:
>> Merging 1 sorted segments 2011-08-10 15:25:07,072 INFO
>> org.apache.hadoop.mapred.Merger: Down to the last merge-pass, with 1
>> segments left of total size: 50698642 bytes 2011-08-10 15:25:30,126 INFO
>> org.apache.hadoop.mapred.TaskRunner:
>> Task:attempt_201108051329_0060_r_000000_0 is done. And is in the process of
>> commiting 2011-08-10 15:25:30,129 INFO org.apache.hadoop.mapred.TaskRunner:
>> Task 'attempt_201108051329_0060_r_000000_0' done.
>>
>> My code is similar to the word count example:
>>
>>     public int run(String[] args) throws Exception {
>>  ...
>>         getConf().set(CONF_COLUMN_NAME, columnName);
>>
>>         Job job4 = new Job(getConf(), "SalesRankWriter");
>>         job4.setJarByClass(SalesRankLoader.class);
>>         job4.setMapperClass(RankUpdateMapper.class);
>>         job4.setReducerClass(RankUpdateReducer.class);
>>         job4.setMapOutputKeyClass(Text.class);
>>         job4.setMapOutputValueClass(IntWritable.class);
>>         job4.setOutputKeyClass(ByteBuffer.class);
>>         job4.setOutputValueClass(List.class);
>>         job4.setOutputFormatClass(ColumnFamilyOutputFormat.class);
>>         job4.setInputFormatClass(TextInputFormat.class);
>>         FileInputFormat.addInputPath(job4, new Path(prePath));
>>
>>         ConfigHelper.setOutputColumnFamily(job4.getConfiguration(),
>> KEYSPACE, columnFamily);
>>         ConfigHelper.setRpcPort(job4.getConfiguration(), "9260");
>>         ConfigHelper.setInitialAddress(job4.getConfiguration(),
>> "dnjsrcha01");
>>         ConfigHelper.setPartitioner(job4.getConfiguration(),
>> "org.apache.cassandra.dht.RandomPartitioner");
>>
>>         job4.waitForCompletion(true);
>> ...
>> }
>>
>> where the mapper and reducer are defined as:
>>
>> public static class RankUpdateMapper extends Mapper<LongWritable, Text,
>> Text, IntWritable> { public void map(LongWritable key, Text value, Context
>> context) throws IOException, InterruptedException { String line =
>> value.toString(); StringTokenizer tokenizer = new StringTokenizer(line);
>> String ean = tokenizer.nextToken(); int rank =
>> Integer.parseInt(tokenizer.nextToken()); context.write(new Text(ean), new
>> IntWritable(rank)); } } public static class RankUpdateReducer extends
>> Reducer<Text, IntWritable, ByteBuffer, List<Mutation>> { private ByteBuffer
>> outputKey; protected void setup(org.apache.hadoop.mapreduce.Reducer.Context
>> context) throws IOException, InterruptedException { outputKey =
>> ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME)); }
>> public void reduce(Text key, Iterable<IntWritable> values, Context context)
>> throws IOException, InterruptedException { context.write(outputKey,
>> Collections.singletonList(getMutation(key,
>> values.iterator().next().get()))); } private static Mutation
>> getMutation(Text key, int value) { Column c = new Column();
>> c.setName(ByteBuffer.wrap(Arrays.copyOf(key.getBytes(), key.getLength())));
>> c.setValue(ByteBufferUtil.bytes(String.valueOf(value)));
>> c.setTimestamp(System.currentTimeMillis() * 1000); Mutation m = new
>> Mutation(); m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
>> m.column_or_supercolumn.setColumn(c); return m; } }
>>
>> Any thing wrong here?
>>
>> Thanks,
>>
>> John
>>
>>
>>
>
>

Reply via email to