Hi All,

I am using Cassandra 0.7.8 and trying to run "CassandraBulkLoader.java", but it ended up with the error below.
I have kept the cassandra.yaml file on HDFS at /data/conf/cassandra.yaml.

FYR, I have attached CassandraBulkLoader.java.

So I made the following Hadoop-specific changes:

DistributedCache.addCacheFile(new URI("/data/conf/cassandra.yaml"), conf);
DistributedCache.releaseCache(new URI("/data/conf/cassandra.yaml"), jobconf);
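
For context, the reducer side (configure) picks the file up from the task's local cache like this (taken from the attached code):

Path[] localFiles = DistributedCache.getLocalCacheFiles(job);
String cassConfig = localFiles[0].getParent().toString();
System.setProperty("storage-config", cassConfig);
System.out.println("CassConfig:" + cassConfig);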

Hadoop error:
attempt_201108311244_0004_r_000000_2: 
CassConfig:/data/hadoop-hadoop/mapred/local/taskTracker/archive/localhost/data/conf/cassandra.yaml
attempt_201108311244_0004_r_000000_2: Cannot locate cassandra.yaml
attempt_201108311244_0004_r_000000_2: Fatal configuration error; unable to 
start server.  See log for stacktrace.
11/08/31 13:03:41 INFO mapred.JobClient:  map 100% reduce 33%
11/08/31 13:03:44 INFO mapred.JobClient:  map 100% reduce 0%

It looks as though the task could not locate cassandra.yaml, even though it is on HDFS.
How can I resolve this error?

Regards,
Thamizhannal

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class CassandraBulkLoader {
	
	public static class Map extends MapReduceBase implements
			Mapper<Text, Text, Text, Text> {
		public void map(Text key, Text value,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
			// This is a simple key/value mapper.
			output.collect(key, value);
		}
	}

	public static class Reduce extends MapReduceBase implements
			Reducer<Text, Text, Text, Text> {
		private Path[] localFiles;
		private JobConf jobconf;

		public void configure(JobConf job) {
			this.jobconf = job;
			String cassConfig;

			// Get the cached files
			try {
				localFiles = DistributedCache.getLocalCacheFiles(job);
			} catch (IOException e) {
				throw new RuntimeException(e);
			}
			cassConfig = localFiles[0].getParent().toString();
			System.out.println("CassConfig:"+ cassConfig);
			System.setProperty("storage-config", cassConfig);

			try {
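				// Start Cassandra in client-only (fat client) mode: join gossip, but store no data locally.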
				StorageService.instance.initClient();
			} catch (Exception e) {
				throw new RuntimeException(e);
			}
			try {
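				// Give the client a few seconds to join gossip and learn the ring before reduce() runs.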
				Thread.sleep(10 * 1000);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
		}

		public void close() {
			try {
				// release the cache
				DistributedCache.releaseCache(new URI("/data/conf/cassandra.yaml"),
						//System.getProperty("cassandra.config")),
						this.jobconf);
			} catch (IOException e) {
				throw new RuntimeException(e);
			} catch (URISyntaxException e) {
				throw new RuntimeException(e);
			}
			try {
				// Sleep just in case the number of keys we send over is small
				Thread.sleep(3 * 1000);
			} catch (InterruptedException e) {
				throw new RuntimeException(e);
			}
			StorageService.instance.stopClient();
		}

		public void reduce(Text key, Iterator<Text> values,
				OutputCollector<Text, Text> output, Reporter reporter)
				throws IOException {
			ColumnFamily columnFamily;
			String keyspace = "Keyspace1";
			String cfName = "Super1";
			Message message;
			List<ColumnFamily> columnFamilies;
			columnFamilies = new LinkedList<ColumnFamily>();
			String line;

			/* Create a column family */
			columnFamily = ColumnFamily.create(keyspace, cfName);
			while (values.hasNext()) {
				// Split the value (line based on your own delimiter)
				line = values.next().toString();

				String[] fields = line.split("\\s");
				if (fields.length != 2) {
					continue; // skip lines that do not split into exactly two fields
				}
				String SuperColumnName = key.toString(); // fields[1];
				String ColumnName = fields[0];
				String ColumnValue = fields[1];

				int timestamp = 0; // all columns in this example are written with timestamp 0
				columnFamily.addColumn(new QueryPath(cfName, ByteBufferUtil
						.bytes(SuperColumnName), ByteBufferUtil
						.bytes(ColumnName)), ByteBufferUtil.bytes(ColumnValue),
						timestamp);
			}

			columnFamilies.add(columnFamily);

			/* Get serialized message to send to cluster */
			message = createMessage(keyspace, key.getBytes(), cfName,
					columnFamilies);
			List<IAsyncResult> results = new ArrayList<IAsyncResult>();

			for (InetAddress endpoint : StorageService.instance
					.getNaturalEndpoints(keyspace, ByteBufferUtil.bytes(key
							.toString()))) {
				/* Send the mutation to this replica (natural endpoint) of the row key */
				results.add(MessagingService.instance().sendRR(message,
						endpoint));
			}
			/* wait for acks */
			for (IAsyncResult result : results) {
				try {
					result.get(DatabaseDescriptor.getRpcTimeout(),
							TimeUnit.MILLISECONDS);
				} catch (TimeoutException e) {
					// you should probably add retry logic here
					throw new RuntimeException(e);
				}
			}

			output.collect(key, new Text(" inserted into Cassandra node(s)"));
		}
	}

	public static void runJob(String[] args)
    {
        JobConf conf = new JobConf(CassandraBulkLoader.class);

        if(args.length >= 4)
        {
          conf.setNumReduceTasks(new Integer(args[2]));
        }

        try
        {
        	System.out.println( "SysProp"+System.getProperty("cassandra.config"));
            // We store the cassandra storage-conf.xml on the HDFS cluster
            DistributedCache.addCacheFile(new URI("/data/conf/cassandra.yaml"),conf); //System.getProperty("cassandra.config")), conf); // "hdfs://localhost:54310/data/conf/cassandra.yaml"),
																								// conf);
        }
        catch (URISyntaxException e)
        {
            throw new RuntimeException(e);
        }
        conf.setInputFormat(KeyValueTextInputFormat.class);
        conf.setJobName("CassandraBulkLoader_v2");
        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        try
        {
            JobClient.runJob(conf);
        }
        catch (IOException e)
        {
            throw new RuntimeException(e);
        }
    }

	public static Message createMessage(String keyspace, byte[] key,
			String columnFamily, List<ColumnFamily> columnFamilies) {
		ColumnFamily baseColumnFamily;
		DataOutputBuffer bufOut = new DataOutputBuffer();
		RowMutation rm;
		Message message;
		Column column;

		/*
		 * Get the first column family from list, this is just to get past
		 * validation
		 */
		baseColumnFamily = new ColumnFamily(ColumnFamilyType.Standard,
				DatabaseDescriptor.getComparator(keyspace, columnFamily),
				DatabaseDescriptor.getSubComparator(keyspace, columnFamily),
				CFMetaData.getId(keyspace, columnFamily));

		for (ColumnFamily cf : columnFamilies) {
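			// Serialize each column family (with indexes) and wrap it as a binary column named by its cfId.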
			bufOut.reset();
			ColumnFamily.serializer().serializeWithIndexes(cf, bufOut);
			byte[] data = new byte[bufOut.getLength()];
			System.arraycopy(bufOut.getData(), 0, data, 0, bufOut.getLength());

			column = new Column(FBUtilities.toByteBuffer(cf.id()), ByteBuffer
					.wrap(data), 0);
			baseColumnFamily.addColumn(column);
		}
		rm = new RowMutation(keyspace, ByteBuffer.wrap(key));
		rm.add(baseColumnFamily);

		try {
			/* Make message */
			message = rm.makeRowMutationMessage(StorageService.Verb.BINARY);
		} catch (IOException e) {
			throw new RuntimeException(e);
		}

		return message;
	}

	public static void main(String[] args) throws Exception {
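		// Usage (as read from runJob): args[0] = input path, args[1] = output path, args[2] = reduce task count (only applied when 4 or more args are given).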
		runJob(args);
	}
}
