@Till thanks for your reply.
My code is similar to HashTableITCase.testInMemoryMutableHashTable(). It
just uses the MutableHashTable class directly; there is no other Flink
configuration involved. The main code body is:
this.recordBuildSideAccessor = RecordSerializer.get();
this.recordProbeSideAccessor = RecordSerializer.get();
final int[] buildKeyPos = new int[]{buildSideJoinIndex};
final int[] probeKeyPos = new int[]{probeSideJoinIndex};
final Class<? extends Value>[] keyType =
    (Class<? extends Value>[]) new Class[]{BytesValue.class};
this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
this.pactRecordComparator =
    new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);

Sequence<Record> buildSideRecordsSeq =
    makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
Sequence<Record> probeSideRecordsSeq =
    makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);

// give all of the memory manager's pages to the hash table
List<MemorySegment> memorySegments;
int numPages = hashTableMemoryManager.getTotalNumPages();
try {
    memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, numPages);
} catch (MemoryAllocationException e) {
    LOGGER.error("could not allocate " + numPages + " pages of memory for HashJoin", e);
    throw Throwables.propagate(e);
}

try {
    Stopwatch stopwatch = Stopwatch.createStarted();
    UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
    UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
    join = new MutableHashTable<Record, Record>(
        recordBuildSideAccessor,
        recordProbeSideAccessor,
        recordBuildSideComparator,
        recordProbeSideComparator,
        pactRecordComparator,
        memorySegments,
        ioManager);
    // open() fully builds the hash table from buildInput before probing starts
    join.open(buildInput, probeInput);

    LOGGER.info("construct hash table elapsed:"
        + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
The BytesValue type is a self-defined Value that holds a byte[]; its serDe
performance is on par with the original StringValue's.
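
For context, here is a minimal sketch of what such a type can look like,
assuming a length-prefixed byte[] payload and an unsigned lexicographic
ordering. This is an illustration, not my exact class; the field and method
bodies are assumptions:

import java.io.IOException;
import java.util.Arrays;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.types.Key;

// Illustrative sketch of a byte[]-backed key type; not the exact BytesValue
// used in the benchmark above.
public class BytesValue implements Key<BytesValue> {

    private byte[] bytes = new byte[0];

    public void setBytes(byte[] bytes) { this.bytes = bytes; }

    public byte[] getBytes() { return bytes; }

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeInt(bytes.length); // length prefix, then the raw payload
        out.write(bytes);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        bytes = new byte[in.readInt()];
        in.readFully(bytes);
    }

    @Override
    public int compareTo(BytesValue other) {
        // unsigned lexicographic byte-wise comparison
        int len = Math.min(bytes.length, other.bytes.length);
        for (int i = 0; i < len; i++) {
            int cmp = (bytes[i] & 0xff) - (other.bytes[i] & 0xff);
            if (cmp != 0) {
                return cmp;
            }
        }
        return bytes.length - other.bytes.length;
    }

    @Override
    public int hashCode() { return Arrays.hashCode(bytes); }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof BytesValue
            && Arrays.equals(bytes, ((BytesValue) obj).bytes);
    }
}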
The probe phase then iterates the matches:

while (join.nextRecord()) {
    Record currentProbeRecord = join.getCurrentProbeRecord();
    MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
    while (buildSideIterator.next(reusedBuildSideRow) != null) {
        materializeRecord2OutVector(reusedBuildSideRow,
            buildSideIndex2Value, buildSideIndex2Vector, rowNum);
        materializeRecord2OutVector(currentProbeRecord,
            probeSideIndex2Value, probeSideIndex2Vector, rowNum);
        rowNum++;
    }
}
I have tried both the Record and the Row class as the record type, with no
performance improvement. I also tried batching the input records: the
buildInput and probeInput iterators of the first code block hand out one
Record at a time from a larger batch whose content stays in memory in
Drill's ValueVector format. Whenever a record needs to participate in the
build or probe phase via an iterator.next() call, it is fetched from that
in-memory ValueVector content. But there were no performance gains.
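
To make the batching idea concrete, here is a rough sketch of the batched
iterator I described, with plain Lists of Records standing in for the
ValueVector batches (the class and its names are hypothetical, not my actual
code):

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.types.Record;
import org.apache.flink.util.MutableObjectIterator;

// Hypothetical sketch: records stay materialized in in-memory batches and are
// only copied into the reused Record when the hash table calls next().
public class BatchedRecordIterator implements MutableObjectIterator<Record> {

    private final Iterator<List<Record>> batches; // stand-in for ValueVector batches
    private Iterator<Record> currentBatch = Collections.emptyIterator();

    public BatchedRecordIterator(Iterator<List<Record>> batches) {
        this.batches = batches;
    }

    @Override
    public Record next(Record reuse) throws IOException {
        // advance to the next non-empty batch, if any
        while (!currentBatch.hasNext()) {
            if (!batches.hasNext()) {
                return null; // end of input
            }
            currentBatch = batches.next().iterator();
        }
        currentBatch.next().copyTo(reuse); // fetch from the in-memory batch
        return reuse;
    }

    @Override
    public Record next() throws IOException {
        return next(new Record());
    }
}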
The top hotspots profiled by JProfiler are below (average time and
invocation counts were reported as n/a):

Hot spot                                                               Self time (µs)
org.apache.flink.types.Record.serialize                                     1,014,127
org.apache.flink.types.Record.deserialize                                      60,684
org.apache.flink.types.Record.copyTo                                           83,007
org.apache.flink.runtime.operators.hash.MutableHashTable.open                  55,238
org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord            10,955
org.apache.flink.runtime.memory.MemoryManager.release                          33,484
org.apache.flink.runtime.memory.MemoryManager.allocatePages                   104,259
My log shows that the hash join's open() method takes up most of the time:
construct hash table elapsed:1885ms
On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <[email protected]> wrote:
> Hi Weijie,
>
> it might be the case that batching the processing of multiple rows can
> give you an improved performance compared to single row processing.
>
> Maybe you could share the exact benchmark base line results and the code
> you use to test Flink's MutableHashTable with us. Also the Flink
> configuration and how you run it would be of interest. That way we might be
> able to see if we can tune Flink a bit more.
>
> Cheers,
> Till
>
> On Sun, May 14, 2017 at 5:23 AM, weijie tong <[email protected]>
> wrote:
>
>> I have a test case that uses Flink's MutableHashTable class to do a hash
>> join on a local machine with 64 GB memory and 64 cores. The test case has
>> one build table with 140k rows and one probe table with 3.2M rows; the
>> matched result is 120k rows.
>>
>> It takes 2.2 seconds to complete the join, which seems bad. I made sure
>> there is no overflow and that the smaller table is the build side. The
>> MutableObjectIterator is a sequence of Rows, where each Row is composed of
>> several byte[] fields. Through my log, I find the open() method takes
>> 1.560 seconds and the probe phase takes 680 ms. My JProfiler profile
>> shows that the MutableObjectIterator's next() method call is the
>> hotspot.
>>
>>
>> I want to know how to tune this scenario. I find that Drill's HashJoin
>> uses a batch model: its build side's input is a RecordBatch that holds a
>> batch of rows whose memory size approaches the L2 cache. Through this
>> strategy it incurs fewer method calls (that is, calls to next()) and is
>> much more CPU-efficient. I also find that a SQL Server paper noted the
>> batch model's performance gains (https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf).
>> I guess the poor performance is due to the single-row iteration model.
>>
>>
>> I hope someone can correct my opinion, or point out whether I am using
>> the MutableHashTable incorrectly. Any advice is welcome.
>>
>
>