Sorry, I was overcomplicating things. Forget about the methods I mentioned earlier.
If you are implementing WritableComparable types, you need to override the
hashCode() method. Flink treats WritableComparable types just like Hadoop
does [1]. DawnData does not implement hashCode(), which causes inconsistent
hash partitioning.

Please let me know if that solves your problem.

Cheers, Fabian

[1] https://squarecog.wordpress.com/2011/02/20/hadoop-requires-stable-hashcode-implementations/
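For illustration, here is a minimal sketch of a WritableComparable type with
a stable, value-based hashCode(). The PersonKey class and its id field are
made up for this example and are not taken from DawnData:

    import java.io.{DataInput, DataOutput}
    import org.apache.hadoop.io.WritableComparable

    // Hypothetical key type, only to illustrate the hashCode() contract.
    class PersonKey(var id: Long) extends WritableComparable[PersonKey] {

      // Hadoop (and Flink's Writable support) need a no-arg constructor.
      def this() = this(0L)

      override def write(out: DataOutput): Unit = out.writeLong(id)

      override def readFields(in: DataInput): Unit = { id = in.readLong() }

      override def compareTo(other: PersonKey): Int =
        java.lang.Long.compare(id, other.id)

      // The important part: hashCode() must depend only on the key's value,
      // never on object identity, so that equal keys are hashed to the same
      // partition on every worker.
      override def hashCode(): Int = (id ^ (id >>> 32)).toInt

      override def equals(obj: Any): Boolean = obj match {
        case other: PersonKey => other.id == id
        case _ => false
      }
    }

The exact fields do not matter; what matters is that two objects that are
equal always return the same hashCode(), independent of the JVM or worker
they were created on.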
2015-09-16 14:12 GMT+02:00 Pieter Hameete <phame...@gmail.com>:

> Hi,
>
> I haven't been able to find the problem yet, and I don't know exactly how
> to check the methods you suggested earlier (extractKeys,
> getFlatComparators, duplicate) for the Scala API. Do you have some
> pointers for me on how I can check these myself?
>
> In my earlier mail I stated that maps, filters and reduces work fine. I
> found that this was not correct: for my previous queries I had only used
> maps and filters. I made an extra test and found that indeed the following
> code using a reduce also generates faulty results when increasing
> parallelism past 1:
>
> def auctions : DataSet[DawnData] =
>   env.readFile(new XML2DawnInputFormat(auctionInput), path)
> def test =
>   auctions.groupBy(_.select("2.buyer.@person").getFirst).reduceGroup(
>     (groupedauctions, out : Collector[DawnData]) => {
>       out.collect(new DawnData(groupedauctions.size))
>     }).setParallelism(1)
> test.print
>
> Does this indicate that something else could be wrong with the custom
> datatype?
>
> You can find the corresponding code and a small dataset at
> https://github.com/PHameete/dawn-flink in the development branch. It is a
> Scala Maven project, so you should be able to run the
> main.scala.wis.dawnflink.performance.DawnBenchmarkSuite class out of the
> box to run the query from my first email. In this class you can also
> change the query that is being run, or run multiple queries. If this does
> not work, please let me know!
>
> Kind regards and cheers again!
>
> - Pieter
>
>
> 2015-09-16 11:24 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>
>> Cheers Till and Fabian for your fast replies, it's much appreciated!
>>
>> I figured something should be wrong with my data type. I have no doubt
>> the CoGroup works just fine :-) It's pointers on what to investigate
>> about my datatype that I am looking for. Initially I had problems with
>> serialization causing strange issues as well; these were resolved after
>> I had rewritten my serialization, so I believe that is working OK.
>>
>> I'll try looking into the data type some more with your tips. If I can't
>> figure it out I'll share the repository with you later today or
>> tomorrow.
>>
>> Kind regards,
>>
>> Pieter
>>
>>
>> 2015-09-16 11:02 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>
>>> This sounds like a problem with your custom type and its (presumably)
>>> custom serializers and comparators.
>>>
>>> I assume it is not an issue of partitioning or sorting because Reduce
>>> is working fine, as you reported.
>>> CoGroup does also partition and sort data, but compares the elements of
>>> two sorted streams.
>>>
>>> I would check the following methods:
>>> - extractKeys
>>> - getFlatComparators
>>> - duplicate (duplicate must return a deep copy, esp. of all nested
>>>   comparators)
>>>
>>> Feel free to share your custom TypeInfo, Comparator, and Serializers.
>>>
>>> Cheers, Fabian
>>>
>>> 2015-09-16 10:52 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
>>>
>>>> Hi Pieter,
>>>>
>>>> your code doesn't look suspicious at first glance. Would it be
>>>> possible for you to post a complete example with data (it is also
>>>> possible to include it in the code) to reproduce your problem?
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phame...@gmail.com>
>>>> wrote:
>>>>
>>>>> Dear fellow Flinkers,
>>>>>
>>>>> I am implementing queries from the XMark benchmark
>>>>> (http://www.ins.cwi.nl/projects/xmark/) on Flink using a custom
>>>>> nested data type. Reading the XML data generated by the XMark
>>>>> generator into my custom nested datatype works perfectly, and the
>>>>> queries that I have implemented so far using mostly map, reduce and
>>>>> filter produce correct results.
>>>>>
>>>>> For the next query I wish to cogroup a dataset containing person
>>>>> data with a dataset containing auction data, joined by the personid
>>>>> of the person and the personid of the buyer of an auction, so that I
>>>>> can count the number of purchases of a person. I select this
>>>>> personid as the key from the custom nested data type in the where
>>>>> and equalTo functions of the coGroup. The XML2DawnInputFormat is my
>>>>> custom input format that reads XML into my custom nested datatype
>>>>> DawnData. The 'inputGraph' and 'auctionInput' are projections on the
>>>>> XML input to prevent reading unnecessary data.
>>>>>
>>>>> def env = ExecutionEnvironment.getExecutionEnvironment
>>>>> def persons : DataSet[DawnData] =
>>>>>   env.readFile(new XML2DawnInputFormat(inputGraph), path)
>>>>> def auctions : DataSet[DawnData] =
>>>>>   env.readFile(new XML2DawnInputFormat(auctionInput), path)
>>>>> def result = persons.coGroup(auctions)
>>>>>   .where(person => { person.select("2/@id/2") })
>>>>>   .equalTo(auction => { auction.select("2/buyer/@person/2") })
>>>>>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>>>>>     // my cogroup function here that outputs the name of the person
>>>>>     // and the number of auctions
>>>>>   }).setParallelism(1)
>>>>>
>>>>> This code works fine with parallelism set to 1 as above. My issue is
>>>>> that if I raise the parallelism of the coGroup above 1, the data
>>>>> will get mixed up. Often the auctions iterator will be empty, and
>>>>> sometimes there are non-empty auction iterators passed to the
>>>>> cogroup function where the persons iterator is empty, but this is
>>>>> impossible because all buyers exist in the persons database!
>>>>>
>>>>> If anyone has some pointers for me as to why this code starts
>>>>> producing strange results when parallelism is set above 1, that
>>>>> would be greatly appreciated :-)
>>>>>
>>>>> Kind regards,
>>>>>
>>>>> Pieter Hameete