Would you mind opening a JIRA for this issue?

-> https://issues.apache.org/jira/browse/FLINK

I can do it as well, but you know all the details.

Thanks, Fabian

2015-06-09 11:03 GMT+02:00 Hilmi Yildirim <hilmi.yildi...@neofonie.de>:

>  I want to add that I run the Flink job on a cluster with 13 machines and
> each machine has 13 processing slots which results in a total number of
> processing slots of 169.
>
>
> On 09.06.2015 at 10:59, Hilmi Yildirim wrote:
>
> Correct.
>
> I also counted the rows with Spark and Hive. Both returned the same value,
> which is nearly 100 million rows. But Flink returns 102 million rows.
>
> Best Regards,
> Hilmi
>
> On 09.06.2015 at 10:47, Fabian Hueske wrote:
>
>   OK, so the problem seems to be with the HBase InputFormat.
>
>  I guess this issue needs a bit of debugging.
> We need to check if records are emitted twice (or more often) and if that
> is the case which records.
>  Unfortunately, this issue only seems to occur with large tables :-(
>
>  Did I get that right, that the HBase format returns about 2M (~2%) more
> records than are contained in the HBase table?
>
>  Cheers, Fabian
>
> 2015-06-09 10:34 GMT+02:00 Hilmi Yildirim <hilmi.yildi...@neofonie.de>:
>
>>  Hi,
>> Now I tested the "count" method. It returns the same result as the
>> flatmap.groupBy(0).sum(1) method.
>>
>> Furthermore, HBase contains nearly 100 million rows, but the result is
>> 102 million. This means that the HbaseInput reads more rows than HBase
>> contains.
>>
>> Best Regards,
>> Hilmi
>>
>>
>> On 08.06.2015 at 23:29, Fabian Hueske wrote:
>>
>>     Hi Hilmi,
>>
>>  I see two possible reasons:
>>
>>  1) The data source / InputFormat is not properly working, so not all
>> HBase records are read/forwarded, or
>>  2) The aggregation / count is buggy
>>
>>  Robert's suggestion will use an alternative mechanism to do the count. In
>> fact, you can count with groupBy(0).sum() and accumulators at the same time.
>>  If both counts are the same, this will indicate that the aggregation is
>> correct and hint that the HBase format is faulty.
>>
>> In any case, it would be very good to know your findings. Please keep us
>> updated.
>>
>>  One more hint: if you want to do a full aggregate, you don't have to use
>> a "dummy" key like "a". Instead, you can work with Tuple1<Long> and
>> directly call sum(0) without doing the groupBy().
>>
>>  Best, Fabian
>>
>> 2015-06-08 17:36 GMT+02:00 Robert Metzger <rmetz...@apache.org>:
>>
>>> Hi Hilmi,
>>>
>>>  if you just want to count the number of elements, you can also use
>>> accumulators, as described here [1].
>>> They are much more lightweight.
>>>
>>>  So you need to make your flatMap function a RichFlatMapFunction, then
>>> call getRuntimeContext().
>>> Use a long accumulator to count the elements.
>>>
>>>  If the results with the accumulator are consistent (the exact element
>>> count), then there is a severe bug in Flink. But I suspect that the
>>> accumulator will give you the same result (off by +-5)
>>>
>>>  Best,
>>> Robert
>>>
>>>
>>>  [1]: http://slideshare.net/robertmetzger1/apache-flink-hands-on
>>>
>>> On Mon, Jun 8, 2015 at 3:04 PM, Hilmi Yildirim <
>>> hilmi.yildi...@neofonie.de> wrote:
>>>
>>>> Hi,
>>>> I implemented a simple Flink Batch job which reads from an HBase
>>>> Cluster of 13 machines and with nearly 100 million rows. The hbase version
>>>> is 1.0.0-cdh5.4.1. So, I imported hbase-client 1.0.0-cdh5.4.1.
>>>> I implemented a flatmap which creates a tuple ("a", 1L) for each row.
>>>> Then, I use groupBy(0).sum(1).writeAsText. The result should be the number
>>>> of rows. But the result is not correct. I ran the job multiple times and
>>>> the result fluctuates by +-5. I also ran the job for a smaller table with
>>>> 100,000 rows and the result is correct.
>>>>
>>>> Does anyone know the reason for that?
>>>>
>>>> Best Regards,
>>>> Hilmi
>>>>
>>>> --
>>>> --
>>>> Hilmi Yildirim
>>>> Software Developer R&D
>>>>
>>>> http://www.neofonie.de
>>>>
>>>> Visit the Neo Tech Blog for users:
>>>> http://blog.neofonie.de/
>>>>
>>>> Follow us:
>>>> https://plus.google.com/+neofonie
>>>> http://www.linkedin.com/company/neofonie-gmbh
>>>> https://www.xing.com/companies/neofoniegmbh
>>>>
>>>> Neofonie GmbH | Robert-Koch-Platz 4 | 10115 Berlin
>>>> Commercial register Berlin-Charlottenburg: HRB 67460
>>>> Management: Thomas Kitlitschko
>>>>
>>>>
>>>
>>
>
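
The duplicate check discussed in the thread (are records emitted twice or more often, and if so, which ones) boils down to grouping the emitted records by row key and keeping the keys whose count is greater than one. Below is a minimal sketch of that logic in plain Java on an in-memory list. It is not Flink or HBase code: the class name DuplicateRowKeys, the method findDuplicates, and the sample keys are hypothetical, and on a table of this size the same grouping would have to run as a distributed job rather than in a single JVM.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of Flink or HBase.
public class DuplicateRowKeys {

    /**
     * Returns every row key that occurs more than once, mapped to its
     * number of occurrences. Keys appear in order of first occurrence.
     */
    public static Map<String, Long> findDuplicates(List<String> emittedRowKeys) {
        // First pass: count how often each row key was emitted.
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String key : emittedRowKeys) {
            counts.merge(key, 1L, Long::sum);
        }
        // Second pass: keep only the keys that were emitted more than once.
        Map<String, Long> duplicates = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                duplicates.put(entry.getKey(), entry.getValue());
            }
        }
        return duplicates;
    }

    public static void main(String[] args) {
        // Assumed sample data: "row-2" is emitted twice, as a faulty input
        // format might do.
        List<String> emitted = Arrays.asList("row-1", "row-2", "row-3", "row-2");
        System.out.println(findDuplicates(emitted)); // prints {row-2=2}
    }
}
```

If the result is empty while the total count is still too high, the surplus would have to come from somewhere other than repeated row keys, which is useful to know before digging into the input format.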
