Cool!

Always happy to help :-)

2015-09-16 14:41 GMT+02:00 Pieter Hameete <phame...@gmail.com>:

> Fantastic Fabian, that was it :-)! I'm glad it wasn't a more severe/tricky
> programming error though I already spent quite some time wondering about
> this one.
>
> Have a nice day!
>
> - Pieter
>
>
>
> 2015-09-16 14:27 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>
>> Sorry, I was thinking too complicated. Forget about the methods I
>> mentioned.
>>
>> If you are implementing WritableComparable types, you need to override
>> the hashcode() method.
>> Flink treats WritableComparable types just like Hadoop [1].
>> DawnData does not implement hashcode() which causes inconsistent hash
>> partitioning.
>>
>> Please let me know, if that solved your problem.
>>
>> Cheers, Fabian
>>
>> [1]
>> https://squarecog.wordpress.com/2011/02/20/hadoop-requires-stable-hashcode-implementations/
>>
>> 2015-09-16 14:12 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>>
>>> Hi,
>>>
>>> I havent been able to find the problem yet, and I dont know exactly how
>>> to check the methods you suggested to check earlier (extractKeys,
>>> getFlatComparators, duplicate) for the Scala API. Do you have some pointers
>>> for me on how I can check these myself?
>>>
>>> In my earlier mail I stated that maps, filters and reduces work fine. I
>>> found that this was not correct: for my previous queries I have only used
>>> maps and filters. I made an extra test and found that indeed the following
>>> code using a reduce also generates faulty results when increasing
>>> paralellism past 1:
>>>
>>> def auctions : DataSet[DawnData] = env.readFile(new 
>>> XML2DawnInputFormat(auctionInput), path)
>>> def test = 
>>> auctions.groupBy(_.select("2.buyer.@person").getFirst).reduceGroup( 
>>> (groupedauctions, out : Collector[DawnData]) => {
>>>   out.collect(new DawnData(groupedauctions.size))
>>> }).setParallelism(1)
>>> test.print
>>>
>>> Does this indicate that something else could be wrong with the custom
>>> datatype?
>>>
>>> You can find the corresponding code and a small dataset at
>>> https://github.com/PHameete/dawn-flink in the *development* branch. It
>>> is a Scala Maven project so you should be able to run the
>>> *main.scala.wis.dawnflink.performance.DawnBenchmarkSuite* class out of
>>> the box to run the query from my first email. In this class you can also
>>> change the query thats being run or run multiple queries. If this does not
>>> work please let me know!
>>>
>>> Kind regards and cheers again!
>>>
>>> - Pieter
>>>
>>>
>>>
>>>
>>> 2015-09-16 11:24 GMT+02:00 Pieter Hameete <phame...@gmail.com>:
>>>
>>>> Cheers Till and Fabian for your fast replies, it's much appreciated!
>>>>
>>>> I figured something should be wrong with my data type. I have no doubt
>>>> the CoGroup works just fine :-) Its pointers what to investigate about my
>>>> datatype what I am looking for. Initially I had problems with serialization
>>>> causing strange issues as well, these were resolved after I had rewritten
>>>> my serialization so I believe that is working OK.
>>>>
>>>> I'll try looking into the data type some more with your tips. If I cant
>>>> figure it out i'll share the repository with you later today or tomorrow.
>>>>
>>>> Kind regards,
>>>>
>>>> Pieter
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 2015-09-16 11:02 GMT+02:00 Fabian Hueske <fhue...@gmail.com>:
>>>>
>>>>> This sound like a problem with your custom type and its (presumably)
>>>>> custom serializers and comparators.
>>>>>
>>>>> I assume it is not an issue of partitioning or sorting because Reduce
>>>>> is working fine, as you reported.
>>>>> CoGroup does also partition and sort data, but compares the elements
>>>>> of two sorted streams.
>>>>>
>>>>> I would check the following methods:
>>>>> - extractKeys
>>>>> - getFlatComparators
>>>>> - duplicate (duplicate must return a deep copy, esp. of all nested
>>>>> comparators)
>>>>>
>>>>> Feel free to share your custom TypeInfo, Comparator, and Serializers.
>>>>>
>>>>> Cheers, Fabian
>>>>>
>>>>> 2015-09-16 10:52 GMT+02:00 Till Rohrmann <till.rohrm...@gmail.com>:
>>>>>
>>>>>> Hi Pieter,
>>>>>>
>>>>>> your code doesn't look suspicious at the first glance. Would it be
>>>>>> possible for you to post a complete example with data (also possible to
>>>>>> include it in the code) to reproduce your problem?
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Wed, Sep 16, 2015 at 10:31 AM, Pieter Hameete <phame...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Dear fellow Flinkers,
>>>>>>>
>>>>>>> I am implementing queries from the XMark (
>>>>>>> http://www.ins.cwi.nl/projects/xmark/) benchmark on Flink using a
>>>>>>> custom nested data type. Reading the XML data generated by the XMark
>>>>>>> generator into my custom nested datatype works perfectly, and the 
>>>>>>> queries
>>>>>>> that I have implemented so far using mostly map, reduce and filter 
>>>>>>> produce
>>>>>>> correct results.
>>>>>>>
>>>>>>> For the next query I wish to cogroup a dataset containing person
>>>>>>> data with a dataset containing auction data, joined by the *personid
>>>>>>> *of the person and the *personid *of the buyer of an auction, so
>>>>>>> that I can count the number of purchases of a person. I select this 
>>>>>>> *personid
>>>>>>> *as key from the custom nested data type in the *where* and *equalTo
>>>>>>> *functions of the *coGroup*. The XML2DawnInputFormat is my custom
>>>>>>> input format that reads XML into my custom nested datatype
>>>>>>> *DawnData*. The 'inputGraph' and 'auctionInput' are a projection on
>>>>>>> the XML input to prevent reading unnecessary data.
>>>>>>>
>>>>>>> def env = ExecutionEnvironment.*getExecutionEnvironment
>>>>>>> *def persons : DataSet[DawnData] = env.readFile(new 
>>>>>>> XML2DawnInputFormat(inputGraph), path)def auctions : DataSet[DawnData] 
>>>>>>> = env.readFile(new XML2DawnInputFormat(auctionInput), path)def result = 
>>>>>>> persons.coGroup(auctions).where(person => { person.select("2/@id/2") }) 
>>>>>>> .equalTo( auction => { auction.select("2/buyer/@person/2") }) .apply( 
>>>>>>> (personsres, auctionsres, out : Collector[DawnData]) => {   // my 
>>>>>>> cogroup function here that outputs the name of the person and the 
>>>>>>> number of auctions  }}).setParallelism(1)
>>>>>>>
>>>>>>> This code works fine with parallelism set to 1 as above. My issue is
>>>>>>> that if I raise the parallelism of the coGroup above 1 the data will get
>>>>>>> mixed up. Often the auctions Iterator will be empty, and sometimes there
>>>>>>> are non-empty auction iterators passed to the cogroup function where the
>>>>>>> persons iterator is empty, but this is impossible because all buyers 
>>>>>>> exist
>>>>>>> in the persons database!
>>>>>>>
>>>>>>> If anyone has some pointers for me why this code starts producing
>>>>>>> strange results when parallelism is set above 1 this would be greatly
>>>>>>> appreciated :-)
>>>>>>>
>>>>>>> Kind regards.
>>>>>>>
>>>>>>> Pieter Hameete
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to