Hi Oliver,

Cool stuff. I wish I knew more about Flink so I could make better suggestions. Some points inline, and sorry in advance if I suggest something outright wrong. Hopefully someone from the Flink side can help give context where necessary :)

Oliver Swoboda wrote:
Hello,

I'm using Flink with Accumulo and wanted to read data from the database
by using the createHadoopInput function. For that, I configure an
AccumuloInputFormat. You can find the source code here:
https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java

I'm using a 5-node cluster (1 master, 4 workers).
Accumulo is installed with Ambari and has 1 Master Server on the Master
Node and 4 Tablet Servers (one on each Worker).
Flink is installed standalone with the Jobmanager on the Master Node and
4 Taskmanagers (one on each Worker). Every Taskmanager can have 4 Tasks,
so there are 32 in total.

First problem I have:
If I start several Flink jobs, the client count for ZooKeeper in the
Accumulo Overview keeps increasing. I assume that the scanner I use
isn't closed correctly. The client count only drops back to normal
values when I restart Flink.

Hrm, this does seem rather bad. Eventually, you'll saturate the connections to ZK and ZK itself will start limiting new connections (per the maxClientCnxns property).

This sounds similar to https://issues.apache.org/jira/browse/ACCUMULO-2113. The lack of a proper "close()" method on the Instance interface is a known deficiency. I'm not sure how Flink execution happens, so I am kind of just guessing.

You might be able to use the CleanUp[1] utility to close out the thread pools/connections when your Flink "task" is done.

Second problem I have:
I want to compare aggregations on time series data with Accumulo (with
Iterators) and with Flink. Unfortunately, the results vary inexplicably
when I'm using Flink. I wanted to compare the results for a full table
scan (called baseline in the code), but sometimes it takes 17-18 minutes
and sometimes it's between 30 and 60 minutes. In the longer case I can
see in the Accumulo Overview that after some time only one worker is
left with running scans, and only a few entries/s are scanned (from 4
million at the beginning, when all workers are running, down to 200k
when only one worker is left). Because there are 2.5 billion records to
scan and almost 500 million are left at that point, it takes a really
long time.
This problem doesn't occur with Accumulo using Iterators and a batch
scanner on the master node: every scan has almost the same duration, and
the Accumulo Overview graphs for entries/s, MB/s scanned, and seeks look
the same for each scan.
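A quick back-of-the-envelope check with the figures quoted above shows how much the single-worker tail costs. This assumes the reported rates stay constant, which is an idealization, so treat the numbers as rough bounds rather than predictions:

```java
public class ScanTimeEstimate {

    // Seconds needed to scan `records` entries at a sustained `entriesPerSecond` rate.
    public static double seconds(double records, double entriesPerSecond) {
        return records / entriesPerSecond;
    }

    public static void main(String[] args) {
        // Figures from the message above; assumes the rates stay constant.
        double full = seconds(2.5e9, 4.0e6); // whole table, all four workers scanning
        double tail = seconds(500e6, 200e3); // last ~500M records, one worker left
        System.out.printf("full table at 4M/s:  %.0f s (%.1f min)%n", full, full / 60);
        System.out.printf("last 500M at 200k/s: %.0f s (%.1f min)%n", tail, tail / 60);
    }
}
```

This prints 625 s (10.4 min) for the whole table at the peak rate and 2500 s (41.7 min) for the tail alone, so the slow single-worker phase by itself is enough to explain the 30-60 minute runs.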

It sounds like maybe your partitioning was sub-optimal and caused one task to get a majority of the data? Having autoAdjustRanges=true (which you have, since it's the default) should help produce many batches of work based on the tablet boundaries in Accumulo. I'm not sure how Flink actually executes them, though.
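To illustrate how one oversized split can dominate the run time, here is a small self-contained Java model. Everything in it is hypothetical: the per-worker rate, the split sizes, and the assumption that each worker pulls the next unassigned split as soon as it is idle (a simplification, not necessarily how Flink assigns input splits):

```java
import java.util.Arrays;
import java.util.PriorityQueue;

public class SplitSkewSketch {

    // Hypothetical model: every worker scans at the same constant rate and takes the
    // next unassigned split as soon as it is idle. Returns when the last worker finishes.
    public static double makespanSeconds(long[] splitRecords, int workers, double recordsPerSecond) {
        // Min-heap of the times at which each worker becomes free.
        PriorityQueue<Double> freeAt = new PriorityQueue<>();
        for (int i = 0; i < workers; i++) {
            freeAt.add(0.0);
        }
        for (long records : splitRecords) {
            double start = freeAt.poll(); // earliest idle worker takes this split
            freeAt.add(start + records / recordsPerSecond);
        }
        double end = 0.0;
        for (double t : freeAt) {
            end = Math.max(end, t);
        }
        return end;
    }

    public static void main(String[] args) {
        int workers = 4;
        double rate = 1_000_000.0; // hypothetical: 1M entries/s per worker

        // Hypothetical: 2.5 billion records in 100 equal splits of 25M each.
        long[] even = new long[100];
        Arrays.fill(even, 25_000_000L);

        // Hypothetical: same total, but 80 splits of 25M plus one split of 500M.
        long[] skewed = new long[81];
        Arrays.fill(skewed, 25_000_000L);
        skewed[80] = 500_000_000L;

        System.out.printf("even:   %.0f s%n", makespanSeconds(even, workers, rate));
        System.out.printf("skewed: %.0f s%n", makespanSeconds(skewed, workers, rate));
    }
}
```

With these made-up numbers the evenly split table finishes in 625 s, while the skewed one takes 1000 s: all workers are busy until 500 s, then three go idle and the fourth spends another 500 s on the big split. That is the same "only one worker left" shape you describe, so it may be worth checking how evenly the data is spread across the table's tablets.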

Yours faithfully,
Oliver Swoboda


[1] https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36
