Thanks for the tip, @Stephan. In [1] there's a description about "I’ve got sooo much data to join, do I really need to ship it?". How do I configure Flink to reach that target? Is there a performance report?
[1] https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html

On Wed, May 17, 2017 at 1:32 AM, Stephan Ewen <se...@apache.org> wrote:

> Hi!
>
> Be aware that the "Row" and "Record" types are not very high-performance
> data types. You might be measuring the data-type overhead rather than the
> hash table performance. Also, the build measurements include the data
> generation, which influences the results.
>
> If you want to purely benchmark the HashTable performance, try using
> something like "Tuple2<Long, Long>" (or write your own custom
> TypeSerializer / TypeComparator).
>
> Stephan
>
> On Tue, May 16, 2017 at 11:23 AM, weijie tong <tongweijie...@gmail.com> wrote:
>
>> Thanks for all your enthusiastic responses. Yes, my target was to find
>> the best in-memory performance. I got that.
>>
>> On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Flink's HashJoin implementation was designed to gracefully handle
>>> inputs that exceed the main memory. It is not explicitly optimized for
>>> in-memory processing and does not play fancy tricks like optimizing
>>> cache accesses or batching. I assume your benchmark is about in-memory
>>> joins only; when the join was implemented, the main design goal was
>>> robustness, not in-memory speed. Since most of the development of Flink
>>> focuses on streaming applications at the moment, the join
>>> implementation has barely been touched in recent years (except for
>>> minor extensions and bugfixes).
>>>
>>> Regarding your tests, Tuple should give better performance than Row,
>>> because Row is null-sensitive and serializes a null-mask. There is also
>>> a blog post about Flink's join performance [1], which is already a bit
>>> dusty, but as I said, the algorithm hasn't changed much since then.
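Stephan's two points (keep data generation out of the timed region, and use a cheap fixed-width type) can be sketched without any Flink dependency. Everything below is made up for illustration: a plain HashMap stands in for MutableHashTable, and long[] rows stand in for Tuple2<Long, Long>.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Flink-free sketch of the measurement discipline described above:
// generate all input up front, then time the build and probe phases
// separately. Class and method names are hypothetical.
public class JoinBenchSketch {

    /** Returns {matchCount, buildMillis, probeMillis}. */
    public static long[] run(int buildRows, int probeRows) {
        // Data generation happens OUTSIDE the timed region, so it cannot
        // inflate the reported build time (one issue in the original test).
        List<long[]> build = new ArrayList<>(buildRows);
        for (int i = 0; i < buildRows; i++) {
            build.add(new long[]{i, i * 2L});
        }
        List<long[]> probe = new ArrayList<>(probeRows);
        for (int i = 0; i < probeRows; i++) {
            probe.add(new long[]{i % buildRows, i});
        }

        // Timed build phase.
        long t0 = System.nanoTime();
        HashMap<Long, Long> table = new HashMap<>(buildRows * 2);
        for (long[] r : build) {
            table.put(r[0], r[1]);
        }
        long buildMillis = (System.nanoTime() - t0) / 1_000_000;

        // Timed probe phase.
        long t1 = System.nanoTime();
        long matches = 0;
        for (long[] r : probe) {
            if (table.containsKey(r[0])) {
                matches++;
            }
        }
        long probeMillis = (System.nanoTime() - t1) / 1_000_000;

        return new long[]{matches, buildMillis, probeMillis};
    }

    public static void main(String[] args) {
        long[] res = run(140_000, 3_200_000);
        System.out.println("matches=" + res[0]
                + " buildMs=" + res[1] + " probeMs=" + res[2]);
    }
}
```

Reporting build and probe times separately, as in the thread's later messages, makes it clear which phase a data-type change actually affects.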
>>>
>>> Best,
>>> Fabian
>>>
>>> [1] https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>>>
>>> 2017-05-15 16:26 GMT+02:00 weijie tong <tongweijie...@gmail.com>:
>>>
>>>> The Flink version is 1.2.0.
>>>>
>>>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <tongweijie...@gmail.com> wrote:
>>>>
>>>>> @Till thanks for your reply.
>>>>>
>>>>> My code is similar to HashTableITCase.testInMemoryMutableHashTable().
>>>>> It just uses the MutableHashTable class; there's no other Flink
>>>>> configuration. The main code body is:
>>>>>
>>>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>>>> final Class<? extends Value>[] keyType =
>>>>>>     (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>>>> this.pactRecordComparator =
>>>>>>     new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>>>> Sequence<Record> buildSideRecordsSeq =
>>>>>>     makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>>>> Sequence<Record> probeSideRecordsSeq =
>>>>>>     makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>>>> List<MemorySegment> memorySegments;
>>>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();
>>>>>> try {
>>>>>>     memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>>>>> }
>>>>>> catch (MemoryAllocationException e) {
>>>>>>     LOGGER.error("could not allocate " + pageSize + " pages memory for HashJoin", e);
>>>>>>     Throwables.propagate(e);
>>>>>>     return;
>>>>>> }
>>>>>> try {
>>>>>>     Stopwatch stopwatch = Stopwatch.createStarted();
>>>>>>     UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>>>     UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>>>     join = new MutableHashTable<Record, Record>(
>>>>>>         recordBuildSideAccessor,
>>>>>>         recordProbeSideAccessor,
>>>>>>         recordBuildSideComparator,
>>>>>>         recordProbeSideComparator,
>>>>>>         pactRecordComparator,
>>>>>>         memorySegments,
>>>>>>         ioManager);
>>>>>>     join.open(buildInput, probeInput);
>>>>>>
>>>>>>     LOGGER.info("construct hash table elapsed:" +
>>>>>>         stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>>
>>>>> The BytesValue type is a self-defined one which holds a byte[]; just
>>>>> like the original StringValue, it has the same serde performance.
>>>>>
>>>>>> while (join.nextRecord()) {
>>>>>>     Record currentProbeRecord = join.getCurrentProbeRecord();
>>>>>>     MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>>>>     while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>>>>         materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value,
>>>>>>             buildSideIndex2Vector, rowNum);
>>>>>>         materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value,
>>>>>>             probeSideIndex2Vector, rowNum);
>>>>>>         rowNum++;
>>>>>>     }
>>>>>> }
>>>>>
>>>>> I have tried both the Record and Row classes as the record type,
>>>>> without any improvement in performance. I also tried batching the
>>>>> input records: the buildInput and probeInput variables of the first
>>>>> code block iterate one Record at a time from a batch of Records whose
>>>>> content stays in memory in Drill's ValueVector format. Once a record
>>>>> needs to participate in the build or probe phase, an iterator.next()
>>>>> call fetches it from the batched in-memory ValueVector content. But
>>>>> there were no performance gains.
>>>>>
>>>>> The top hotspot profile from JProfiler is below:
>>>>>
>>>>>> Hot spot, "Self time (microseconds)", "Average Time", "Invocations"
>>>>>> org.apache.flink.types.Record.serialize, 1014127, n/a, n/a
>>>>>> org.apache.flink.types.Record.deserialize, 60684, n/a, n/a
>>>>>> org.apache.flink.types.Record.copyTo, 83007, n/a, n/a
>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.open, 55238, n/a, n/a
>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord, 10955, n/a, n/a
>>>>>> org.apache.flink.runtime.memory.MemoryManager.release, 33484, n/a, n/a
>>>>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages, 104259, n/a, n/a
>>>>>
>>>>> My log shows that the hashjoin.open() method costs too much time:
>>>>>
>>>>>> construct hash table elapsed:1885ms
>>>>>
>>>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>>> Hi Weijie,
>>>>>>
>>>>>> it might be the case that batching the processing of multiple rows
>>>>>> can give you improved performance compared to single-row processing.
>>>>>>
>>>>>> Maybe you could share the exact benchmark baseline results and the
>>>>>> code you use to test Flink's MutableHashTable with us. Also the Flink
>>>>>> configuration and how you run it would be of interest. That way we
>>>>>> might be able to see if we can tune Flink a bit more.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <tongweijie...@gmail.com> wrote:
>>>>>>
>>>>>>> I have a test case that uses Flink's MutableHashTable class to do a
>>>>>>> hash join on a local machine with 64 GB of memory and 64 cores. The
>>>>>>> test case has one build table with 140,000 (14w) rows and one probe
>>>>>>> table with 3,200,000 (320w) rows; the matched result is 120,000
>>>>>>> (12w) rows.
>>>>>>>
>>>>>>> It takes 2.2 seconds to complete the join. The performance seems
>>>>>>> bad. I ensured there's no overflow, and the smaller table is the
>>>>>>> build side.
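The profile above points at Record.serialize as the dominant cost, which matches Stephan's remark about generic data types: a variable-length, length-prefixed record layout does per-field work that a fixed-width serializer avoids. The following is a loose, hypothetical contrast using plain java.nio, not Flink's actual serializer internals:

```java
import java.nio.ByteBuffer;

// Hypothetical contrast between a generic variable-length record layout
// (length-prefixed byte[] fields, as with Record holding BytesValue fields)
// and a fixed-width layout (two longs, roughly what a Tuple2<Long, Long>
// serializer writes). Both write into the same preallocated buffer.
public class SerializeSketch {

    // Generic path: a length prefix plus an array copy per field.
    static void writeGeneric(ByteBuffer buf, byte[][] fields) {
        for (byte[] f : fields) {
            buf.putInt(f.length); // per-field length prefix
            buf.put(f);           // per-field byte copy
        }
    }

    // Fixed-width path: two absolute-size writes, no lengths, no per-field
    // array traffic.
    static void writeFixed(ByteBuffer buf, long key, long value) {
        buf.putLong(key);
        buf.putLong(value);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        writeGeneric(buf, new byte[][]{{1, 2, 3}, {4, 5}});
        int genericBytes = buf.position(); // 4+3 + 4+2 = 13 bytes, many steps
        buf.clear();
        writeFixed(buf, 42L, 7L);
        int fixedBytes = buf.position();   // 16 bytes, one branch-free path
        System.out.println(genericBytes + " " + fixedBytes);
    }
}
```

The fixed layout may even use more bytes, as here; the win is in fewer branches, no length bookkeeping, and no per-field copies per record, which is what a custom TypeSerializer / TypeComparator would buy in this benchmark.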
>>>>>>> The MutableObjectIterator is a sequence of Rows. Each Row is
>>>>>>> composed of several fields which are byte[]. From my log, I find
>>>>>>> the open() method takes 1.560 seconds and the probe phase takes
>>>>>>> 680 ms. My JProfiler profile shows the MutableObjectIterator's
>>>>>>> next() method call is the hotspot.
>>>>>>>
>>>>>>> I want to know how to tune this scenario. I find Drill's HashJoin
>>>>>>> uses a batch model: its build side's input is a RecordBatch which
>>>>>>> holds a batch of rows whose memory size is close to the L2 cache.
>>>>>>> Through this strategy it needs fewer method calls (that is, calls
>>>>>>> to next()) and is much more efficient for the CPU. A SQL Server
>>>>>>> paper also noted the batch model's performance gains
>>>>>>> (https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf).
>>>>>>> I guess the performance drop is due to the single-row iteration
>>>>>>> model.
>>>>>>>
>>>>>>> Hope someone can correct my opinion. Maybe I have a wrong use of
>>>>>>> the MutableHashTable. Waiting for someone to give advice.
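The batch-at-a-time idea from Drill and the Apollo paper can be sketched without any Flink types. Both interfaces below are made up for illustration; neither is a Flink API, and MutableHashTable only offers the per-record pull style:

```java
import java.util.Arrays;

// Made-up interfaces contrasting record-at-a-time and batch-at-a-time pulls.
public class BatchModelSketch {

    interface RowSource {
        // Returns the next row value, or -1 when exhausted: one call per row.
        long next();
    }

    interface BatchSource {
        // Fills 'out' with up to out.length rows; returns the count, 0 at end.
        int nextBatch(long[] out);
    }

    static long sumPerRecord(RowSource s) {
        long sum = 0;
        long v;
        while ((v = s.next()) != -1) {
            sum += v;                      // one interface call per row
        }
        return sum;
    }

    static long sumBatched(BatchSource s, int batchSize) {
        long[] batch = new long[batchSize];
        long sum = 0;
        int n;
        while ((n = s.nextBatch(batch)) > 0) {
            for (int i = 0; i < n; i++) {
                sum += batch[i];           // tight loop over a plain array
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        long[] data = new long[100_000];
        Arrays.setAll(data, i -> i + 1);   // 1..100000, avoids the -1 sentinel

        int[] pos = {0};
        RowSource rows = () -> pos[0] < data.length ? data[pos[0]++] : -1;

        int[] bpos = {0};
        BatchSource batches = out -> {
            int n = Math.min(out.length, data.length - bpos[0]);
            System.arraycopy(data, bpos[0], out, 0, n);
            bpos[0] += n;
            return n;
        };

        long a = sumPerRecord(rows);
        long b = sumBatched(batches, 1024);
        System.out.println(a + " " + b);   // both print 5000050000
    }
}
```

This also suggests why batching only the inputs (as tried above) gained nothing: MutableHashTable still pulls one record per next() call during build and probe, so the per-record call and serialization overhead remains; the hash table itself would need a batched interface to benefit.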