Hi Pieter,

your code doesn't look suspicious at first glance. Would it be possible
for you to post a complete example with data (it can also be included
in the code) to reproduce your problem?
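
While putting that together, one thing that may be worth checking. This is an assumption, since the definition of DawnData and the return type of select(...) aren't shown. With parallelism above 1, both coGroup inputs are hash-partitioned on the key, so two equal keys only meet in the same parallel instance if their hashCode() values agree. If select(...) returns a custom object that is Comparable (enough for sorting and grouping inside one partition, which might be why parallelism 1 works) but keeps Object's identity-based hashCode, matching persons and auctions could land in different instances. That would show up as the empty iterators you describe. The sketch below is plain Scala without Flink. It uses a simplified model of hash partitioning (partition = hashCode modulo parallelism), and IdentityKey, ValueKey and PartitionDemo are hypothetical names for illustration only.

```scala
// Hypothetical stand-in for a key type that is Comparable but keeps
// Object's identity-based equals/hashCode (DawnData's real code is not shown).
class IdentityKey(val id: String) extends Comparable[IdentityKey] {
  override def compareTo(o: IdentityKey): Int = id.compareTo(o.id)
}

// The same key with value-based equals/hashCode (generated for case classes).
case class ValueKey(id: String)

object PartitionDemo {
  // Simplified model of hash partitioning: the target instance depends
  // only on the key's hashCode (not Flink's exact partitioner).
  def partition(key: Any, parallelism: Int): Int =
    java.lang.Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val parallelism = 4

    // Two independently created keys for the same personid,
    // one extracted from a person, one from an auction.
    val fromPerson  = new IdentityKey("person0")
    val fromAuction = new IdentityKey("person0")
    // compareTo says they are equal, but their identity hash codes usually
    // differ, so they may be sent to different partitions.
    println(s"identity keys equal by compareTo: ${fromPerson.compareTo(fromAuction) == 0}")
    println(s"identity keys -> partitions ${partition(fromPerson, parallelism)} and ${partition(fromAuction, parallelism)}")

    // Value-based keys always hash identically, so they always meet.
    val p = ValueKey("person0")
    val a = ValueKey("person0")
    println(s"value keys -> partitions ${partition(p, parallelism)} and ${partition(a, parallelism)}")
  }
}
```

If this turns out to be the cause, letting the where/equalTo key selectors return something with value-based equals/hashCode (e.g. a String or a case class) should make the partitioning consistent.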

Cheers,
Till

On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phame...@gmail.com> wrote:

> Dear fellow Flinkers,
>
> I am implementing queries from the XMark (
> http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a custom
> nested data type. Reading the XML data generated by the XMark generator
> into my custom nested datatype works perfectly, and the queries that I have
> implemented so far using mostly map, reduce and filter produce correct
> results.
>
> For the next query I wish to coGroup a dataset containing person data with
> a dataset containing auction data, matching the *personid* of the person
> with the *personid* of the buyer of an auction, so that I can count the
> number of purchases of each person. I select this *personid* as key from the
> custom nested data type in the *where* and *equalTo* functions of the
> *coGroup*. XML2DawnInputFormat is my custom input format that reads
> XML into my custom nested datatype *DawnData*. 'inputGraph' and
> 'auctionInput' are projections of the XML input to avoid reading
> unnecessary data.
>
> def env = ExecutionEnvironment.getExecutionEnvironment
> def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
> def result = persons.coGroup(auctions)
>   .where(person => { person.select("2/@id/2") })
>   .equalTo(auction => { auction.select("2/buyer/@person/2") })
>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>     // my coGroup function here that outputs the name of the person and the number of auctions
>   }).setParallelism(1)
>
> This code works fine with parallelism set to 1 as above. My issue is that
> if I raise the parallelism of the coGroup above 1, the data gets mixed
> up. Often the auctions iterator is empty, and sometimes non-empty
> auction iterators are passed to the coGroup function while the
> persons iterator is empty, but this is impossible because all buyers exist
> in the persons database!
>
> If anyone has some pointers as to why this code starts producing strange
> results when the parallelism is set above 1, this would be greatly appreciated
> :-)
>
> Kind regards.
>
> Pieter Hameete
>
>
>
