Oliver Swoboda wrote:
Hi Josh, thank you for your quick answer!

2016-11-03 17:03 GMT+01:00 Josh Elser <els...@apache.org
<mailto:els...@apache.org>>:

    Hi Oliver,

    Cool stuff. I wish I knew more about Flink to make some better
    suggestions. Some points inline, and sorry in advance if I suggest
    something outright wrong. Hopefully someone from the Flink side can
    help give context where necessary :)

    Oliver Swoboda wrote:

        Hello,

        I'm using Flink with Accumulo and wanted to read data from the
        database
        by using the createHadoopInput function. Therefore I configure an
        AccumuloInputFormat. The source code you can find here:
        
https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java
        
<https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java>
        
<https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java
        
<https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java>>

        I'm using a 5 Node Cluster (1 Master, 4 Worker).
        Accumulo is installed with Ambari and has 1 Master Server on the
        Master
        Node and 4 Tablet Servers (one on each Worker).
        Flink is installed standalone with the Jobmanager on the Master
        Node and
        4 Taskmanagers (one on each Worker). Every Taskmanager can have
        4 Tasks,
        so there are 32 in total.

        First problem I have:
        If I start serveral Flink Jobs the client count for Zookeeper in the
        Accumulo Overview is constantly increasing. I assume that the used
        scanner isn't correctly closed. The client count only decreases to
        normal values when I restart Flink.


    Hrm, this does seem rather bad. Eventually, you'll saturate the
    connections to ZK and ZK itself will start limiting new connections
    (per the maxClientCnxns property).

    This sounds somewhat familiar to
    https://issues.apache.org/jira/browse/ACCUMULO-2113
    <https://issues.apache.org/jira/browse/ACCUMULO-2113>. The lack of a
    proper "close()" method on the Instance interface is a known
    deficiency. I'm not sure how Flink execution happens, so I am kind
    of just guessing.

    You might be able to try to use the CleanUp[1] utility to close out
    the thread pools/connections when your Flink "task" is done.


Unfortunately that didn't worked. I guess because Flink is starting the
tasks with the scanners by a TaskManager and I can't access those tasks
with my program. So after the task is done, I can't close the
connections with the utility, because the thread where I use it hasn't
startet the scanners.

I see. I'm sorry, but I just don't have any other suggestion to give you. I'm not familiar with Flink's execution model. Good luck.

        Second problem I have:
        I want to compare aggregations on time series data with Accumulo
        (with
        Iterators) and with flink. Unfortunately, the results vary
        inexplicable
        when I'm using Flink. I wanted to compare the results for a full
        table
        scan (called baseline in the code), but sometimes it takes 17-18
        minutes
        and sometimes its between 30 and 60 minutes. In the longer case
        I can
        see in the Accumulo Overview that after some time only one worker is
        left with running scans and there are just a few entries/s sanned (4
        million at the beginning when all workers are running to 200k
        when the
        one worker is left). Because there are 2.5 billion records to
        scan and
        almost 500 million left it takes really long.
        This problem doesn't occur with Accumulo using Iterators and a batch
        scanner on the master node, each scan has almost identical
        durations and
        graphics in the Accumulo Overview for entries/s, MB/s scanned
        and seeks
        are for each scan the same.


    It sounds like maybe your partitioning was sub-optimal and caused
    one task to get a majority of the data? Having the
    autoAdjustRanges=true (as you do by default) should help get many
    batches of work based on the tablet boundaries in Accumulo. I'm not
    sure how Flink actually executes them though.


The problem was that half of the data was on one node after a restart of
accumulo. It seems that it has something to do with the problem
described here: https://issues.apache.org/jira/browse/ACCUMULO-4353. I
stopped and then startet accumulo instead of doing a restart and then
the data is distributed evenly across all nodes. For my tests I keep
accumulo running now, because after each restart the data distribution
is changed and I don't want to upgrade to 1.8.

Your tablet distribution was not even? The Master should be automatically managing this for you, but this will not happen if there are offline tablets that are not getting assigned. Looking in the Master log for messages about the balancer should help you here. You do not need to upgrade to 1.8.0 to get an even distribution of data across all nodes in Accumulo.

One trick might be to restart the Accumulo master if everything appears to be OK otherwise.

        Yours faithfully,
        Oliver Swoboda



    [1]
    
https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36
    
<https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36>


Reply via email to