[ https://issues.apache.org/jira/browse/PIG-5277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16107272#comment-16107272 ]

Adam Szita commented on PIG-5277:
---------------------------------

So far we have mentioned 4 proposed solutions:
1, Alter InterRecordReader so that it skips the nulls produced by Spark (like 
before).
2, Have JoinGroupSpark#GroupPkgFunction return a special type of Tuple 
(NonWritableTuple) instead of null, which we catch on the writing side and 
skip so it never gets pushed to the output.
3, Skip it as MR does and continue with the next Tuple.
4, Apply rdd().filter() in the converter and skip the nulls coming from 
GroupPkgFunction (this would effectively implement proposal #3; see the 
sketch after this list).
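
To make #4 concrete, a minimal sketch of what the filtering could look like, 
assuming the converter exposes the packaged result as a JavaRDD<Tuple>; the 
class and method names here are illustrative, not actual Pig code:

{code}
// Sketch only (not the actual JoinGroupSparkConverter code): drop the nulls
// that GroupPkgFunction produces for STATUS_NULL results before Spark can
// write them to the output.
import org.apache.pig.data.Tuple;
import org.apache.spark.api.java.JavaRDD;

public class NullFilterSketch {
    public static JavaRDD<Tuple> dropNulls(JavaRDD<Tuple> packaged) {
        // Keep only real tuples; null stands in for a STATUS_NULL result here.
        return packaged.filter(t -> t != null);
    }
}
{code}

This is exactly the variant that runs into problem #4 below.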

The problems with these are:
1, it doesn't fix the original problem
2, hacky
3, I don't see how this would be achievable, as the "skipping" part would have 
to live in Spark code
4, Using a filter is a problem because it makes other tests fail, e.g. 
TestPruneColumn#testCoGroup1. The reason:
A filter on an RDD uses Scala's filter iterator; when we call hasNext() on it, 
it has to evaluate next() on the wrapped iterator so that it can test the 
predicate. Because of this, the behaviour inside OutputConsumerIterator 
changes: GroupPkgFunction is invoked not as a result of input.next() but 
rather by calls to [input.hasNext(), which is unfortunately called one more 
time before evaluating 
getNextResult().|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/OutputConsumerIterator.java#L65]
As a result some input tuples get shifted, and a wrong result may be produced, 
for example when we use Accumulative UDFs.
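
To illustrate why the filter shifts evaluation, here is a stripped-down 
filtering iterator (plain Java, not Pig or Spark code): answering hasNext() 
forces it to consume elements from the wrapped iterator, so work the caller 
expects to happen on next() already happens during hasNext():

{code}
// Minimal illustration: a filtering wrapper must pull from the underlying
// iterator inside hasNext() in order to test the predicate, so the underlying
// next() runs earlier than the consumer expects.
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

class FilteringIterator<T> implements Iterator<T> {
    private final Iterator<T> wrapped;
    private final Predicate<T> keep;
    private T lookahead;
    private boolean hasLookahead;

    FilteringIterator(Iterator<T> wrapped, Predicate<T> keep) {
        this.wrapped = wrapped;
        this.keep = keep;
    }

    @Override
    public boolean hasNext() {
        while (!hasLookahead && wrapped.hasNext()) {
            T candidate = wrapped.next();   // evaluation happens here, not in next()
            if (keep.test(candidate)) {
                lookahead = candidate;
                hasLookahead = true;
            }
        }
        return hasLookahead;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        hasLookahead = false;
        return lookahead;
    }
}
{code}

Since OutputConsumerIterator calls hasNext() once more before 
getNextResult(), an extra input tuple gets consumed ahead of time, which is 
what breaks the Accumulative UDF case.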

All in all, proposal #4 was the most promising, but I can't see a way to work 
around this properly. Any ideas?


> Spark mode is writing nulls among tuples to the output 
> -------------------------------------------------------
>
>                 Key: PIG-5277
>                 URL: https://issues.apache.org/jira/browse/PIG-5277
>             Project: Pig
>          Issue Type: Bug
>          Components: spark
>            Reporter: Adam Szita
>            Assignee: Adam Szita
>
> After committing PIG-3655 a couple of Spark mode tests (e.g. 
> org.apache.pig.test.TestEvalPipeline.testCogroupAfterDistinct) started 
> failing on:
> {code}
> java.lang.Error: java.io.IOException: Corrupt data file, expected tuple type byte, but seen 27
>       at org.apache.pig.backend.hadoop.executionengine.HJob$1.hasNext(HJob.java:122)
>       at org.apache.pig.test.TestEvalPipeline.testCogroupAfterDistinct(TestEvalPipeline.java:1052)
> Caused by: java.io.IOException: Corrupt data file, expected tuple type byte, but seen 27
>       at org.apache.pig.impl.io.InterRecordReader.readDataOrEOF(InterRecordReader.java:158)
>       at org.apache.pig.impl.io.InterRecordReader.nextKeyValue(InterRecordReader.java:194)
>       at org.apache.pig.impl.io.InterStorage.getNext(InterStorage.java:79)
>       at org.apache.pig.impl.io.ReadToEndLoader.getNextHelper(ReadToEndLoader.java:238)
>       at org.apache.pig.impl.io.ReadToEndLoader.getNext(ReadToEndLoader.java:218)
>       at org.apache.pig.backend.hadoop.executionengine.HJob$1.hasNext(HJob.java:115)
> {code}
> This is because InterRecordReader became much stricter after PIG-3655. 
> Before, it simply skipped these bytes, assuming they were just garbage at 
> the beginning of the split. Now that we expect a [proper tuple with a tuple 
> type 
> byte|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/impl/io/InterRecordReader.java#L153],
> we see these nulls and throw an Exception.
> As far as I can see, this is happening because JoinGroupSparkConverter has 
> to return something even when it shouldn't.
> When the POPackage operator returns 
> [POStatus.STATUS_NULL|https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/JoinGroupSparkConverter.java#L211],
> the converter shouldn't return anything, but it can't do better than 
> returning a null. This then gets written out by Spark.
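
To spell out the write-side mechanics described above, a rough sketch of the 
STATUS_NULL case; the names are simplified stand-ins, not the actual 
JoinGroupSparkConverter code:

{code}
// Rough sketch, not actual Pig code: when POPackage signals STATUS_NULL there
// is no tuple to hand to Spark, but the function still has to return a value,
// and that null is what later trips InterRecordReader on the read side.
import org.apache.pig.data.Tuple;

abstract class GroupPkgFunctionSketch {
    static final byte STATUS_OK = 0;    // stands in for POStatus.STATUS_OK
    static final byte STATUS_NULL = 1;  // stands in for POStatus.STATUS_NULL

    abstract byte getPackageStatus();
    abstract Tuple getPackagedTuple();

    Tuple call() {
        if (getPackageStatus() == STATUS_NULL) {
            // Nothing to emit, yet something must be returned;
            // Spark writes this null into the output file.
            return null;
        }
        return getPackagedTuple();
    }
}
{code}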



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
