Ok ...
I'm sending both.
Versions:
Apache Pig version 0.8.0 (r1043805)
compiled Dec 08 2010, 17:26:09
Hadoop 0.20.2
On Tue, Mar 1, 2011 at 6:44 PM, Daniel Dai <[email protected]> wrote:
> Combined input splits should be able to handle compressed files. Pig
> creates a separate RecordReader for each file within one input split, so gzip
> concatenation should not be the issue. I am not sure what happened in your
> script. If possible, give us more information (script, UDF, data, version).
>
> Daniel
>
>
>
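Daniel's description of combined splits can be sketched in plain Java. This is a hedged, simplified illustration only: the class and method names below are hypothetical and stand in for Pig's real combined-split machinery, with a per-file iterator playing the role of the per-file RecordReader.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Simplified sketch (hypothetical names, not Pig's actual classes):
// one combined split covers many files, but a fresh per-file iterator
// is opened for each file, so compressed files would be decoded
// independently rather than read as one concatenated stream.
public class CombinedSplitReaderSketch {
    private final Iterator<List<String>> files;   // each inner list stands in for one file
    private Iterator<String> current = Collections.emptyIterator();

    public CombinedSplitReaderSketch(List<List<String>> splitFiles) {
        this.files = splitFiles.iterator();
    }

    // Returns the next record across all files in the split, or null at the end.
    public String next() {
        while (!current.hasNext()) {
            if (!files.hasNext()) {
                return null;
            }
            current = files.next().iterator();    // "new RecordReader" for the next file
        }
        return current.next();
    }

    public static void main(String[] args) {
        CombinedSplitReaderSketch r = new CombinedSplitReaderSketch(
                List.of(List.of("a", "b"), List.of("c")));
        for (String rec = r.next(); rec != null; rec = r.next()) {
            System.out.println(rec);          // prints a, b, c on separate lines
        }
    }
}
```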
> On 02/28/2011 05:40 PM, Charles Gonçalves wrote:
>
> Guys,
>
> The amount of data in the source dir:
> hdfs://hydra1:57810/user/cdh-hadoop/mscdata/201010_raw 22567369111
>
> What I did was:
> I run with all logs, 43458 and the counters are:
>
> Counter              Map            Reduce         Total
> FILE_BYTES_READ      253,905,706    372,708,857    626,614,563
> HDFS_BYTES_READ      2,553,123,734  0              2,553,123,734
> FILE_BYTES_WRITTEN   619,877,917    372,708,857    992,586,774
> HDFS_BYTES_WRITTEN   0              535            535
>
>
> I did a manual join of the files and ran again on the 336 files (the
> merge of all those files).
> The job hasn't finished yet, and the counters are:
>
> Counter              Map              Reduce           Total
> FILE_BYTES_READ      21,054,970,818   0                21,054,970,818
> HDFS_BYTES_READ      16,772,063,486   0                16,772,063,486
> FILE_BYTES_WRITTEN   39,797,038,008   10,404,287,551   50,201,325,55
>
>
> I think the problem could be in the combination of the input files.
> Is the combination class aware of compression?
> Because *all my files are compressed*.
> Maybe the class performs a concatenation and we hit the HDFS limitation
> on concatenated gzip files.
>
> On Mon, Feb 28, 2011 at 8:47 PM, Charles Gonçalves
> <[email protected]>wrote:
>
>>
>>
>> On Mon, Feb 28, 2011 at 7:39 PM, Thejas M Nair <[email protected]>wrote:
>>
>>> Hi Charles,
>>> Which load function are you using ?
>>>
>> I'm using a user-defined load function ..
>>
>>> Is it the default (PigStorage)?
>>>
>> Nope ...
>>
>>
>>> In the hadoop counters for the job in the jobtracker ui, do you see the
>>> expected number of input records being read?
>>>
>> Is it possible to see the counters in the JobTracker's history interface?
>> I will run the jobs again to compare the counters, but my guess is
>> probably not!
>>
>>> -Thejas
>>>
>>>
>>>
>>>
>>> On 2/28/11 10:57 AM, "Charles Gonçalves" <[email protected]> wrote:
>>>
>>> I'm not using any filtering in the script.
>>> I just want to see the total traffic per day across all logs.
>>>
>>> If I combine 1000 log files into one and run the script on that merged
>>> file, I get the correct answer for those logs.
>>> But when I run with all *43458* log files, I get incorrect output.
>>> The correct result would be a histogram for each day of 2010-10, but
>>> the output contains only data from 2010-10-21.
>>> And if I process all the logs with an awk script, I get the correct
>>> answer.
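The per-day cross-check being described can be sketched in plain Java. This is only an illustration under assumed inputs: the line layout (day in column 0, byte count in column 1) is hypothetical, not the real log format, which the actual loader parses with many more fields.

```java
import java.util.Map;
import java.util.TreeMap;

// Hedged sketch of the per-day traffic cross-check: sum a bytes column
// per day. The "day bytes" line layout is an assumption for illustration.
public class DailyTrafficSketch {
    public static Map<String, Long> totalPerDay(Iterable<String> lines) {
        Map<String, Long> totals = new TreeMap<>();
        for (String line : lines) {
            if (line.startsWith("#")) {
                continue;                        // skip comment lines
            }
            String[] f = line.trim().split(" +"); // same separator pattern as the loader
            totals.merge(f[0], Long.parseLong(f[1]), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        System.out.println(totalPerDay(java.util.List.of(
                "2010-10-01 100",
                "2010-10-01 50",
                "2010-10-21 7")));
    }
}
```

If the Pig output covered only 2010-10-21, a map built this way over the raw logs would make the missing days obvious.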
>>>
>>>
>>> On Mon, Feb 28, 2011 at 3:29 PM, Daniel Dai <[email protected]>
>>> wrote:
>>>
>>> > Not sure if I get your question. In 0.8, Pig combines small files into
>>> > one map, so it is possible you get fewer output files.
>>>
>>> This is not the problem.
>>> But thanks anyway!
>>>
>>> > If that is your concern, you can try to disable split combining using
>>> > "-Dpig.splitCombination=false"
>>> >
>>> > Daniel
>>> >
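The same property Daniel mentions can also be set when launching Pig programmatically. A hedged sketch: the `PigServer` constructor shown in the comment is from the Pig API, and the call stays commented out so the snippet compiles with only the JDK.

```java
import java.util.Properties;

// Sketch: build the Properties equivalent of -Dpig.splitCombination=false
// for use with Pig's PigServer (the constructor call is left commented out
// because it needs the Pig jars on the classpath).
public class DisableSplitCombine {
    public static Properties pigProps() {
        Properties props = new Properties();
        props.setProperty("pig.splitCombination", "false"); // disable split combining
        return props;
    }

    public static void main(String[] args) {
        // new org.apache.pig.PigServer(org.apache.pig.ExecType.MAPREDUCE, pigProps());
        System.out.println(pigProps().getProperty("pig.splitCombination"));
    }
}
```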
>>> >
>>> > Charles Gonçalves wrote:
>>> >
>>> >> I tried to process a large number of small files in Pig and hit a
>>> >> strange problem.
>>> >>
>>> >> 2011-02-27 00:00:58,746 [Thread-15] INFO
>>> >> org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input
>>> paths
>>> >> to process : *43458*
>>> >> 2011-02-27 00:00:58,755 [Thread-15] INFO
>>> >> org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total
>>> >> input
>>> >> paths to process : *43458*
>>> >> 2011-02-27 00:01:14,173 [Thread-15] INFO
>>> >> org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total
>>> >> input
>>> >> paths (combined) to process : *329*
>>> >>
>>> >> When the script finishes processing, the result covers only a subset
>>> >> of the input files.
>>> >> These are logs for a whole month, but the results are only from day
>>> >> 21.
>>> >>
>>> >>
>>> >> Maybe I'm missing something.
>>> >> Any Ideas?
>>> >>
>>> >>
>>> >>
>>> >
>>> >
>>>
>>>
>>> --
>>> *Charles Ferreira Gonçalves *
>>> http://homepages.dcc.ufmg.br/~charles/
>>> UFMG - ICEx - Dcc
>>> Cel.: 55 31 87741485
>>> Tel.: 55 31 34741485
>>> Lab.: 55 31 34095840
>>>
>>>
>>>
>>
>>
>
>
>
>
>
package msc.pig;

import java.io.IOException;
import java.util.List;

import msc.misc.NumberUtils;
import msc.parser.FieldLineSplitter;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.log4j.Logger;
import org.apache.pig.Expression;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceStatistics;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
import org.apache.pig.bzip2r.Bzip2TextInputFormat;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;

public class EdgeLoader extends LoadFunc implements LoadMetadata {

    private static final Logger logger = Logger.getLogger(EdgeLoader.class);

    // Field separator, enclosing quote, escape character, and comment pattern
    private static final String SEP = " +";
    private static final String ENC = "\"";
    private static final String ESC = "\"";
    private static final String COM = "(?s)#.*";

    // Expected number of fields per valid log line
    private static final int TUPLE_SIZE = 17;

    private static FieldLineSplitter splitter;
    static {
        splitter = new FieldLineSplitter(SEP, ENC, ESC, COM);
    }

    private final TupleFactory tupleFactory = TupleFactory.getInstance();
    private RecordReader reader;
    private String loadLocation;

    public EdgeLoader() {
    }
    @Override
    public InputFormat getInputFormat() throws IOException {
        // Use the bzip2-aware input format for .bz/.bz2 inputs; otherwise
        // fall back to Pig's plain text input format.
        if (loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz")) {
            return new Bzip2TextInputFormat();
        } else {
            return new PigTextInputFormat();
        }
    }
    @Override
    public Tuple getNext() throws IOException {
        // Get a valid line, or return null when there are no more lines
        List<String> fields = getValidFields();
        if (fields == null) {
            // End of input
            return null;
        }
        // Two fields are discarded to save space (empty and xCust)
        Tuple tuple = tupleFactory.newTuple(fields.size() - 2);
        // s.add(new FieldSchema("ts", DataType.LONG));
        tuple.set(0, NumberUtils.getLong(fields.get(0)));
        // s.add(new FieldSchema("timeTaken", DataType.INTEGER));
        tuple.set(1, NumberUtils.getInteger(fields.get(1)));
        // s.add(new FieldSchema("cIp", DataType.CHARARRAY));
        tuple.set(2, fields.get(2));
        // s.add(new FieldSchema("fSize", DataType.LONG));
        tuple.set(3, NumberUtils.getLong(fields.get(3)));
        // s.add(new FieldSchema("sIp", DataType.CHARARRAY));
        tuple.set(4, fields.get(4));
        // s.add(new FieldSchema("sPort", DataType.INTEGER));
        tuple.set(5, NumberUtils.getInteger(fields.get(5)));
        // s.add(new FieldSchema("scStatus", DataType.CHARARRAY));
        tuple.set(6, fields.get(6));
        // s.add(new FieldSchema("scBytes", DataType.LONG));
        tuple.set(7, NumberUtils.getLong(fields.get(7)));
        // s.add(new FieldSchema("csMethod", DataType.CHARARRAY));
        tuple.set(8, fields.get(8));
        // s.add(new FieldSchema("url", DataType.CHARARRAY));
        tuple.set(9, fields.get(9));
        // s.add(new FieldSchema("empty", DataType.BYTEARRAY));
        // Discarded
        // s.add(new FieldSchema("rsDuration", DataType.INTEGER));
        tuple.set(10, NumberUtils.getInteger(fields.get(11)));
        // s.add(new FieldSchema("rsBytes", DataType.INTEGER));
        tuple.set(11, NumberUtils.getInteger(fields.get(12)));
        // s.add(new FieldSchema("referrer", DataType.CHARARRAY));
        tuple.set(12, fields.get(13));
        // s.add(new FieldSchema("ua", DataType.CHARARRAY));
        tuple.set(13, fields.get(14));
        // s.add(new FieldSchema("edgeId", DataType.CHARARRAY));
        tuple.set(14, fields.get(15));
        // s.add(new FieldSchema("xCust", DataType.BYTEARRAY));
        // Discarded !!!
        return tuple;
    }
    private List<String> getValidFields() throws IOException {
        List<String> lineFields = null;
        boolean isValid = false;
        do {
            Text value = null;
            try {
                if (!reader.nextKeyValue()) {
                    return null;
                }
                value = (Text) reader.getCurrentValue();
            } catch (InterruptedException e) {
                throw new ExecException(e);
            }
            String line = value.toString();
            if (line.startsWith("#")) {
                logger.debug("Ignoring comment line");
                logger.trace("Comment line: '" + line + "'");
            } else {
                try {
                    lineFields = splitter.parseLine(line);
                    if (lineFields.size() == TUPLE_SIZE) {
                        isValid = true;
                    } else {
                        logger.debug("Ignoring line with " + lineFields.size() + " fields!");
                        logger.trace("Line with incorrect field count: '" + line + "'");
                    }
                } catch (Exception e) {
                    logger.error("Error when parsing line: ", e);
                    logger.debug("Error line: '" + line + "'");
                }
            }
        } while (!isValid);
        return lineFields;
    }
    @Override
    public void prepareToRead(RecordReader reader, PigSplit split)
            throws IOException {
        this.reader = reader;
    }

    @Override
    public void setLocation(String location, Job job) throws IOException {
        loadLocation = location;
        System.err.println("location:\t" + location);
        FileInputFormat.setInputPaths(job, location);
    }

    @Override
    public String[] getPartitionKeys(String arg0, Job arg1) throws IOException {
        return null;
    }

    @Override
    public ResourceSchema getSchema(String location, Job job)
            throws IOException {
        Schema s = new Schema();
        s.add(new FieldSchema("ts", DataType.LONG));
        s.add(new FieldSchema("timeTaken", DataType.INTEGER));
        s.add(new FieldSchema("cIp", DataType.CHARARRAY));
        s.add(new FieldSchema("fSize", DataType.LONG));
        s.add(new FieldSchema("sIp", DataType.CHARARRAY));
        s.add(new FieldSchema("sPort", DataType.INTEGER));
        s.add(new FieldSchema("scStatus", DataType.CHARARRAY));
        s.add(new FieldSchema("scBytes", DataType.LONG));
        s.add(new FieldSchema("csMethod", DataType.CHARARRAY));
        s.add(new FieldSchema("url", DataType.CHARARRAY));
        s.add(new FieldSchema("rsDuration", DataType.INTEGER));
        s.add(new FieldSchema("rsBytes", DataType.INTEGER));
        s.add(new FieldSchema("referrer", DataType.CHARARRAY));
        s.add(new FieldSchema("ua", DataType.CHARARRAY));
        s.add(new FieldSchema("edgeId", DataType.CHARARRAY));
        return new ResourceSchema(s);
    }

    @Override
    public ResourceStatistics getStatistics(String arg0, Job arg1)
            throws IOException {
        return null;
    }

    @Override
    public void setPartitionFilter(Expression arg0) throws IOException {
    }
}