Cheers Till and Fabian for your fast replies, it's much appreciated!

I figured something might be wrong with my data type. I have no doubt the
CoGroup works just fine :-) It's exactly these pointers on what to
investigate in my data type that I was looking for. Initially I had
serialization problems causing strange issues as well; those were resolved
after I rewrote my serialization, so I believe that part is working OK.

I'll look into the data type some more using your tips. If I can't figure
it out, I'll share the repository with you later today or tomorrow.

Kind regards,

Pieter





2015-09-16 11:02 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:

> This sounds like a problem with your custom type and its (presumably)
> custom serializers and comparators.
>
> I assume it is not an issue of partitioning or sorting because Reduce is
> working fine, as you reported.
> CoGroup also partitions and sorts data, but compares the elements of
> two sorted streams.
>
> I would check the following methods:
> - extractKeys
> - getFlatComparators
> - duplicate (duplicate must return a deep copy, esp. of all nested
> comparators)
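The deep-copy requirement on `duplicate` that Fabian mentions can be sketched with a minimal, self-contained example. The names here (`FieldComparator`, `CompositeComparator`) are hypothetical stand-ins, not Flink's actual `TypeComparator` API: the point is only that comparators carry mutable reference state, so a shallow copy would let parallel sort instances overwrite each other's references.

```scala
// Hypothetical sketch, not Flink's real API: a comparator with mutable
// reference state, nested inside a composite comparator.
class FieldComparator(var reference: Int = 0) {
  def setReference(r: Int): Unit = { reference = r }
  def compareToReference(candidate: Int): Int = candidate.compare(reference)
  def duplicate(): FieldComparator = new FieldComparator(reference)
}

class CompositeComparator(val nested: Array[FieldComparator]) {
  // WRONG: `new CompositeComparator(nested)` would share the nested
  // comparators, so two parallel instances would stomp on each other's
  // reference state.
  // RIGHT: duplicate every nested comparator as well (a deep copy).
  def duplicate(): CompositeComparator =
    new CompositeComparator(nested.map(_.duplicate()))
}
```

With this deep copy, calling `setReference` on a duplicate's nested comparator leaves the original's reference state untouched, which is exactly the independence two parallel sorters need.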
>
> Feel free to share your custom TypeInfo, Comparator, and Serializers.
>
> Cheers, Fabian
>
> 2015-09-16 10:52 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
>
>> Hi Pieter,
>>
>> your code doesn't look suspicious at the first glance. Would it be
>> possible for you to post a complete example with data (also possible to
>> include it in the code) to reproduce your problem?
>>
>> Cheers,
>> Till
>>
>> On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phame...@gmail.com>
>> wrote:
>>
>>> Dear fellow Flinkers,
>>>
>>> I am implementing queries from the XMark (
>>> http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a
>>> custom nested data type. Reading the XML data generated by the XMark
>>> generator into my custom nested datatype works perfectly, and the queries
>>> that I have implemented so far using mostly map, reduce and filter produce
>>> correct results.
>>>
>>> For the next query I wish to cogroup a dataset containing person data
>>> with a dataset containing auction data, joined on the *personid* of the
>>> person and the *personid* of the buyer of an auction, so that I can
>>> count the number of purchases per person. I select this *personid* as
>>> the key from the custom nested data type in the *where* and *equalTo*
>>> functions of the *coGroup*. The XML2DawnInputFormat is my custom input
>>> format that reads XML into my custom nested datatype *DawnData*. The
>>> 'inputGraph' and 'auctionInput' are projections on the XML input that
>>> prevent reading unnecessary data.
>>>
>>> def env = ExecutionEnvironment.getExecutionEnvironment
>>> def persons : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(inputGraph), path)
>>> def auctions : DataSet[DawnData] = env.readFile(new XML2DawnInputFormat(auctionInput), path)
>>> def result = persons.coGroup(auctions)
>>>   .where(person => person.select("2/@id/2"))
>>>   .equalTo(auction => auction.select("2/buyer/@person/2"))
>>>   .apply((personsres, auctionsres, out : Collector[DawnData]) => {
>>>     // my cogroup function here that outputs the name of the person
>>>     // and the number of auctions
>>>   }).setParallelism(1)
>>>
>>> This code works fine with parallelism set to 1, as above. My issue is
>>> that if I raise the parallelism of the coGroup above 1, the data gets
>>> mixed up. Often the auctions iterator is empty, and sometimes a
>>> non-empty auctions iterator is passed to the cogroup function while the
>>> persons iterator is empty, which should be impossible because every
>>> buyer exists in the persons dataset!
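One common cause of exactly this symptom is worth illustrating (this is a hedged guess, shown with hypothetical stand-in types, not the real DawnData): if the key object returned by the `where`/`equalTo` selectors does not have value-based `hashCode`/`equals`, hash partitioning can send equal keys to different parallel instances, so the matching groups never meet and one side of the coGroup shows up empty. A minimal sketch:

```scala
// Hypothetical stand-in key types, not the real DawnData.
class IdentityKey(val id: String)   // inherits identity-based hashCode
case class ValueKey(id: String)     // case class: value-based hashCode/equals

// Simplified stand-in for hash partitioning across parallel instances.
def partition(key: Any, parallelism: Int): Int =
  ((key.hashCode % parallelism) + parallelism) % parallelism

val p = 4
// Two value-equal ValueKeys always map to the same partition:
assert(partition(ValueKey("person0"), p) == partition(ValueKey("person0"), p))
// Two IdentityKeys wrapping the same id usually do not (not guaranteed),
// which is how a coGroup ends up with an empty side at parallelism > 1.
```

With parallelism 1 everything lands on the same instance regardless of the hash, which would explain why the query only breaks when parallelism is raised.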
>>>
>>> If anyone has some pointers as to why this code starts producing
>>> strange results when parallelism is set above 1, that would be greatly
>>> appreciated :-)
>>>
>>> Kind regards.
>>>
>>> Pieter Hameete
>>>
>>>
>>>
>>
>
