thanks for tip @Stephan.

To [1] , there's a description about  "I’ve got sooo much data to join, do
I really need to ship it?" . How to configure Flink to touch that target?
Is there a performance report ?

[1] :
https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html

On Wed, May 17, 2017 at 1:32 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Be aware that the "Row" and "Record" types are not very high performance
> data types. You might be measuring the data type overhead, rather than the
> hash table performance. Also, the build measurements include the data
> generation, which influences the results.
>
> If you want to purely benchmark the HashTable performance, try using
> something like "Tuple2<Long, Long>" or so (or write your own custom
> TypeSerializer / TypeComparator).
>
> Stephan
>
>
> On Tue, May 16, 2017 at 11:23 AM, weijie tong <tongweijie...@gmail.com>
> wrote:
>
>> Thanks for all your enthusiastic response. Yes, My target was to try to
>> find the best performance in memory. I got that.
>>
>> On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Flink's HashJoin implementation was designed to gracefully handle inputs
>>> that exceed the main memory.
>>> It is not explicitly optimized for in-memory processing and does not
>>> play fancy tricks like optimizing cache accesses or batching.
>>> I assume your benchmark is about in-memory joins only. This was not the
>>> main design goal when the join was implemented but robustness.
>>> Since most of the development of Flink focuses on streaming applications
>>> at the moment, the join implementation has barely been touched in recent
>>> years (except for minor extensions and bugfixes).
>>>
>>> Regarding your tests, Tuple should give better performance than Row
>>> because Row is null-sensitive and serialized a null-mask.
>>> There is also a blog post about Flink's join performance [1] which is
>>> already a bit dusty but as I said, the algorithm hasn't change much since
>>> then.
>>>
>>> Best, Fabian
>>>
>>> [1] https://flink.apache.org/news/2015/03/13/peeking-into-Apache
>>> -Flinks-Engine-Room.html
>>>
>>>
>>> 2017-05-15 16:26 GMT+02:00 weijie tong <tongweijie...@gmail.com>:
>>>
>>>> The Flink version is 1.2.0
>>>>
>>>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <tongweijie...@gmail.com>
>>>> wrote:
>>>>
>>>>> @Till thanks for your reply.
>>>>>
>>>>> My code is similar to   HashTableITCase.testInMemory
>>>>> MutableHashTable()   . It just use the MutableHashTable class ,
>>>>> there's  no other Flink's configuration.  The main code body is:
>>>>>
>>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new 
>>>>>> Class[]{BytesValue.class};
>>>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, 
>>>>>> keyType);
>>>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, 
>>>>>> keyType);
>>>>>> this.pactRecordComparator = new 
>>>>>> HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>>>> Sequence<Record> buildSideRecordsSeq = 
>>>>>> makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>>>> Sequence<Record> probeSideRecordsSeq = 
>>>>>> makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>>>> List<MemorySegment> memorySegments;
>>>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>>>>> try {
>>>>>>   memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, 
>>>>>> pageSize);
>>>>>> }
>>>>>> catch (MemoryAllocationException e) {
>>>>>>   LOGGER.error("could not allocate " + pageSize + " pages memory for 
>>>>>> HashJoin", e);
>>>>>>   Throwables.propagate(e);
>>>>>>   return;
>>>>>> }
>>>>>> try {
>>>>>>   Stopwatch stopwatch = Stopwatch.createStarted();
>>>>>>   UniformRecordGenerator buildInput = new 
>>>>>> UniformRecordGenerator(buildSideRecordsSeq);
>>>>>>   UniformRecordGenerator probeInput = new 
>>>>>> UniformRecordGenerator(probeSideRecordsSeq);
>>>>>>   join = new MutableHashTable<Record, Record>(
>>>>>>       recordBuildSideAccessor,
>>>>>>       recordProbeSideAccessor,
>>>>>>       recordBuildSideComparator,
>>>>>>       recordProbeSideComparator,
>>>>>>       pactRecordComparator,
>>>>>>       memorySegments,
>>>>>>       ioManager
>>>>>>   );
>>>>>>   join.open(buildInput,probeInput);
>>>>>>
>>>>>>   LOGGER.info("construct hash table elapsed:" + 
>>>>>> stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>>>
>>>>>>
>>>>> The BytesValue type is self defined one which holds byte[] , but just
>>>>> like the original StringValue, also has the same serDe performance.
>>>>>
>>>>>
>>>>> while (join.nextRecord()) {
>>>>>   Record currentProbeRecord = join.getCurrentProbeRecord();
>>>>>   MutableObjectIterator<Record> buildSideIterator = 
>>>>> join.getBuildSideIterator();
>>>>>   while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>>>     materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, 
>>>>> buildSideIndex2Vector, rowNum);
>>>>>     materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, 
>>>>> probeSideIndex2Vector, rowNum);
>>>>>     rowNum++;
>>>>>   }}
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> I have tried both the Record ,Row class as the type of records without
>>>>> any better improved performance . I also tried batched the input records.
>>>>> That means the  buildInput or probeInput variables of the first code
>>>>> block which iterate one Record a time from another batched Records .
>>>>> Batched records's content stay in memory in Drill's ValueVector format.
>>>>> Once a record is need to participate in the build or probe phase from a
>>>>> iterate.next() call,
>>>>> it will be fetched from the batched in memory ValueVector content. But
>>>>> no performance gains.
>>>>>
>>>>>
>>>>> The top hotspot profile from Jprofiler is below:
>>>>> >
>>>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.
>>>>> open,55238,"n/a","n/a"
>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.
>>>>> nextRecord,10955,"n/a","n/a"
>>>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,
>>>>> "n/a","n/a"
>>>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,
>>>>> 104259,"n/a","n/a"
>>>>>
>>>>>
>>>>> My log show that hashjoin.open()  method costs too much time.
>>>>> >
>>>>> construct hash table elapsed:1885ms
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Weijie,
>>>>>>
>>>>>> it might be the case that batching the processing of multiple rows
>>>>>> can give you an improved performance compared to single row processing.
>>>>>>
>>>>>> Maybe you could share the exact benchmark base line results and the
>>>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>>>> configuration and how you run it would be of interest. That way we might 
>>>>>> be
>>>>>> able to see if we can tune Flink a bit more.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <tongweijie...@gmail.com
>>>>>> > wrote:
>>>>>>
>>>>>>> I has a test case to use Flink's MutableHashTable class to do a hash
>>>>>>> join on a local machine with 64g memory, 64cores. The test case is one
>>>>>>> build table with 14w rows ,one probe table with 320w rows ,the matched
>>>>>>> result rows is 12 w.
>>>>>>>
>>>>>>> It takes 2.2 seconds to complete the join.The performance seems bad.
>>>>>>> I ensure there's no overflow, the smaller table is the build side. The
>>>>>>> MutableObjectIterator is a sequence of Rows. The Row is composed of 
>>>>>>> several
>>>>>>> fields which are byte[]. Through my log,I find the open() method takes
>>>>>>> 1.560 seconds. The probe iterates phase takes 680ms.  And my Jprofiler's
>>>>>>> profile shows the MutableObjectIterator's next() method call is the
>>>>>>> hotspot.
>>>>>>>
>>>>>>>
>>>>>>> I want to know how to tune this scenario. I find Drill's HashJoin is
>>>>>>> batch model. Its build side's input is a RecordBatch which holds batch 
>>>>>>> of
>>>>>>> rows and memory size is approach to L2 cache. Through this strategy it 
>>>>>>> will
>>>>>>> gain less method calls (that means call to next() ) and much efficient 
>>>>>>> to
>>>>>>> cpu calculation.  I also find SQL server's paper noticed the batch 
>>>>>>> model's
>>>>>>> performance gains (https://www.microsoft.com/en-
>>>>>>> us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf)
>>>>>>>  .   I guess the performance's down is due to the single row iterate 
>>>>>>> model.
>>>>>>>
>>>>>>>
>>>>>>> Hope someone to correct my opinion. Also maybe I have a wrong use
>>>>>>>  of the MutableHashTable. wait for someone to give an advice.
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>

Reply via email to