Dear fellow Flinkers, I am implementing queries from the XMark ( http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a custom nested data type. Reading the XML data generated by the XMark generator into my custom nested datatype works perfectly, and the queries that I have implemented so far using mostly map, reduce and filter produce correct results.
For the next query I wish to cogroup a dataset containing person data with a dataset containing auction data, joined by the *personid *of the person and the *personid *of the buyer of an auction, so that I can count the number of purchases of a person. I select this *personid *as key from the custom nested data type in the *where* and *equalTo *functions of the *coGroup*. The XML2DawnInputFormat is my custom input format that reads XML into my custom nested datatype *DawnData*. The 'inputGraph' and 'auctionInput' are a projection on the XML input to prevent reading unnecessary data. def env = ExecutionEnvironment.*getExecutionEnvironment *def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)def result = persons.coGroup(auctions).where(person => { person.select("2/@id/2") }) .equalTo( auction => { auction.select("2/buyer/@person/2") }) .apply( (personsres, auctionsres, out : Collector[DawnData]) => { // my cogroup function here that outputs the name of the person and the number of auctions }}).setParallelism(1) This code works fine with parallelism set to 1 as above. My issue is that if I raise the parallelism of the coGroup above 1 the data will get mixed up. Often the auctions Iterator will be empty, and sometimes there are non-empty auction iterators passed to the cogroup function where the persons iterator is empty, but this is impossible because all buyers exist in the persons database! If anyone has some pointers for me why this code starts producing strange results when parallelism is set above 1 this would be greatly appreciated :-) Kind regards. Pieter Hameete