[
https://issues.apache.org/jira/browse/PIG-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Daniel Dai updated PIG-2462:
----------------------------
Attachment: (was: PIG-2462-2.patch)
> getWrappedSplit is incorrectly returning the first split instead of the
> current split.
> --------------------------------------------------------------------------------------
>
> Key: PIG-2462
> URL: https://issues.apache.org/jira/browse/PIG-2462
> Project: Pig
> Issue Type: Bug
> Affects Versions: 0.9.1, 0.11
> Reporter: Alex Rovner
> Fix For: 0.11
>
> Attachments: PIG-2462-2_0.9.patch, split_fix_take2.patch,
> splitsfix.patch
>
>
> If your loader needs information regarding what file is currently is being
> read (lets say for schema information), currently provides this ability by
> calling prepareToRead every time we read a new split. This is critical for
> ComibinedInputFormat as each mapper can read more then one file. In order for
> the load function to know what file we are currently reading, it should call
> getWrappedSplit() to get that information. How ever, getWrappedSplit always
> returns the first split in the list. Code from PigSplit.java:
> /**
> * This methods returns the actual InputSplit (as returned by the
> * {@link InputFormat}) which this class is wrapping.
> * @return the wrappedSplit
> */
> public InputSplit getWrappedSplit() {
> return wrappedSplits[0];
> }
> Furthermore, in PigRecordReader.java the splitIndex is never incremented when
> changing from split to split. So in fact, even if getWrappedSplit() wold be
> changed to return wrappedSplits[splitIndex]; it would still return the
> incorrect index.
> This can be fixed by changing PigRecordReader to increment
> PigSplit.splitIndex everytime the split chagnes in the following code:
> /**
> * Get the record reader for the next chunk in this CombineFileSplit.
> */
> protected boolean initNextRecordReader() throws IOException,
> InterruptedException {
> if (curReader != null) {
> curReader.close();
> curReader = null;
> if (idx > 0) {
> progress += pigSplit.getLength(idx-1); // done processing
> so far
> }
> }
> // if all chunks have been processed, nothing more to do.
> if (idx == pigSplit.getNumPaths()) {
> return false;
> }
> // get a record reader for the idx-th chunk
> try {
>
> curReader =
> inputformat.createRecordReader(pigSplit.getWrappedSplit(idx), context);
> LOG.info("Current split being processed
> "+pigSplit.getWrappedSplit(idx));
> if (idx > 0) {
> // initialize() for the first RecordReader will be called by
> MapTask;
> // we're responsible for initializing subsequent
> RecordReaders.
> curReader.initialize(pigSplit.getWrappedSplit(idx), context);
> pigSplit.get
> loadfunc.prepareToRead(curReader, pigSplit);
> }
> } catch (Exception e) {
> throw new RuntimeException (e);
> }
> idx++;
> return true;
> }
> }
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira