Done
https://issues.apache.org/jira/browse/FLINK-2188

Am 09.06.2015 um 11:26 schrieb Fabian Hueske:
Would you mind opening a JIRA for this issue?

-> https://issues.apache.org/jira/browse/FLINK

I can do it as well, but you know all the details.

Thanks, Fabian

2015-06-09 11:03 GMT+02:00 Hilmi Yildirim <hilmi.yildi...@neofonie.de <mailto:hilmi.yildi...@neofonie.de>>:

    I want to add that I run the Flink job on a cluster with 13
    machines and each machine has 13 processing slots which results in
    a total number of processing slots of 169.


    Am 09.06.2015 um 10:59 schrieb Hilmi Yildirim:
    Correct.

    I also counted the rows with Spark and Hive. Both returned the
    same value which is nearly 100 mio. rows. But Flink returns 102
    mio. rows.

    Best Regards,
    Hilmi

    Am 09.06.2015 um 10:47 schrieb Fabian Hueske:
    OK, so the problem seems to be with the HBase InputFormat.

    I guess this issue needs a bit of debugging.
    We need to check if records are emitted twice (or more often)
    and if that is the case which records.
    Unfortunately, this issue only seems to occur with large tables :-(

    Did I got that right, that the HBase format returns about 2M
    (~2%) more records than are contained in the HBase table?

    Cheers, Fabian

    2015-06-09 10:34 GMT+02:00 Hilmi Yildirim
    <hilmi.yildi...@neofonie.de <mailto:hilmi.yildi...@neofonie.de>>:

        Hi,
        Now I tested the "count" method. It returns the same result
        as the flatmap.groupBy(0).sum(1) method.

        Furthermore, the Hbase contains nearly 100 mio. rows but the
        result is 102 mio.. This means that the HbaseInput reads
        more rows than the HBase contains.

        Best Regards,
        Hilmi


        Am 08.06.2015 um 23:29 schrieb Fabian Hueske:
        Hi Hilmi,

        I see two possible reasons:

        1) The data source / InputFormat is not properly working,
        so not all HBase records are read/forwarded, or
        2) The aggregation / count is buggy

        Roberts suggestion will use an alternative mechanism to do
        the count. In fact, you can count with groupBy(0).sum() and
        accumulators at the same time.
        If both counts are the same, this will indicate that the
        aggregation is correct and hint that the HBase format is
        faulty.

        In any case, it would be very good to know your findings.
        Please keep us updated.

        One more hint, if you want to do a full aggregate, you
        don't have to use a "dummy" key like "a". Instead, you can
        work with Tuple1<Long> and directly call sum(0) without
        doing the groupBy().

        Best, Fabian

        2015-06-08 17:36 GMT+02:00 Robert Metzger
        <rmetz...@apache.org <mailto:rmetz...@apache.org>>:

            Hi Hilmi,

            if you just want to count the number of elements, you
            can also use accumulators, as described here [1].
            They are much more lightweight.

            So you need to make your flatMap function a
            RichFlatMapFunction, then call getExecutionContext().
            Use a long accumulator to count the elements.

            If the results with the accumulator are consistent (the
            exact element count), then there is a severe bug in
            Flink. But I suspect that the accumulator will give you
            the same result (off by +-5)

            Best,
            Robert


            [1]:
            http://slideshare.net/robertmetzger1/apache-flink-hands-on

            On Mon, Jun 8, 2015 at 3:04 PM, Hilmi Yildirim
            <hilmi.yildi...@neofonie.de
            <mailto:hilmi.yildi...@neofonie.de>> wrote:

                Hi,
                I implemented a simple Flink Batch job which reads
                from an HBase Cluster of 13 machines and with
                nearly 100 million rows. The hbase version is
                1.0.0-cdh5.4.1. So, I imported hbase-client
                1.0.0-cdh5.4.1.
                I implemented a flatmap which creates a tuple ("a",
                1L) for each row . Then, I use
                groupBy(0).sum(1).writeAsTest. The result should be
                the number of rows. But, the result is not correct.
                I run the job multiple times and the result
                flactuates by +-5. I also run the job for a smaller
                table with 100.000 rows and the result is correct.

                Does anyone know the reason for that?

                Best Regards,
                Hilmi

-- --
                Hilmi Yildirim
                Software Developer R&D

                http://www.neofonie.de

                Besuchen Sie den Neo Tech Blog für Anwender:
                http://blog.neofonie.de/

                Folgen Sie uns:
                https://plus.google.com/+neofonie
                http://www.linkedin.com/company/neofonie-gmbh
                https://www.xing.com/companies/neofoniegmbh

                Neofonie GmbH | Robert-Koch-Platz 4 | 10115 Berlin
                Handelsregister Berlin-Charlottenburg: HRB 67460
                Geschäftsführung: Thomas Kitlitschko





Reply via email to