[ https://issues.apache.org/jira/browse/PIG-5359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rohini Palaniswamy updated PIG-5359:
------------------------------------
    Resolution: Fixed
  Hadoop Flags: Reviewed
 Fix Version/s: 0.18.0
        Status: Resolved  (was: Patch Available)

+1 to PIG-5359-3.patch from review board. Uploaded it here and committed to trunk.

> Reduce time spent in split serialization
> ----------------------------------------
>
>                 Key: PIG-5359
>                 URL: https://issues.apache.org/jira/browse/PIG-5359
>             Project: Pig
>          Issue Type: Improvement
>            Reporter: Satish Subhashrao Saley
>            Assignee: Satish Subhashrao Saley
>            Priority: Major
>             Fix For: 0.18.0
>
>         Attachments: PIG-5359-3.patch
>
>
> 1. Unnecessary serialization of splits in Tez.
> In LoaderProcessor, Pig calls
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java#L172]
> {code:java}
> tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf, false, 0));
> {code}
> It ends up serializing the splits just to print a log message:
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L317]
> {code:java}
> public static InputSplitInfoMem generateInputSplitsToMem(Configuration conf,
>     boolean groupSplits, boolean sortSplits, int targetTasks)
>     throws IOException, ClassNotFoundException, InterruptedException {
>   ....
>   ....
>   LOG.info("NumSplits: " + splitInfoMem.getNumTasks() + ", SerializedSize: "
>       + splitInfoMem.getSplitsProto().getSerializedSize());
>   return splitInfoMem;
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L106]
> {code:java}
> public MRSplitsProto getSplitsProto() {
>   if (isNewSplit) {
>     try {
>       return createSplitsProto(newFormatSplits, new SerializationFactory(conf));
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/InputSplitInfoMem.java#L152-L170]
> {code:java}
> private static MRSplitsProto createSplitsProto(
>     org.apache.hadoop.mapreduce.InputSplit[] newSplits,
>     SerializationFactory serializationFactory) throws IOException, InterruptedException {
>   MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
>   for (org.apache.hadoop.mapreduce.InputSplit newSplit : newSplits) {
>     splitsBuilder.addSplits(MRInputHelpers.createSplitProto(newSplit, serializationFactory));
>   }
>   return splitsBuilder.build();
> }
> {code}
> [https://github.com/apache/tez/blob/master/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRInputHelpers.java#L221-L259]
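>
> As a rough illustration of what avoiding this serialization could look like (a minimal sketch, not the committed patch; the class and method names below are invented for the example), the splits can be generated directly through the mapreduce InputFormat API and only their count logged, so that no MRSplitsProto is built just for a log line:
> {code:java}
> import java.io.IOException;
> import java.util.List;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.mapreduce.InputFormat;
> import org.apache.hadoop.mapreduce.InputSplit;
> import org.apache.hadoop.mapreduce.JobContext;
> import org.apache.hadoop.mapreduce.JobID;
> import org.apache.hadoop.mapreduce.task.JobContextImpl;
> import org.apache.hadoop.util.ReflectionUtils;
>
> // Hypothetical sketch class, not part of Pig or Tez.
> public class SplitGenerationSketch {
>
>     // Generates the new-API splits and logs only their count;
>     // nothing is serialized into an MRSplitsProto here.
>     public static List<InputSplit> generateSplits(Configuration conf)
>             throws IOException, InterruptedException, ClassNotFoundException {
>         JobContext jobContext = new JobContextImpl(conf, new JobID());
>         InputFormat<?, ?> inputFormat =
>                 ReflectionUtils.newInstance(jobContext.getInputFormatClass(), conf);
>         List<InputSplit> splits = inputFormat.getSplits(jobContext);
>         System.out.println("NumSplits: " + splits.size());
>         return splits;
>     }
> }
> {code}
> The serialized size can still be computed later, at the point where the in-memory threshold is actually checked (see point 2 below).
>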
> 2. In TezDagBuilder, if splitsSerializedSize > spillThreshold, the InputSplits already serialized into MRSplitsProto are not used by Pig; it serializes them again directly to disk via JobSplitWriter.createSplitFiles. So the InputSplit serialization logic runs a second time, which is wasteful and expensive in cases like HCat.
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L946-L947]
> {code:java}
> MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
> int splitsSerializedSize = splitsProto.getSerializedSize();
> {code}
> getSplitsProto() creates an MRSplitsProto, which consists of a list of MRSplitProto messages; each MRSplitProto holds the serialized bytes of one InputSplit.
> If splitsSerializedSize > spillThreshold, Pig writes the splits to disk via
> {code:java}
> if (splitsSerializedSize > spillThreshold) {
>     inputPayLoad.setBoolean(
>             org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
>             false);
>     // Write splits to disk
>     Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
>     log.info("Writing input splits to " + inputSplitsDir
>             + " for vertex " + vertex.getName()
>             + " as the serialized size in memory is "
>             + splitsSerializedSize + ". Configured "
>             + PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
>             + " is " + spillThreshold);
>     inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
>             (InputSplitInfoMem) inputSplitInfo, inputSplitsDir, payloadConf, fs);
> {code}
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java#L960]
> [https://github.com/apache/pig/blob/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java#L302-L314]
> Solution:
> 1. Do not serialize the splits in LoaderProcessor.java.
> 2. In TezDagBuilder.java, serialize each input split, keep a running total of the serialized size, and if it exceeds spillThreshold, write the splits to disk reusing the already-serialized buffer of each split (see the sketch below).
>
> Thank you [~rohini] for identifying the issue.
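>
> To make solution 2 concrete, here is a rough sketch (not the committed patch; writeProtosToDisk is a hypothetical placeholder, and it assumes MRInputHelpers.createSplitProto is accessible from Pig's code): each split is serialized exactly once, the running serialized size is compared against spillThreshold, and the already-built protos are reused when the splits have to be written to disk.
> {code:java}
> import java.io.IOException;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.io.serializer.SerializationFactory;
> import org.apache.hadoop.mapreduce.InputSplit;
> import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
> import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitProto;
> import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
>
> // Hypothetical sketch class, not part of Pig or Tez.
> public class IncrementalSplitSerializationSketch {
>
>     // Serializes each split once, tracks the running size, and reuses the
>     // serialized protos if the threshold forces a spill to disk.
>     public static MRSplitsProto serializeOnce(InputSplit[] splits, Configuration conf,
>             long spillThreshold) throws IOException, InterruptedException {
>         SerializationFactory serializationFactory = new SerializationFactory(conf);
>         MRSplitsProto.Builder splitsBuilder = MRSplitsProto.newBuilder();
>         long serializedSize = 0;
>         boolean spillToDisk = false;
>         for (InputSplit split : splits) {
>             // Each InputSplit is serialized exactly once.
>             MRSplitProto splitProto =
>                     MRInputHelpers.createSplitProto(split, serializationFactory);
>             serializedSize += splitProto.getSerializedSize();
>             splitsBuilder.addSplits(splitProto);
>             if (serializedSize > spillThreshold) {
>                 spillToDisk = true;
>             }
>         }
>         MRSplitsProto splitsProto = splitsBuilder.build();
>         if (spillToDisk) {
>             // Reuse the already-serialized protos instead of serializing the
>             // splits a second time via JobSplitWriter.createSplitFiles.
>             writeProtosToDisk(splitsProto); // hypothetical helper, not a real Pig/Tez API
>             return null;                    // splits will be localized from disk
>         }
>         return splitsProto;                 // small enough to be sent via events
>     }
>
>     // Placeholder for the disk-spill path that would reuse the serialized buffers.
>     private static void writeProtosToDisk(MRSplitsProto splitsProto) {
>         // e.g. splitsProto.writeTo(outputStream) -- omitted in this sketch
>     }
> }
> {code}
> The in-memory vs. on-disk decision is then made from the same protos that are later shipped or written, so the expensive InputSplit serialization (e.g. for HCat splits) happens only once.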