Hi,

I'm not aware of a performance report for this feature; I don't think it is well known or widely used. The classes to check out for pre-partitioned / pre-sorted data are SplitDataProperties [1], DataSource [2], and, as an example, PropertyDataSourceTest [3].
[1] https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/io/SplitDataProperties.java
[2] https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSource.java
[3] https://github.com/apache/flink/blob/master/flink-optimizer/src/test/java/org/apache/flink/optimizer/PropertyDataSourceTest.java

Best, Fabian
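As a rough illustration of the classes Fabian points to, here is a minimal sketch of how per-split partitioning and ordering could be declared on a DataSource with the DataSet API (Flink 1.2/1.3-era flink-java). The input path, field positions and types are made up for the example; the declared properties are a promise about the data in each split that Flink does not verify, so declaring properties the data does not actually have leads to incorrect results.

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.SplitDataProperties;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;

public class PresortedSourceSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical CSV input that some upstream job wrote partitioned and sorted on field 0.
        DataSource<Tuple2<Long, String>> source =
                env.readCsvFile("hdfs:///data/presorted").types(Long.class, String.class);

        // Declare what every input split already guarantees, so the optimizer can reuse it.
        SplitDataProperties<Tuple2<Long, String>> props = source.getSplitDataProperties();
        props.splitsPartitionedBy(0);                                       // splits are partitioned on field 0
        props.splitsOrderedBy(new int[]{0}, new Order[]{Order.ASCENDING});  // records within a split are sorted on field 0

        // Operators keyed on field 0 may now avoid a re-partition / re-sort,
        // as exercised in PropertyDataSourceTest [3].
        source.groupBy(0).first(1).print();
    }
}

The print() call only forces execution of the sketch; the interesting part is whether the optimizer drops the shuffle/sort for keyed operators on field 0 in the resulting plan.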
2017-05-18 13:54 GMT+02:00 weijie tong <tongweijie...@gmail.com>:

> Thanks for the tip, @Stephan.
>
> In [1] there is a section titled "I've got sooo much data to join, do I really need to ship it?". How do I configure Flink to reach that target? Is there a performance report?
>
> [1]: https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>
> On Wed, May 17, 2017 at 1:32 AM, Stephan Ewen <se...@apache.org> wrote:
>
>> Hi!
>>
>> Be aware that the "Row" and "Record" types are not very high-performance data types. You might be measuring the data type overhead rather than the hash table performance. Also, the build measurements include the data generation, which influences the results.
>>
>> If you want to purely benchmark the HashTable performance, try using something like "Tuple2<Long, Long>" (or write your own custom TypeSerializer / TypeComparator).
>>
>> Stephan
>>
>> On Tue, May 16, 2017 at 11:23 AM, weijie tong <tongweijie...@gmail.com> wrote:
>>
>>> Thanks for all your enthusiastic responses. Yes, my target was to find the best in-memory performance. I got that.
>>>
>>> On Tue, 16 May 2017 at 4:10 PM Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Flink's HashJoin implementation was designed to gracefully handle inputs that exceed the main memory. It is not explicitly optimized for in-memory processing and does not play fancy tricks like optimizing cache accesses or batching. I assume your benchmark is about in-memory joins only; robustness, not in-memory speed, was the main design goal when the join was implemented. Since most of the development of Flink focuses on streaming applications at the moment, the join implementation has barely been touched in recent years (except for minor extensions and bugfixes).
>>>>
>>>> Regarding your tests, Tuple should give better performance than Row because Row is null-sensitive and serializes a null-mask. There is also a blog post about Flink's join performance [1] which is already a bit dusty, but as I said, the algorithm hasn't changed much since then.
>>>>
>>>> Best, Fabian
>>>>
>>>> [1] https://flink.apache.org/news/2015/03/13/peeking-into-Apache-Flinks-Engine-Room.html
>>>>
>>>> 2017-05-15 16:26 GMT+02:00 weijie tong <tongweijie...@gmail.com>:
>>>>
>>>>> The Flink version is 1.2.0.
>>>>>
>>>>> On Mon, May 15, 2017 at 10:24 PM, weijie tong <tongweijie...@gmail.com> wrote:
>>>>>
>>>>>> @Till thanks for your reply.
>>>>>>
>>>>>> My code is similar to HashTableITCase.testInMemoryMutableHashTable(). It just uses the MutableHashTable class; there is no other Flink configuration. The main code body is:
>>>>>>
>>>>>>> // serializers and comparators for the build and probe side; the join key is a single BytesValue field
>>>>>>> this.recordBuildSideAccessor = RecordSerializer.get();
>>>>>>> this.recordProbeSideAccessor = RecordSerializer.get();
>>>>>>> final int[] buildKeyPos = new int[]{buildSideJoinIndex};
>>>>>>> final int[] probeKeyPos = new int[]{probeSideJoinIndex};
>>>>>>> final Class<? extends Value>[] keyType = (Class<? extends Value>[]) new Class[]{BytesValue.class};
>>>>>>> this.recordBuildSideComparator = new RecordComparator(buildKeyPos, keyType);
>>>>>>> this.recordProbeSideComparator = new RecordComparator(probeKeyPos, keyType);
>>>>>>> this.pactRecordComparator = new HashJoinVectorJointGroupIterator.RecordPairComparator(buildSideJoinIndex);
>>>>>>>
>>>>>>> Sequence<Record> buildSideRecordsSeq = makeSequenceRecordOfSameSideSegments(buildSideSegs, localJoinQuery);
>>>>>>> Sequence<Record> probeSideRecordsSeq = makeSequenceRecordOfSameSideSegments(probeSideSegs, localJoinQuery);
>>>>>>>
>>>>>>> // allocate all available pages of the memory manager for the hash table
>>>>>>> List<MemorySegment> memorySegments;
>>>>>>> int pageSize = hashTableMemoryManager.getTotalNumPages();   // number of pages, despite the name
>>>>>>> try {
>>>>>>>     memorySegments = this.hashTableMemoryManager.allocatePages(MEM_OWNER, pageSize);
>>>>>>> } catch (MemoryAllocationException e) {
>>>>>>>     LOGGER.error("could not allocate " + pageSize + " pages of memory for HashJoin", e);
>>>>>>>     Throwables.propagate(e);
>>>>>>>     return;
>>>>>>> }
>>>>>>>
>>>>>>> try {
>>>>>>>     Stopwatch stopwatch = Stopwatch.createStarted();
>>>>>>>     UniformRecordGenerator buildInput = new UniformRecordGenerator(buildSideRecordsSeq);
>>>>>>>     UniformRecordGenerator probeInput = new UniformRecordGenerator(probeSideRecordsSeq);
>>>>>>>     join = new MutableHashTable<Record, Record>(
>>>>>>>         recordBuildSideAccessor,
>>>>>>>         recordProbeSideAccessor,
>>>>>>>         recordBuildSideComparator,
>>>>>>>         recordProbeSideComparator,
>>>>>>>         pactRecordComparator,
>>>>>>>         memorySegments,
>>>>>>>         ioManager
>>>>>>>     );
>>>>>>>     // open() consumes the whole build input and constructs the hash table
>>>>>>>     join.open(buildInput, probeInput);
>>>>>>>
>>>>>>>     LOGGER.info("construct hash table elapsed:" + stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
>>>>>>
>>>>>> The BytesValue type is a self-defined Value that holds a byte[]; just like the original StringValue, it has the same serDe performance.
>>>>>>
>>>>>> // probe phase: for every probe record, iterate over the matching build-side records
>>>>>> while (join.nextRecord()) {
>>>>>>     Record currentProbeRecord = join.getCurrentProbeRecord();
>>>>>>     MutableObjectIterator<Record> buildSideIterator = join.getBuildSideIterator();
>>>>>>     while (buildSideIterator.next(reusedBuildSideRow) != null) {
>>>>>>         materializeRecord2OutVector(reusedBuildSideRow, buildSideIndex2Value, buildSideIndex2Vector, rowNum);
>>>>>>         materializeRecord2OutVector(currentProbeRecord, probeSideIndex2Value, probeSideIndex2Vector, rowNum);
>>>>>>         rowNum++;
>>>>>>     }
>>>>>> }
>>>>>>
>>>>>> I have tried both the Record and the Row class as the record type, without any improvement. I also tried batching the input records: the buildInput and probeInput variables of the first code block iterate one Record at a time out of a batch of records whose content stays in memory in Drill's ValueVector format. Whenever a record has to take part in the build or probe phase via an iterator.next() call, it is fetched from that batched in-memory ValueVector content. But there were no performance gains.
>>>>>>
>>>>>> The top hotspot profile from JProfiler is below:
>>>>>>
>>>>>>> Hot spot,"Self time (microseconds)","Average Time","Invocations"
>>>>>>> org.apache.flink.types.Record.serialize,1014127,"n/a","n/a"
>>>>>>> org.apache.flink.types.Record.deserialize,60684,"n/a","n/a"
>>>>>>> org.apache.flink.types.Record.copyTo,83007,"n/a","n/a"
>>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.open,55238,"n/a","n/a"
>>>>>>> org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord,10955,"n/a","n/a"
>>>>>>> org.apache.flink.runtime.memory.MemoryManager.release,33484,"n/a","n/a"
>>>>>>> org.apache.flink.runtime.memory.MemoryManager.allocatePages,104259,"n/a","n/a"
>>>>>>
>>>>>> My log shows that the hash join's open() method costs too much time:
>>>>>>
>>>>>>> construct hash table elapsed:1885ms
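Given that Record.serialize dominates the profile above, and following Stephan's suggestion (quoted near the top of this thread) to benchmark with Tuple2<Long, Long> instead of Record, here is a self-contained sketch of what a "pure" MutableHashTable micro-benchmark might look like. The inputs are pre-materialized so data generation is excluded from the measured time, and serializers/comparators are built from TupleTypeInfo. Row counts, the memory budget and class names are illustrative, not taken from the original benchmark.

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.operators.hash.MutableHashTable;
import org.apache.flink.util.MutableObjectIterator;

public class Tuple2HashTableBench {

    // Iterates over a pre-materialized list, so record generation is not part of the measured time.
    static class ListIterator implements MutableObjectIterator<Tuple2<Long, Long>> {
        private final List<Tuple2<Long, Long>> data;
        private int pos;
        ListIterator(List<Tuple2<Long, Long>> data) { this.data = data; }
        public Tuple2<Long, Long> next(Tuple2<Long, Long> reuse) { return next(); }
        public Tuple2<Long, Long> next() { return pos < data.size() ? data.get(pos++) : null; }
    }

    public static void main(String[] args) throws Exception {
        ExecutionConfig cfg = new ExecutionConfig();
        TupleTypeInfo<Tuple2<Long, Long>> typeInfo =
                TupleTypeInfo.getBasicTupleTypeInfo(Long.class, Long.class);

        TypeSerializer<Tuple2<Long, Long>> buildSer = typeInfo.createSerializer(cfg);
        TypeSerializer<Tuple2<Long, Long>> probeSer = typeInfo.createSerializer(cfg);
        TypeComparator<Tuple2<Long, Long>> buildComp =
                typeInfo.createComparator(new int[]{0}, new boolean[]{true}, 0, cfg);
        TypeComparator<Tuple2<Long, Long>> probeComp =
                typeInfo.createComparator(new int[]{0}, new boolean[]{true}, 0, cfg);

        // Pre-generate the inputs: 140k build rows, 3.2M probe rows, joined on field 0.
        List<Tuple2<Long, Long>> build = new ArrayList<>();
        for (long i = 0; i < 140_000; i++) { build.add(new Tuple2<>(i, i)); }
        List<Tuple2<Long, Long>> probe = new ArrayList<>();
        for (long i = 0; i < 3_200_000; i++) { probe.add(new Tuple2<>(i % 140_000, i)); }

        // Fixed in-memory budget: 8192 pages * 32 KB = 256 MB, so nothing should spill.
        List<MemorySegment> memory = new ArrayList<>();
        for (int i = 0; i < 8192; i++) { memory.add(MemorySegmentFactory.allocateUnpooledSegment(32 * 1024)); }

        IOManagerAsync ioManager = new IOManagerAsync();
        MutableHashTable<Tuple2<Long, Long>, Tuple2<Long, Long>> table =
                new MutableHashTable<>(buildSer, probeSer, buildComp, probeComp,
                        new GenericPairComparator<>(
                                typeInfo.createComparator(new int[]{0}, new boolean[]{true}, 0, cfg),
                                typeInfo.createComparator(new int[]{0}, new boolean[]{true}, 0, cfg)),
                        memory, ioManager);

        long start = System.nanoTime();
        table.open(new ListIterator(build), new ListIterator(probe));   // builds the hash table
        System.out.println("build: " + (System.nanoTime() - start) / 1_000_000 + " ms");

        long matches = 0;
        Tuple2<Long, Long> reuse = new Tuple2<>(0L, 0L);
        while (table.nextRecord()) {                                    // probe phase
            MutableObjectIterator<Tuple2<Long, Long>> hits = table.getBuildSideIterator();
            while (hits.next(reuse) != null) { matches++; }
        }
        System.out.println("total: " + (System.nanoTime() - start) / 1_000_000 + " ms, matches=" + matches);

        table.close();
        ioManager.shutdown();
    }
}

Comparing the timings of such a run with the Record-based numbers above should separate the data-type (serialization) overhead from the cost of the hash table itself.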
>>>>>> On Mon, May 15, 2017 at 6:20 PM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>
>>>>>>> Hi Weijie,
>>>>>>>
>>>>>>> it might be the case that batching the processing of multiple rows gives you improved performance compared to single-row processing.
>>>>>>>
>>>>>>> Maybe you could share the exact benchmark baseline results and the code you use to test Flink's MutableHashTable with us. The Flink configuration and how you run it would also be of interest. That way we might be able to see if we can tune Flink a bit more.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Sun, May 14, 2017 at 5:23 AM, weijie tong <tongweijie...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I have a test case that uses Flink's MutableHashTable class to do a hash join on a local machine with 64 GB of memory and 64 cores. The test case has one build table with 14w (140,000) rows and one probe table with 320w (3,200,000) rows; the matched result is 12w (120,000) rows.
>>>>>>>>
>>>>>>>> It takes 2.2 seconds to complete the join. The performance seems bad. I made sure there is no overflow and that the smaller table is the build side. The MutableObjectIterator is a sequence of Rows; each Row is composed of several byte[] fields. From my log I find that the open() method takes 1.560 seconds and the probe phase takes 680 ms. My JProfiler profile shows that the MutableObjectIterator's next() method call is the hotspot.
>>>>>>>>
>>>>>>>> I want to know how to tune this scenario. Drill's HashJoin follows a batch model: its build-side input is a RecordBatch that holds a batch of rows whose memory size is close to the L2 cache. With this strategy there are fewer method calls (i.e. calls to next()) and the CPU computation is more efficient. A SQL Server paper also notes the batch model's performance gains (https://www.microsoft.com/en-us/research/wp-content/uploads/2013/06/Apollo3-Sigmod-2013-final.pdf). I guess the performance drop is due to the single-row iterator model.
>>>>>>>>
>>>>>>>> I hope someone can correct my opinion. Maybe I am using the MutableHashTable in a wrong way. Waiting for someone's advice.
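As a rough sanity check on these numbers (reading 14w as 140,000 rows and 320w as 3,200,000 rows):

  build (open()):  1.56 s / 140,000 rows   = roughly 11 us per build-side row
  probe phase:     0.68 s / 3,200,000 rows = roughly 0.2 us per probe-side row

Almost all of the per-row cost is on the build side, where open() generates, serializes and copies the Records into the hash table's memory segments (consistent with the Record.serialize hotspot shown further up in the thread), rather than being an inherent cost of the single-row next() iteration model alone.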