Hi Pieter,

your code doesn't look suspicious at first glance. Could you post a complete example, including data (the data can also be embedded directly in the code), that reproduces your problem?
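For instance, a reproducer could look roughly like the sketch below. Person, Auction, countPurchases and the sample records are placeholders made up here to stand in for DawnData and the XMark input, and the parallelism of 4 is arbitrary. The real version would use your select(...) calls as key selectors.

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object CoGroupRepro {
  // Hypothetical stand-ins for the nested DawnData records.
  case class Person(id: String, name: String)
  case class Auction(buyer: String)

  // Pairs every person in the group with the number of auctions in the group.
  def countPurchases(persons: Iterator[Person],
                     auctions: Iterator[Auction]): Seq[(String, Int)] = {
    val purchases = auctions.size // consumes the auctions iterator
    persons.map(p => (p.name, purchases)).toList
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Inline sample data, so that no input files are needed.
    val persons = env.fromElements(
      Person("person0", "Ann"), Person("person1", "Bob"))
    val auctions = env.fromElements(
      Auction("person0"), Auction("person0"), Auction("person1"))

    val result = persons.coGroup(auctions)
      .where(p => p.id)        // key of the person side
      .equalTo(a => a.buyer)   // key of the auction side
      .apply { (ps: Iterator[Person], buys: Iterator[Auction],
                out: Collector[(String, Int)]) =>
        countPurchases(ps, buys).foreach(r => out.collect(r))
      }
      .setParallelism(4)       // arbitrary value above 1

    result.print()
  }
}
```

If the results of such a small program already differ between parallelism 1 and a higher value, that narrows the problem down considerably.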
Cheers,
Till

On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phame...@gmail.com> wrote:

> Dear fellow Flinkers,
>
> I am implementing queries from the XMark
> (http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a custom
> nested data type. Reading the XML data generated by the XMark generator
> into my custom nested datatype works perfectly, and the queries that I have
> implemented so far using mostly map, reduce and filter produce correct
> results.
>
> For the next query I wish to cogroup a dataset containing person data with
> a dataset containing auction data, joined by the *personid* of the person
> and the *personid* of the buyer of an auction, so that I can count the
> number of purchases of a person. I select this *personid* as key from the
> custom nested data type in the *where* and *equalTo* functions of the
> *coGroup*. The XML2DawnInputFormat is my custom input format that reads
> XML into my custom nested datatype *DawnData*. The 'inputGraph' and
> 'auctionInput' are a projection on the XML input to prevent reading
> unnecessary data.
>
> def env = ExecutionEnvironment.getExecutionEnvironment
> def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
> def result = persons.coGroup(auctions)
>   .where(person => { person.select("2/@id/2") })
>   .equalTo(auction => { auction.select("2/buyer/@person/2") })
>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>     // my cogroup function here that outputs the name of the person and the number of auctions
>   }).setParallelism(1)
>
> This code works fine with parallelism set to 1 as above. My issue is that
> if I raise the parallelism of the coGroup above 1 the data will get mixed
> up. Often the auctions Iterator will be empty, and sometimes there are
> non-empty auction iterators passed to the cogroup function where the
> persons iterator is empty, but this is impossible because all buyers exist
> in the persons database!
>
> If anyone has some pointers for me why this code starts producing strange
> results when parallelism is set above 1 this would be greatly appreciated
> :-)
>
> Kind regards.
>
> Pieter Hameete