This sounds like a problem with your custom type and its (presumably) custom serializers and comparators.
I assume it is not an issue of partitioning or sorting because Reduce is working fine, as you reported. CoGroup does also partition and sort data, but compares the elements of two sorted streams.

I would check the following methods:
- extractKeys
- getFlatComparators
- duplicate (duplicate must return a deep copy, esp. of all nested comparators)

Feel free to share your custom TypeInfo, Comparator, and Serializers.

Cheers, Fabian

2015-09-16 10:52 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:

> Hi Pieter,
>
> your code doesn't look suspicious at the first glance. Would it be
> possible for you to post a complete example with data (also possible to
> include it in the code) to reproduce your problem?
>
> Cheers,
> Till
>
> On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phame...@gmail.com>
> wrote:
>
>> Dear fellow Flinkers,
>>
>> I am implementing queries from the XMark
>> (http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a custom
>> nested data type. Reading the XML data generated by the XMark generator
>> into my custom nested datatype works perfectly, and the queries that I have
>> implemented so far using mostly map, reduce and filter produce correct
>> results.
>>
>> For the next query I wish to cogroup a dataset containing person data
>> with a dataset containing auction data, joined by the *personid* of the
>> person and the *personid* of the buyer of an auction, so that I can
>> count the number of purchases of a person. I select this *personid* as
>> key from the custom nested data type in the *where* and *equalTo* functions
>> of the *coGroup*. The XML2DawnInputFormat is my custom input format that
>> reads XML into my custom nested datatype *DawnData*. The 'inputGraph'
>> and 'auctionInput' are a projection on the XML input to prevent reading
>> unnecessary data.
>>
>> def env = ExecutionEnvironment.getExecutionEnvironment
>> def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
>> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
>> def result = persons.coGroup(auctions)
>>   .where(person => { person.select("2/@id/2") })
>>   .equalTo(auction => { auction.select("2/buyer/@person/2") })
>>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>>     // my cogroup function here that outputs the name of the person and the number of auctions
>>   })
>>   .setParallelism(1)
>>
>> This code works fine with parallelism set to 1 as above. My issue is that
>> if I raise the parallelism of the coGroup above 1 the data will get mixed
>> up. Often the auctions Iterator will be empty, and sometimes there are
>> non-empty auction iterators passed to the cogroup function where the
>> persons iterator is empty, but this is impossible because all buyers exist
>> in the persons database!
>>
>> If anyone has some pointers for me why this code starts producing strange
>> results when parallelism is set above 1 this would be greatly appreciated
>> :-)
>>
>> Kind regards.
>>
>> Pieter Hameete
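
P.S. To illustrate the duplicate check mentioned above, here is a toy model of what can go wrong when duplicate() copies only the outer comparator and shares a nested one. This is only a sketch under assumptions: RefComparator, StringFieldComparator, PersonComparator, Person and DuplicateCheck are made-up names, not Flink classes, and the interface is much simpler than Flink's TypeComparator. It only shows the aliasing effect, not how Flink uses comparators internally.

```scala
// Toy model only: hypothetical, simplified interface (NOT Flink's TypeComparator).
trait RefComparator[T] {
  def setReference(value: T): Unit
  def equalToReference(candidate: T): Boolean
  def duplicate(): RefComparator[T]
}

// Nested comparator that keeps mutable reference state.
final class StringFieldComparator extends RefComparator[String] {
  private var reference: String = null
  def setReference(value: String): Unit = reference = value
  def equalToReference(candidate: String): Boolean = reference == candidate
  def duplicate(): StringFieldComparator = new StringFieldComparator
}

// Hypothetical record type with a single key field.
final case class Person(id: String)

// Outer comparator delegating to the nested one.
// `deep` selects whether duplicate() also copies the nested comparator.
final class PersonComparator(nested: StringFieldComparator, deep: Boolean)
    extends RefComparator[Person] {
  def setReference(value: Person): Unit = nested.setReference(value.id)
  def equalToReference(candidate: Person): Boolean =
    nested.equalToReference(candidate.id)
  def duplicate(): PersonComparator =
    if (deep) new PersonComparator(nested.duplicate(), deep)
    else new PersonComparator(nested, deep) // shallow copy: nested state is shared
}

object DuplicateCheck {
  // True if a duplicate keeps its own reference after the original
  // has been pointed at a different key.
  def isIndependent(original: PersonComparator): Boolean = {
    val copy = original.duplicate()
    copy.setReference(Person("person0"))
    original.setReference(Person("person1"))
    copy.equalToReference(Person("person0"))
  }

  def main(args: Array[String]): Unit = {
    println(isIndependent(new PersonComparator(new StringFieldComparator, deep = true)))  // true
    println(isIndependent(new PersonComparator(new StringFieldComparator, deep = false))) // false
  }
}
```

A similar independence check against your real comparator (duplicate it, set different references on the original and the copy, then compare) might be a quick way to rule this cause in or out before looking at extractKeys and getFlatComparators.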