Cool! Always happy to help :-)
2015-09-16 14:41 GMT+02:00 Pieter Hameete <phame...@gmail.com>:

Fantastic Fabian, that was it :-)! I'm glad it wasn't a more severe or tricky programming error, though I had already spent quite some time wondering about this one.

Have a nice day!

- Pieter


2015-09-16 14:27 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

Sorry, I was thinking too complicated. Forget about the methods I mentioned.

If you are implementing WritableComparable types, you need to override the hashCode() method. Flink treats WritableComparable types just like Hadoop does [1]. DawnData does not implement hashCode(), which causes inconsistent hash partitioning.

Please let me know if that solved your problem.

Cheers, Fabian

[1] https://squarecog.wordpress.com/2011/02/20/hadoop-requires-stable-hashcode-implementations/
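To illustrate Fabian's point, here is a minimal sketch of a WritableComparable key type with a stable hashCode(). The PersonKey class and its single Long field are hypothetical stand-ins; DawnData's actual structure differs.

import java.io.{DataInput, DataOutput}
import org.apache.hadoop.io.WritableComparable

// Hypothetical key type with a single field; DawnData's real fields differ.
class PersonKey(var id: Long) extends WritableComparable[PersonKey] {
  def this() = this(0L) // Writables require a no-arg constructor

  override def write(out: DataOutput): Unit = out.writeLong(id)
  override def readFields(in: DataInput): Unit = { id = in.readLong() }
  override def compareTo(other: PersonKey): Int = java.lang.Long.compare(id, other.id)

  // Value-based and stable: equal keys must produce the same hash on every
  // JVM, or hash partitioning routes them to different parallel tasks.
  override def hashCode(): Int = (id ^ (id >>> 32)).toInt

  override def equals(obj: Any): Boolean = obj match {
    case other: PersonKey => other.id == id
    case _                => false
  }
}

Without such an override, Object's identity-based hashCode is used. It differs from JVM to JVM, so the same key value can be routed to different partitions on different TaskManagers, which is the inconsistent hash partitioning described in [1] above.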
2015-09-16 14:12 GMT+02:00 Pieter Hameete <phame...@gmail.com>:

Hi,

I haven't been able to find the problem yet, and I don't know exactly how to check the methods you suggested earlier (extractKeys, getFlatComparators, duplicate) for the Scala API. Do you have some pointers on how I can check these myself?

In my earlier mail I stated that maps, filters, and reduces work fine. I found that this was not correct: for my previous queries I had only used maps and filters. I made an extra test and found that the following code, using a reduce, also generates faulty results when increasing parallelism past 1:

def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
def test = auctions.groupBy(_.select("2.buyer.@person").getFirst).reduceGroup(
  (groupedauctions, out : Collector[DawnData]) => {
    out.collect(new DawnData(groupedauctions.size))
  }).setParallelism(1)
test.print

Does this indicate that something else could be wrong with the custom datatype?

You can find the corresponding code and a small dataset at https://github.com/PHameete/dawn-flink in the development branch. It is a Scala Maven project, so you should be able to run the main.scala.wis.dawnflink.performance.DawnBenchmarkSuite class out of the box to run the query from my first email. In this class you can also change the query that is being run, or run multiple queries. If this does not work please let me know!

Kind regards and cheers again!

- Pieter


2015-09-16 11:24 GMT+02:00 Pieter Hameete <phame...@gmail.com>:

Cheers Till and Fabian for your fast replies, it's much appreciated!

I figured something must be wrong with my data type; I have no doubt the CoGroup works just fine :-) Pointers on what to investigate about my datatype are exactly what I am looking for. Initially I had problems with serialization causing strange issues as well; these were resolved after I rewrote my serialization, so I believe that is working OK.

I'll try looking into the data type some more with your tips. If I can't figure it out I'll share the repository with you later today or tomorrow.

Kind regards,

Pieter


2015-09-16 11:02 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

This sounds like a problem with your custom type and its (presumably) custom serializers and comparators.

I assume it is not an issue of partitioning or sorting, because Reduce is working fine, as you reported. CoGroup also partitions and sorts data, but compares the elements of two sorted streams.

I would check the following methods:
- extractKeys
- getFlatComparators
- duplicate (duplicate must return a deep copy, especially of all nested comparators)

Feel free to share your custom TypeInfo, Comparator, and Serializers.

Cheers, Fabian

2015-09-16 10:52 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:

Hi Pieter,

your code doesn't look suspicious at first glance. Would it be possible for you to post a complete example with data (it is also possible to include the data in the code) to reproduce your problem?

Cheers,
Till

On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phame...@gmail.com> wrote:

Dear fellow Flinkers,

I am implementing queries from the XMark (http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a custom nested data type. Reading the XML data generated by the XMark generator into my custom nested datatype works perfectly, and the queries that I have implemented so far, using mostly map, reduce, and filter, produce correct results.

For the next query I wish to cogroup a dataset containing person data with a dataset containing auction data, joined by the personid of the person and the personid of the buyer of an auction, so that I can count the number of purchases per person. I select this personid as the key from the custom nested data type in the where and equalTo functions of the coGroup. The XML2DawnInputFormat is my custom input format that reads XML into my custom nested datatype DawnData. The 'inputGraph' and 'auctionInput' are a projection on the XML input to prevent reading unnecessary data.

def env = ExecutionEnvironment.getExecutionEnvironment
def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
def result = persons.coGroup(auctions)
  .where(person => { person.select("2/@id/2") })
  .equalTo(auction => { auction.select("2/buyer/@person/2") })
  .apply((personsres, auctionsres, out : Collector[DawnData]) => {
    // my cogroup function here that outputs the name of the person
    // and the number of auctions
  }).setParallelism(1)

This code works fine with parallelism set to 1 as above. My issue is that if I raise the parallelism of the coGroup above 1, the data gets mixed up. Often the auctions iterator will be empty, and sometimes non-empty auction iterators are passed to the cogroup function while the persons iterator is empty, but this is impossible because all buyers exist in the persons dataset!

If anyone has some pointers on why this code starts producing strange results when parallelism is set above 1, that would be greatly appreciated :-)

Kind regards,

Pieter Hameete
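As a point of contrast, below is a small self-contained sketch of the same coGroup pattern keyed on a plain Long field. The Person and Auction case classes and the sample data are made up for illustration, not taken from the thread. Scala case classes come with value-based hashCode implementations, so the groups stay aligned at any parallelism, which is why the symptom above pointed at the custom key type rather than at CoGroup itself.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// Hypothetical stand-ins for the person/auction records in the thread.
case class Person(id: Long, name: String)
case class Auction(buyerId: Long, item: String)

object CoGroupSanityCheck {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4) // deliberately greater than 1

    val persons  = env.fromElements(Person(1, "alice"), Person(2, "bob"), Person(3, "carol"))
    val auctions = env.fromElements(Auction(1, "book"), Auction(1, "lamp"), Auction(2, "bike"))

    // Count purchases per person; both sides hash the Long key
    // identically on every node, so the groups line up.
    persons.coGroup(auctions)
      .where(_.id)
      .equalTo(_.buyerId)
      .apply { (ps: Iterator[Person], as: Iterator[Auction], out: Collector[(String, Int)]) =>
        for (p <- ps) out.collect((p.name, as.size))
      }
      .print()
  }
}

Since CoGroup also processes groups where one side is empty, this should emit (alice,2), (bob,1), and (carol,0) in some order, regardless of the parallelism setting.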