Dear fellow Flinkers,

I am implementing queries from the XMark benchmark
(http://www.ins.cwi.nl/projects/xmark/) on Flink using a custom
nested data type. Reading the XML data generated by the XMark generator
into my custom nested datatype works perfectly, and the queries that I have
implemented so far using mostly map, reduce and filter produce correct
results.

For the next query I want to coGroup a dataset containing person data with
a dataset containing auction data, keyed on the *personid* of the person
and the *personid* of the buyer of an auction, so that I can count the
number of purchases per person. I select this *personid* as the key from the
custom nested data type in the *where* and *equalTo* functions of the
*coGroup*. XML2DawnInputFormat is my custom input format that reads XML
into my custom nested data type *DawnData*. 'inputGraph' and
'auctionInput' are projections of the XML input that avoid reading
unnecessary data.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

// use val so both inputs come from the same ExecutionEnvironment
val env = ExecutionEnvironment.getExecutionEnvironment
val persons: DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
val auctions: DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)

val result = persons.coGroup(auctions)
  .where(person => person.select("2/@id/2"))
  .equalTo(auction => auction.select("2/buyer/@person/2"))
  .apply { (personsres, auctionsres, out: Collector[DawnData]) =>
    // my cogroup function here that outputs the name of the person
    // and the number of auctions
  }
  .setParallelism(1)

This code works fine with the parallelism set to 1 as above. My issue is
that if I raise the parallelism of the coGroup above 1, the data gets mixed
up. The auctions iterator is often empty where I expect auctions, and
sometimes the cogroup function receives a non-empty auctions iterator
together with an empty persons iterator. That should be impossible, because
every buyer exists in the persons dataset!
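For reference, a simplified single-JVM model of a hash-partitioned coGroup is sketched below. It rests on assumptions rather than on Flink's source: above parallelism 1 both inputs are routed to partitions by the key's hash code, and inside a partition records are grouped by key equality (as a sort-based grouping would). Person, Auction, BadKey and CoGroupModel are hypothetical stand-ins for DawnData and for whatever select returns. The model shows one mechanism that would produce the symptoms above: a key type whose equals is value-based but whose hashCode is not still groups correctly at parallelism 1, yet can send a person and their auctions to different partitions at higher parallelism.

```scala
// Simplified single-JVM model of a hash-partitioned coGroup.
// NOT Flink's implementation; Person, Auction and BadKey are hypothetical.
final case class Person(id: String, name: String)
final case class Auction(id: String, buyerId: String)

// Hypothetical key type: value-based equals, but hashCode is inherited from
// AnyRef (identity-based), so two equal keys usually have different hash codes.
final class BadKey(val value: String) {
  override def equals(other: Any): Boolean = other match {
    case k: BadKey => k.value == value
    case _         => false
  }
}

object CoGroupModel {
  // The two iterators one call of the cogroup function would receive.
  final case class Group(persons: Seq[Person], auctions: Seq[Auction])

  // Route a key to a partition by its hash code (a stand-in for a hash partitioner).
  def partitionOf(key: Any, parallelism: Int): Int =
    Math.floorMod(key.hashCode(), parallelism)

  def coGroup[K](
      persons: Seq[Person],
      auctions: Seq[Auction],
      personKey: Person => K,
      auctionKey: Auction => K,
      parallelism: Int
  ): Seq[Group] = {
    // Extract each record's key once, as a key selector would.
    val keyedPersons = persons.map(p => (personKey(p), p))
    val keyedAuctions = auctions.map(a => (auctionKey(a), a))
    (0 until parallelism).flatMap { part =>
      // Each side is shipped to partitions independently, by hash code only.
      val inPersons = keyedPersons.filter(kv => partitionOf(kv._1, parallelism) == part)
      val inAuctions = keyedAuctions.filter(kv => partitionOf(kv._1, parallelism) == part)
      // Inside one partition, group purely by equality; hashCode is not used here.
      val keys = (inPersons.map(_._1) ++ inAuctions.map(_._1)).foldLeft(Vector.empty[K]) {
        (acc, k) => if (acc.contains(k)) acc else acc :+ k
      }
      keys.map { k =>
        Group(inPersons.filter(_._1 == k).map(_._2), inAuctions.filter(_._1 == k).map(_._2))
      }
    }
  }

  // What the cogroup function is meant to compute: number of purchases per person name.
  def purchaseCounts(groups: Seq[Group]): Map[String, Int] =
    groups.flatMap(g => g.persons.map(p => p.name -> g.auctions.size)).toMap

  def main(args: Array[String]): Unit = {
    val persons = Seq(Person("person0", "Alice"), Person("person1", "Bob"))
    val auctions = Seq(Auction("a0", "person0"), Auction("a1", "person0"), Auction("a2", "person1"))
    for (p <- Seq(1, 4)) {
      val good = coGroup(persons, auctions, (x: Person) => x.id, (a: Auction) => a.buyerId, p)
      val bad = coGroup(persons, auctions,
        (x: Person) => new BadKey(x.id), (a: Auction) => new BadKey(a.buyerId), p)
      println(s"parallelism $p, String key: ${purchaseCounts(good)}")
      println(s"parallelism $p, BadKey: ${purchaseCounts(bad)}, " +
        s"groups without a person: ${bad.count(_.persons.isEmpty)}")
    }
  }
}
```

Running main prints the counts at parallelism 1 and 4. With the String key both runs agree; with BadKey the parallelism-4 run will usually lose purchases and report groups without a person. This sketch does not establish what DawnData.select actually returns; that remains an assumption to check.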

If anyone has some pointers as to why this code starts producing strange
results when the parallelism is set above 1, they would be greatly
appreciated :-)

Kind regards.

Pieter Hameete
