Hmmm - I have only tested in local mode, but I got a

java.io.NotSerializableException: org.apache.hadoop.io.Text
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493)

Below are two input format classes - one will work, one will not - plus the test driver. The attached mgf file is what they read.
showPairRDD simply prints the pairs that were read.
guaranteeSparkMaster calls sparkConf.setMaster("local") if there is no
master defined.
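In case it matters, this is roughly what those two helpers do (sketches from memory, not the exact SparkUtilities code):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

// fall back to local mode when the configuration names no master
public static void guaranteeSparkMaster(SparkConf sparkConf) {
    if (!sparkConf.contains("spark.master"))
        sparkConf.setMaster("local");
}

// print every pair; collect() pulls the tuples back to the driver,
// which is where Java serialization of the keys/values happens
public static <K, V> void showPairRDD(JavaPairRDD<K, V> rdd) {
    for (Tuple2<K, V> kv : rdd.collect())
        System.out.println(kv._1() + " = " + kv._2());
}

If that collect() is the trigger, it would explain why the Text version dies while the StringBuffer version survives - StringBuffer implements java.io.Serializable and Text does not.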
Perhaps I need to convert Text somewhere else, but I certainly don't see
where.
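If a mapToPair conversion is the answer, this is what I would try right after the read (an untested sketch - copying the Writables into Strings is the only point of it):

import org.apache.hadoop.io.Text;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

// copy the Writables into plain Strings before anything tries to
// Java-serialize a Text (a shuffle, a collect, ...)
JavaPairRDD<Text, Text> raw = ctx.newAPIHadoopFile(path,
        MGFTextInputFormat.class, Text.class, Text.class,
        ctx.hadoopConfiguration());
JavaPairRDD<String, String> converted = raw.mapToPair(
        new PairFunction<Tuple2<Text, Text>, String, String>() {
            public Tuple2<String, String> call(Tuple2<Text, Text> kv) {
                return new Tuple2<String, String>(kv._1().toString(), kv._2().toString());
            }
        });

Note that parseAsTextMGF below declares JavaPairRDD<String,String> and passes String.class for the key and value classes even though MGFTextInputFormat produces Text, so maybe the conversion belongs there.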
package com.lordjoe.distributed.input;
/**
* com.lordjoe.distributed.input.MGFInputFormat
* User: Steve
* Date: 9/24/2014
*/
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.*;
import java.io.*;
/**
* com.lordjoe.distributed.input.MGFInputFormat
* Splitter that reads mgf files that are
* nice enough to put the begin and end tags on separate lines
*/
public class MGFInputFormat extends FileInputFormat<StringBuffer, StringBuffer> implements Serializable {
private String m_Extension = "mgf";
public MGFInputFormat() {
}
@SuppressWarnings("UnusedDeclaration")
public String getExtension() {
return m_Extension;
}
@SuppressWarnings("UnusedDeclaration")
public void setExtension(final String pExtension) {
m_Extension = pExtension;
}
@Override
public RecordReader<StringBuffer, StringBuffer> createRecordReader(InputSplit split,
TaskAttemptContext context) {
return new MGFFileReader();
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
final String lcName = file.getName().toLowerCase();
//noinspection RedundantIfStatement
if (lcName.endsWith("gz"))
return false;
return true;
}
/**
* Custom RecordReader which returns each spectrum
* (a BEGIN IONS ... END IONS block) as a single value
* with the file name as the key
*/
public class MGFFileReader extends RecordReader<StringBuffer, StringBuffer> implements Serializable {
private CompressionCodecFactory compressionCodecs = null;
private long m_Start;
private long m_End;
private long current;
private LineReader m_Input;
FSDataInputStream m_RealFile;
private StringBuffer key = null;
private StringBuffer value = null;
private Text buffer; // must be a Text - Hadoop's LineReader will only read into a Text
public Text getBuffer() {
if(buffer == null)
buffer = new Text();
return buffer;
}
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
m_Start = split.getStart();
m_End = m_Start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
boolean skipFirstLine = false;
final CompressionCodec codec = compressionCodecs.getCodec(file);
// open the file and seek to the m_Start of the split
FileSystem fs = file.getFileSystem(job);
m_RealFile = fs.open(split.getPath());
if (codec != null) {
CompressionInputStream inputStream = codec.createInputStream(m_RealFile);
m_Input = new LineReader( inputStream );
m_End = Long.MAX_VALUE;
}
else {
if (m_Start != 0) {
skipFirstLine = true;
--m_Start;
m_RealFile.seek(m_Start);
}
m_Input = new LineReader( m_RealFile);
}
// not at the beginning so go to first line
if (skipFirstLine) { // skip first line and re-establish "m_Start".
m_Start += m_Input.readLine(getBuffer());
}
current = m_Start;
if (key == null) {
key = new StringBuffer();
}
else {
key.setLength(0);
}
key.append(split.getPath().getName());
if (value == null) {
value = new StringBuffer();
}
current = 0; // deliberately below m_Start so nextKeyValue() skips forward to the split start
}
/**
* look for a BEGIN IONS line then read until the matching END IONS
*
* @return true if there is data
* @throws java.io.IOException
*/
public boolean nextKeyValue() throws IOException
{
int newSize;
while (current < m_Start) {
newSize = m_Input.readLine(getBuffer()); // getBuffer() guards against a null buffer
// we are done
if (newSize == 0) {
key = null;
value = null;
return false;
}
current = m_RealFile.getPos();
}
StringBuilder sb = new StringBuilder();
newSize = m_Input.readLine(getBuffer());
String str;
while (newSize > 0) {
str = buffer.toString();
if ("BEGIN IONS".equals(str)) {
break;
}
current = m_RealFile.getPos();
// we are done
if (current > m_End) {
key = null;
value = null;
return false;
}
newSize = m_Input.readLine(getBuffer());
}
if (newSize == 0) {
key = null;
value = null;
return false;
}
while (newSize > 0) {
str = buffer.toString();
sb.append(str);
sb.append("\n");
if ("END IONS".equals(str)) {
break;
}
newSize = m_Input.readLine(getBuffer());
}
String s = sb.toString();
value = new StringBuffer();
value.append(s);
if (sb.length() == 0) {
key = null;
value = null;
return false;
}
else {
return true;
}
}
@Override
public StringBuffer getCurrentKey() {
return key;
}
@Override
public StringBuffer getCurrentValue() {
return value;
}
/**
* Get the progress within the split
*/
public float getProgress() {
long totalBytes = m_End - m_Start;
if (totalBytes == 0)
return 1.0f; // empty split
long totalHandled = current - m_Start;
return ((float) totalHandled) / totalBytes;
}
public synchronized void close() throws IOException {
if (m_Input != null) {
m_Input.close();
}
}
}
}

package com.lordjoe.distributed.input;
/**
* com.lordjoe.distributed.input.MGFTextInputFormat
* User: Steve
* Date: 9/24/2014
*/
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.*;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.util.*;
import java.io.*;
/**
* com.lordjoe.distributed.input.MGFTextInputFormat
* Splitter that reads mgf files with Text as the key/value type; the files are
* nice enough to put the begin and end tags on separate lines
*/
public class MGFTextInputFormat extends FileInputFormat<Text, Text> implements Serializable {
private String m_Extension = "mgf";
public MGFTextInputFormat() {
}
@SuppressWarnings("UnusedDeclaration")
public String getExtension() {
return m_Extension;
}
@SuppressWarnings("UnusedDeclaration")
public void setExtension(final String pExtension) {
m_Extension = pExtension;
}
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit split,
TaskAttemptContext context) {
return new MGFFileReader();
}
@Override
protected boolean isSplitable(JobContext context, Path file) {
final String lcName = file.getName().toLowerCase();
//noinspection RedundantIfStatement
if (lcName.endsWith("gz"))
return false;
return true;
}
/**
* Custom RecordReader which returns each spectrum
* (a BEGIN IONS ... END IONS block) as a single value
* with the file name as the key
*/
public class MGFFileReader extends RecordReader<Text, Text> implements Serializable {
private CompressionCodecFactory compressionCodecs = null;
private long m_Start;
private long m_End;
private long current;
private LineReader m_Input;
FSDataInputStream m_RealFile;
private Text key = null;
private Text value = null;
private Text buffer; // must be a Text - Hadoop's LineReader will only read into a Text
public Text getBuffer() {
if (buffer == null)
buffer = new Text();
return buffer;
}
public void initialize(InputSplit genericSplit,
TaskAttemptContext context) throws IOException {
FileSplit split = (FileSplit) genericSplit;
Configuration job = context.getConfiguration();
m_Start = split.getStart();
m_End = m_Start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
boolean skipFirstLine = false;
final CompressionCodec codec = compressionCodecs.getCodec(file);
// open the file and seek to the m_Start of the split
FileSystem fs = file.getFileSystem(job);
m_RealFile = fs.open(split.getPath());
if (codec != null) {
CompressionInputStream inputStream = codec.createInputStream(m_RealFile);
m_Input = new LineReader(inputStream);
m_End = Long.MAX_VALUE;
}
else {
if (m_Start != 0) {
skipFirstLine = true;
--m_Start;
m_RealFile.seek(m_Start);
}
m_Input = new LineReader(m_RealFile);
}
// not at the beginning so go to first line
if (skipFirstLine) { // skip first line and re-establish "m_Start".
m_Start += m_Input.readLine(getBuffer());
}
current = m_Start;
if (key == null) {
key = new Text();
}
key.set(split.getPath().getName());
if (value == null) {
value = new Text();
}
current = 0; // deliberately below m_Start so nextKeyValue() skips forward to the split start
}
/**
* look for a BEGIN IONS line then read until the matching END IONS
*
* @return true if there is data
* @throws java.io.IOException
*/
public boolean nextKeyValue() throws IOException {
int newSize;
while (current < m_Start) {
newSize = m_Input.readLine(getBuffer()); // getBuffer() guards against a null buffer
// we are done
if (newSize == 0) {
key = null;
value = null;
return false;
}
current = m_RealFile.getPos();
}
StringBuilder sb = new StringBuilder();
newSize = m_Input.readLine(getBuffer());
String str;
while (newSize > 0) {
str = buffer.toString();
if ("BEGIN IONS".equals(str)) {
break;
}
current = m_RealFile.getPos();
// we are done
if (current > m_End) {
key = null;
value = null;
return false;
}
newSize = m_Input.readLine(getBuffer());
}
if (newSize == 0) {
key = null;
value = null;
return false;
}
while (newSize > 0) {
str = buffer.toString();
sb.append(str);
sb.append("\n");
if ("END IONS".equals(str)) {
break;
}
newSize = m_Input.readLine(getBuffer());
}
String s = sb.toString();
value = new Text();
value.set(s);
if (sb.length() == 0) {
key = null;
value = null;
return false;
}
else {
return true;
}
}
@Override
public Text getCurrentKey() {
return key;
}
@Override
public Text getCurrentValue() {
return value;
}
/**
* Get the progress within the split
*/
public float getProgress() {
long totalBytes = m_End - m_Start;
if (totalBytes == 0)
return 1.0f; // empty split
long totalHandled = current - m_Start;
return ((float) totalHandled) / totalBytes;
}
public synchronized void close() throws IOException {
if (m_Input != null) {
m_Input.close();
}
}
}
}

package com.lordjoe.distributed.input.spark;
import com.lordjoe.distributed.*;
import com.lordjoe.distributed.input.*;
import org.apache.spark.*;
import org.apache.spark.api.java.*;
/**
* com.lordjoe.distributed.input.spark.SparkTandemUtilitiesTests
* User: Steve
* Date: 9/22/2014
*/
public class SparkTandemUtilitiesTests {
public static class MZXMLInputFormat extends XMLTagInputFormat
{
public MZXMLInputFormat() {
super("scan");
}
}
public static JavaPairRDD<String,String> parseSpectrumFile(String path,JavaSparkContext ctx) {
if(path.toLowerCase().endsWith(".mgf"))
return parseAsTextMGF(path,ctx);
if(path.toLowerCase().endsWith(".mgf"))
return parseAsMGF(path,ctx);
if(path.toLowerCase().endsWith(".mzxml"))
return parseAsMZXML(path,ctx);
throw new UnsupportedOperationException("Cannot understand extension " + path);
}
public static JavaPairRDD<String,String> parseAsMZXML(final String path, final JavaSparkContext ctx) {
Class inputFormatClass = MZXMLInputFormat.class;
Class keyClass = String.class;
Class valueClass = String.class;
return ctx.newAPIHadoopFile(
path,
inputFormatClass,
keyClass,
valueClass,
ctx.hadoopConfiguration()
);
}
public static JavaPairRDD<String,String> parseAsOldMGF(final String path, final JavaSparkContext ctx) {
Class inputFormatClass = MGFOldInputFormat.class;
Class keyClass = String.class;
Class valueClass = String.class;
return ctx.hadoopFile(
path,
inputFormatClass,
keyClass,
valueClass
);
}
public static JavaPairRDD<String,String> parseAsTextMGF(final String path, final JavaSparkContext ctx) {
Class inputFormatClass = MGFTextInputFormat.class;
Class keyClass = String.class;
Class valueClass = String.class;
return ctx.newAPIHadoopFile(
path,
inputFormatClass,
keyClass,
valueClass,
ctx.hadoopConfiguration()
);
}
public static JavaPairRDD<String,String> parseAsMGF(final String path, final JavaSparkContext ctx) {
Class inputFormatClass = MGFInputFormat.class;
Class keyClass = String.class;
Class valueClass = String.class;
return ctx.newAPIHadoopFile(
path,
inputFormatClass,
keyClass,
valueClass,
ctx.hadoopConfiguration()
);
}
public static JavaPairRDD<String,String> parseSpectrumFileOld(String path,JavaSparkContext ctx) {
Class inputFormatClass = MGFOldInputFormat.class;
Class keyClass = String.class;
Class valueClass = String.class;
return ctx.hadoopFile(
path,
inputFormatClass,
keyClass,
valueClass
);
}
public static void main(String[] args) {
if(args.length == 0) {
System.out.println("usage <file holding mgfs>");
return;
}
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
SparkUtilities.guaranteeSparkMaster(sparkConf); // use local if no master provided
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
for (int i = 0; i < args.length; i++) {
String arg = args[i];
JavaPairRDD<String, String> parsed = parseSpectrumFile(arg, ctx);
SparkUtilities.showPairRDD(parsed);
}
}
}
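For reference, the records the readers split on look like this inside an mgf file (a simplified example, not taken from the attached data):

BEGIN IONS
TITLE=example spectrum 1
PEPMASS=473.8963
CHARGE=2+
112.0873 1523.4
129.1022 887.1
END IONS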
(Attachment: salivaryglandhealthycontrol10.mgf - binary data)