Cheers Till and Fabian for your fast replies, it's much appreciated! I figured something must be wrong with my data type; I have no doubt the CoGroup itself works just fine :-) Pointers on what to investigate in my data type are exactly what I was looking for. Initially I had problems with serialization causing strange issues as well; those were resolved after I rewrote my serialization, so I believe that part is working OK.
I'll try looking into the data type some more with your tips. If I can't figure it out I'll share the repository with you later today or tomorrow.

Kind regards,
Pieter

2015-09-16 11:02 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> This sounds like a problem with your custom type and its (presumably)
> custom serializers and comparators.
>
> I assume it is not an issue of partitioning or sorting, because Reduce is
> working fine, as you reported.
> CoGroup also partitions and sorts data, but compares the elements of
> two sorted streams.
>
> I would check the following methods:
> - extractKeys
> - getFlatComparators
> - duplicate (duplicate must return a deep copy, esp. of all nested
>   comparators)
>
> Feel free to share your custom TypeInfo, Comparator, and Serializers.
>
> Cheers, Fabian
>
> 2015-09-16 10:52 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
>
>> Hi Pieter,
>>
>> your code doesn't look suspicious at first glance. Would it be
>> possible for you to post a complete example with data (it is also
>> possible to include it in the code) to reproduce your problem?
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phame...@gmail.com>
>> wrote:
>>
>>> Dear fellow Flinkers,
>>>
>>> I am implementing queries from the XMark benchmark
>>> (http://www.ins.cwi.nl/projects/xmark/) on Flink using a
>>> custom nested data type. Reading the XML data generated by the XMark
>>> generator into my custom nested data type works perfectly, and the
>>> queries that I have implemented so far, using mostly map, reduce, and
>>> filter, produce correct results.
>>>
>>> For the next query I wish to cogroup a dataset containing person data
>>> with a dataset containing auction data, joined on the personid of the
>>> person and the personid of the buyer of an auction, so that I can
>>> count the number of purchases per person.
>>> I select this personid as the key from the custom nested data type in
>>> the where and equalTo functions of the coGroup. The
>>> XML2DawnInputFormat is my custom input format that reads XML into my
>>> custom nested data type DawnData. The 'inputGraph' and 'auctionInput'
>>> are projections on the XML input to prevent reading unnecessary data.
>>>
>>> def env = ExecutionEnvironment.getExecutionEnvironment
>>> def persons : DataSet[DawnData] =
>>>   env.readFile(new XML2DawnInputFormat(inputGraph), path)
>>> def auctions : DataSet[DawnData] =
>>>   env.readFile(new XML2DawnInputFormat(auctionInput), path)
>>> def result = persons.coGroup(auctions)
>>>   .where(person => { person.select("2/@id/2") })
>>>   .equalTo(auction => { auction.select("2/buyer/@person/2") })
>>>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>>>     // my cogroup function here that outputs the name of the person
>>>     // and the number of auctions
>>>   }).setParallelism(1)
>>>
>>> This code works fine with parallelism set to 1, as above. My issue is
>>> that if I raise the parallelism of the coGroup above 1, the data gets
>>> mixed up. Often the auctions iterator will be empty, and sometimes
>>> non-empty auction iterators are passed to the cogroup function while
>>> the persons iterator is empty, but that should be impossible because
>>> all buyers exist in the persons dataset!
>>>
>>> If anyone has pointers as to why this code starts producing strange
>>> results when parallelism is set above 1, that would be greatly
>>> appreciated :-)
>>>
>>> Kind regards,
>>>
>>> Pieter Hameete
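Fabian's point that duplicate must return a deep copy of all nested comparators is the subtle one, and matches the symptoms: a comparator caches mutable "reference" state while merging sorted streams, so a shallow duplicate lets parallel operators overwrite each other's state. A minimal, self-contained sketch of the pattern (simplified hypothetical classes, not Flink's actual TypeComparator API):

```scala
// Why duplicate() must deep-copy nested comparators (hypothetical,
// simplified stand-ins for Flink's comparator classes).
class KeyComparator {
  // Mutable state: the element currently being compared against.
  private var reference: String = null
  def setReference(key: String): Unit = { reference = key }
  def equalToReference(key: String): Boolean = key == reference
  def duplicate(): KeyComparator = new KeyComparator() // fresh, independent state
}

class RecordComparator(val nested: KeyComparator) {
  // Correct: duplicate the nested comparator as well. A shallow copy,
  // i.e. new RecordComparator(nested), would share mutable state
  // between parallel instances.
  def duplicate(): RecordComparator = new RecordComparator(nested.duplicate())
}

val original = new RecordComparator(new KeyComparator())
val copy = original.duplicate()

// Two "parallel" users each set their own reference element.
original.nested.setReference("person1")
copy.nested.setReference("person2")

// With a deep copy each instance keeps its own reference; had duplicate()
// shared `nested`, the second setReference would clobber the first.
println(original.nested.equalToReference("person1")) // true
println(copy.nested.equalToReference("person2"))     // true
```

If a shared nested comparator is used by two parallel sort-merge operators, one side's keys get compared against the other side's cached reference, which would produce exactly the empty/mismatched iterators described above.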