The Flink version is 1.2.0.

On Mon, May 15, 2017 at 10:24 PM, weijie tong <tongweijie...@gmail.com> wrote:
> @Till thanks for your reply.
>
> My code is similar to HashTableITCase.testInMemoryMutableHashTable().
> It just uses the MutableHashTable class; there is no other Flink
> configuration. The main code body is:
>
>> this.recordBuildSideAccessor = RecordSerializer.get();
>> this.recordProbeSideAccessor = RecordSerializer.get();
>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>> List<MemorySegment> memorySegments;
>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>> try {
>>     memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>> }
>> catch (MemoryAllocationException e) {
>>     LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>     Throwables.propagate(e);
>>     return;
>> }
>> try {
>>     Stopwatch stopwatch = Stopwatch.createStarted();
>>     UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>     UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>     join = new MutableHashTable<Record, Record>(
>>         recordBuildSideAccessor,
>>         recordProbeSideAccessor,
>>         recordBuildSideComparator,
>>         recordProbeSideComparator,
>>         pactRecordComparator,
>>         memorySegments,
>>         ioManager);
>>     join.open(buildInput, probeInput);
>>
>>     LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>
> The BytesValue type is a self-defined one which holds byte[]; just like
> the original StringValue, it has the same serDe performance.
>
> while (join.nextRecord()) {
>     Record currentProbeRecord = join.getCurrentProbeRecord();
>     MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>     while (buildSideIterator.next(reusedBuildSideRow) != null) {
>         materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>         materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>         rowNum++;
>     }
> }
>
> I have tried both the Record and Row classes as the record type without
> any improved performance. I also tried batching the input records. That
> means the buildInput and probeInput variables of the first code block
> iterate one Record at a time from a set of batched Records. The batched
> records' content stays in memory in Drill's ValueVector format. Once a
> record needs to participate in the build or probe phase through an
> iterator.next() call, it is fetched from the batched in-memory
> ValueVector content. But there were no performance gains.
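For reference, here is a minimal sketch of what such a batched input can look like behind Flink's MutableObjectIterator interface, assuming the batch has already been materialized into Record objects (this is not the code from the mail; the class name and the materialization step are made up):

import java.io.IOException;
import java.util.List;

import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;

// Hypothetical wrapper: the records of one batch stay in memory, and the
// hash table still pulls them one at a time through next().
public class BatchedRecordIterator implements MutableObjectIterator<Record> {

    private final List<Record> batch; // records materialized from the in-memory batch
    private int pos;

    public BatchedRecordIterator(List<Record> batch) {
        this.batch = batch;
    }

    @Override
    public Record next(Record reuse) throws IOException {
        if (pos >= batch.size()) {
            return null; // end of input, per the MutableObjectIterator contract
        }
        batch.get(pos++).copyTo(reuse); // fill the reusable target to avoid allocation
        return reuse;
    }

    @Override
    public Record next() throws IOException {
        return pos < batch.size() ? batch.get(pos++).createCopy() : null;
    }
}

Note that even with this kind of object reuse on the input side, MutableHashTable.open() still serializes every build-side Record into its memory segments, which would be consistent with Record.serialize sitting at the top of the profile below.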
> The top hotspot profile from JProfiler is below:
>
> Hot spot,"Self time (microseconds)","Average Time","Invocations"
> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
> org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"n/a","n/a"
> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,10955,"n/a","n/a"
> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
> org.apache.flink.runtime.memory.MemoryManager.allocatePages,104259,"n/a","n/a"
>
> My log shows that the hash join's open() method costs too much time:
>
> construct hash table elapsed:1885ms
>
> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>
>> Hi Weijie,
>>
>> it might be the case that batching the processing of multiple rows can
>> give you improved performance compared to single-row processing.
>>
>> Maybe you could share the exact benchmark baseline results and the code
>> you use to test Flink's MutableHashTable with us. Also the Flink
>> configuration and how you run it would be of interest. That way we might
>> be able to see if we can tune Flink a bit more.
>>
>> Cheers,
>> Till
>>
>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <tongweijie...@gmail.com> wrote:
>>
>>> I have a test case that uses Flink's MutableHashTable class to do a
>>> hash join on a local machine with 64 GB of memory and 64 cores. The test
>>> case is one build table with 140,000 rows and one probe table with
>>> 3,200,000 rows; the matched result is 120,000 rows.
>>>
>>> It takes 2.2 seconds to complete the join. The performance seems bad. I
>>> made sure there is no overflow and that the smaller table is the build
>>> side. The MutableObjectIterator is a sequence of Rows. The Row is
>>> composed of several fields which are byte[]. Through my log, I find that
>>> the open() method takes 1.560 seconds and the probe phase takes 680 ms.
>>> My JProfiler profile shows that the MutableObjectIterator's next()
>>> method call is the hotspot.
>>>
>>> I want to know how to tune this scenario. I find that Drill's HashJoin
>>> uses a batch model. Its build side's input is a RecordBatch, which holds
>>> a batch of rows whose memory size approaches the L2 cache size. Through
>>> this strategy it incurs fewer method calls (that is, calls to next())
>>> and is much more efficient for CPU computation. A SQL Server paper also
>>> notes the batch model's performance gains (https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf).
>>> I guess the performance drop is due to the single-row iteration model.
>>>
>>> Hope someone can correct my opinion. Also, maybe I am using the
>>> MutableHashTable incorrectly. Waiting for someone to give advice.
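For readers who have not seen the batch-at-a-time model being referenced, here is a rough sketch of the difference; the interface, class names, and batch size are illustrative only and are neither Flink's nor Drill's API:

import java.io.IOException;

// One virtual call returns a whole batch of records instead of a single
// record, amortizing per-row call overhead and letting the per-row work run
// in a tight loop over data that fits in cache.
interface BatchIterator<T> {
    // Fills 'batch' with up to batch.length records and returns the number
    // filled, or -1 at end of input.
    int nextBatch(T[] batch) throws IOException;
}

class BatchedProbeExample {
    // Sized so that a batch of small rows stays near the L2 cache, in the
    // spirit of the Drill RecordBatch strategy described above.
    static final int BATCH_SIZE = 1024;

    static <T> long countRecords(BatchIterator<T> input, T[] buffer) throws IOException {
        long count = 0;
        int n;
        while ((n = input.nextBatch(buffer)) != -1) {
            for (int i = 0; i < n; i++) {
                // per-record work goes here; it now runs inside one call per
                // batch rather than crossing an iterator call per row
            }
            count += n;
        }
        return count;
    }
}

With a record-at-a-time MutableObjectIterator, by contrast, each of the 3,200,000 probe rows costs at least one virtual next() call, which matches the next() hotspot reported in the question above.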