Hi All,
I am using Cassandra 0.7.8 and trying to run "CassandraBulkLoader.java", but it ends with the error below.
I have kept the cassandra.yaml file on HDFS at /data/conf/cassandra.yaml.
For your reference, I have attached CassandraBulkLoader.java.
I made the following Hadoop-specific changes to it:
DistributedCache.addCacheFile(new URI("/data/conf/cassandra.yaml"), conf);
DistributedCache.releaseCache(new URI("/data/conf/cassandra.yaml"), jobconf);
Hadoop error:
attempt_201108311244_0004_r_000000_2:
CassConfig:/data/hadoop-hadoop/mapred/local/taskTracker/archive/localhost/data/conf/cassandra.yaml
attempt_201108311244_0004_r_000000_2: Cannot locate cassandra.yaml
attempt_201108311244_0004_r_000000_2: Fatal configuration error; unable to
start server. See log for stacktrace.
11/08/31 13:03:41 INFO mapred.JobClient: map 100% reduce 33%
11/08/31 13:03:44 INFO mapred.JobClient: map 100% reduce 0%
It looks as though cassandra.yaml is present on HDFS, but the task was not
able to locate it.
How can I resolve this error?
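P.S. I am wondering whether the real problem is the config property name: in 0.7, DatabaseDescriptor seems to read the "cassandra.config" system property, which must be a URL pointing at cassandra.yaml itself (falling back to looking for "cassandra.yaml" on the classpath, which would explain the "Cannot locate cassandra.yaml" message), whereas the "storage-config" property my reducer sets was the 0.6 convention. A minimal sketch of what I mean for Reduce.configure(), assuming the yaml is the first file returned from the DistributedCache:

Path[] localFiles = DistributedCache.getLocalCacheFiles(job);
// Point Cassandra 0.7 at the yaml file itself, as a URL, not at its parent directory
System.setProperty("cassandra.config", "file:" + localFiles[0].toString());

Would that be the right way to wire it up?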
Regards,
Thamizhannal
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
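// Loads rows into a Cassandra cluster from a Hadoop job: each reducer joins
// the ring as a fat client and sends pre-serialized column families over the
// binary (BMT) write path.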
public class CassandraBulkLoader {
public static class Map extends MapReduceBase implements
Mapper<Text, Text, Text, Text> {
public void map(Text key, Text value,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
// This is a simple key/value mapper.
output.collect(key, value);
}
}
public static class Reduce extends MapReduceBase implements
Reducer<Text, Text, Text, Text> {
private Path[] localFiles;
private JobConf jobconf;
public void configure(JobConf job) {
this.jobconf = job;
String cassConfig;
// Get the cached files
try {
localFiles = DistributedCache.getLocalCacheFiles(job);
} catch (IOException e) {
throw new RuntimeException(e);
}
cassConfig = localFiles[0].getParent().toString();
System.out.println("CassConfig:"+ cassConfig);
System.setProperty("storage-config", cassConfig);
try {
StorageService.instance.initClient();
} catch (Exception e) {
throw new RuntimeException(e);
}
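// Sleep to give gossip time to learn the ring before we send mutations.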
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
public void close() {
try {
// release the cache
DistributedCache.releaseCache(new URI("/data/conf/cassandra.yaml"),
this.jobconf);
} catch (IOException e) {
throw new RuntimeException(e);
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
try {
// Sleep just in case the number of keys we send over is small
Thread.sleep(3 * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
StorageService.instance.stopClient();
}
public void reduce(Text key, Iterator<Text> values,
OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
ColumnFamily columnFamily;
String keyspace = "Keyspace1";
String cfName = "Super1";
Message message;
List<ColumnFamily> columnFamilies;
columnFamilies = new LinkedList<ColumnFamily>();
String line;
/* Create a column family */
columnFamily = ColumnFamily.create(keyspace, cfName);
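// Each reduce key becomes one row; every input line contributes a subcolumn
// (name, value) under a super column named after the key.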
while (values.hasNext()) {
// Split the value (line based on your own delimiter)
line = values.next().toString();
String[] fields = line.split("\\s");
if (fields.length != 2) {
continue; // skip malformed lines instead of failing on fields[1] below
}
String superColumnName = key.toString();
String columnName = fields[0];
String columnValue = fields[1];
long timestamp = 0;
columnFamily.addColumn(new QueryPath(cfName,
ByteBufferUtil.bytes(superColumnName),
ByteBufferUtil.bytes(columnName)),
ByteBufferUtil.bytes(columnValue), timestamp);
}
columnFamilies.add(columnFamily);
/* Get serialized message to send to cluster */
// Text.getBytes() returns the backing array, which can be longer than the
// actual content, so copy out exactly getLength() bytes for the row key.
message = createMessage(keyspace,
Arrays.copyOf(key.getBytes(), key.getLength()), cfName,
columnFamilies);
List<IAsyncResult> results = new ArrayList<IAsyncResult>();
for (InetAddress endpoint : StorageService.instance
.getNaturalEndpoints(keyspace, ByteBufferUtil.bytes(key
.toString()))) {
/* Send message to end point */
results.add(MessagingService.instance().sendRR(message,
endpoint));
}
/* wait for acks */
for (IAsyncResult result : results) {
try {
result.get(DatabaseDescriptor.getRpcTimeout(),
TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// you should probably add retry logic here
throw new RuntimeException(e);
}
}
output.collect(key, new Text(" inserted into Cassandra node(s)"));
}
}
public static void runJob(String[] args)
{
JobConf conf = new JobConf(CassandraBulkLoader.class);
if (args.length >= 3)
{
conf.setNumReduceTasks(Integer.parseInt(args[2]));
}
try
{
System.out.println( "SysProp"+System.getProperty("cassandra.config"));
// We store the cassandra storage-conf.xml on the HDFS cluster
DistributedCache.addCacheFile(new URI("/data/conf/cassandra.yaml"),conf); //System.getProperty("cassandra.config")), conf); // "hdfs://localhost:54310/data/conf/cassandra.yaml"),
// conf);
}
catch (URISyntaxException e)
{
throw new RuntimeException(e);
}
conf.setInputFormat(KeyValueTextInputFormat.class);
conf.setJobName("CassandraBulkLoader_v2");
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
try
{
JobClient.runJob(conf);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
public static Message createMessage(String keyspace, byte[] key,
String columnFamily, List<ColumnFamily> columnFamilies) {
ColumnFamily baseColumnFamily;
DataOutputBuffer bufOut = new DataOutputBuffer();
RowMutation rm;
Message message;
Column column;
/*
* Get the first column family from list, this is just to get past
* validation
*/
baseColumnFamily = new ColumnFamily(ColumnFamilyType.Standard,
DatabaseDescriptor.getComparator(keyspace, columnFamily),
DatabaseDescriptor.getSubComparator(keyspace, columnFamily),
CFMetaData.getId(keyspace, columnFamily));
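// Wrap each serialized column family in a column named by its cf id; the
// BINARY verb handler on the receiving node unwraps these and applies them
// directly via the binary memtable.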
for (ColumnFamily cf : columnFamilies) {
bufOut.reset();
ColumnFamily.serializer().serializeWithIndexes(cf, bufOut);
byte[] data = new byte[bufOut.getLength()];
System.arraycopy(bufOut.getData(), 0, data, 0, bufOut.getLength());
column = new Column(FBUtilities.toByteBuffer(cf.id()), ByteBuffer
.wrap(data), 0);
baseColumnFamily.addColumn(column);
}
rm = new RowMutation(keyspace, ByteBuffer.wrap(key));
rm.add(baseColumnFamily);
try {
/* Make message */
message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
} catch (IOException e) {
throw new RuntimeException(e);
}
return message;
}
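// Usage: CassandraBulkLoader <input path> <output path> [num reduce tasks]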
public static void main(String[] args) throws Exception {
runJob(args);
}
}