Hi Josh,

thank you for your quick answer!

2016-11-03 17:03 GMT+01:00 Josh Elser <els...@apache.org>:

> Hi Oliver,
>
> Cool stuff. I wish I knew more about Flink to make some better
> suggestions. Some points inline, and sorry in advance if I suggest
> something outright wrong. Hopefully someone from the Flink side can help
> give context where necessary :)
>
> Oliver Swoboda wrote:
>
>> Hello,
>>
>> I'm using Flink with Accumulo and wanted to read data from the database
>> by using the createHadoopInput function. Therefore I configure an
>> AccumuloInputFormat. The source code you can find here:
>> https://github.com/OSwoboda/masterthesis/blob/master/aggregation.flink/src/main/java/de/oswoboda/aggregation/Main.java
>>
>> I'm using a 5 Node Cluster (1 Master, 4 Worker).
>> Accumulo is installed with Ambari and has 1 Master Server on the Master
>> Node and 4 Tablet Servers (one on each Worker).
>> Flink is installed standalone with the Jobmanager on the Master Node and
>> 4 Taskmanagers (one on each Worker). Every Taskmanager can have 4 Tasks,
>> so there are 32 in total.
>>
>> First problem I have:
>> If I start several Flink Jobs the client count for Zookeeper in the
>> Accumulo Overview is constantly increasing. I assume that the used
>> scanner isn't correctly closed. The client count only decreases to
>> normal values when I restart Flink.
>
> Hrm, this does seem rather bad. Eventually, you'll saturate the
> connections to ZK and ZK itself will start limiting new connections (per
> the maxClientCnxns property).
>
> This sounds somewhat familiar to
> https://issues.apache.org/jira/browse/ACCUMULO-2113. The lack of a proper
> "close()" method on the Instance interface is a known deficiency. I'm not
> sure how Flink execution happens, so I am kind of just guessing.
>
> You might be able to try to use the CleanUp[1] utility to close out the
> thread pools/connections when your Flink "task" is done.

Unfortunately that didn't work. I guess this is because Flink starts the
tasks that use the scanners inside the TaskManagers, and I can't reach
those tasks from my program. So after a task is done I can't close the
connections with the utility, because the thread where I call it isn't
the one that started the scanners.

>> Second problem I have:
>> I want to compare aggregations on time series data with Accumulo (with
>> Iterators) and with Flink. Unfortunately, the results vary inexplicably
>> when I'm using Flink. I wanted to compare the results for a full table
>> scan (called baseline in the code), but sometimes it takes 17-18 minutes
>> and sometimes it's between 30 and 60 minutes. In the longer case I can
>> see in the Accumulo Overview that after some time only one worker is
>> left with running scans and there are just a few entries/s scanned (4
>> million at the beginning when all workers are running to 200k when the
>> one worker is left). Because there are 2.5 billion records to scan and
>> almost 500 million left it takes really long.
>> This problem doesn't occur with Accumulo using Iterators and a batch
>> scanner on the master node, each scan has almost identical durations and
>> graphics in the Accumulo Overview for entries/s, MB/s scanned and seeks
>> are for each scan the same.
>
> It sounds like maybe your partitioning was sub-optimal and caused one task
> to get a majority of the data? Having the autoAdjustRanges=true (as you do
> by default) should help get many batches of work based on the tablet
> boundaries in Accumulo. I'm not sure how Flink actually executes them
> though.

The problem was that half of the data was on one node after a restart of
Accumulo. It seems to be related to the problem described here:
https://issues.apache.org/jira/browse/ACCUMULO-4353. I stopped and then
started Accumulo instead of doing a restart, and now the data is
distributed evenly across all nodes.
For my tests I keep Accumulo running now, because the data distribution
changes after each restart and I don't want to upgrade to 1.8.

>> Yours faithfully,
>> Oliver Swoboda
>
> [1] https://github.com/apache/accumulo/blob/e900e67425d950bd4c0c5288a6270d7b362ac458/core/src/main/java/org/apache/accumulo/core/util/CleanUp.java#L36
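
As a rough cross-check of the slow runs, the figures quoted in the thread (2.5 billion records, almost 500 million of them left once a single worker is scanning, about 4 million entries/s with all workers and about 200k entries/s with one) can be turned into an estimate of the scan time. This is only a back-of-envelope sketch: it assumes both rates stay constant, which a real scan will not do, and the class and method names are made up for illustration.

```java
// Back-of-envelope estimate; treats the scan rates reported in the thread as constant.
final class ScanTimeEstimate {

    /** Seconds needed to scan `entries` at a steady `entriesPerSecond`. */
    static double seconds(long entries, long entriesPerSecond) {
        return (double) entries / entriesPerSecond;
    }

    /** Seconds for a run that scans `tailEntries` at `tailRate` and everything else at `fullRate`. */
    static double skewedSeconds(long totalEntries, long tailEntries, long fullRate, long tailRate) {
        return seconds(totalEntries - tailEntries, fullRate) + seconds(tailEntries, tailRate);
    }

    public static void main(String[] args) {
        long total = 2_500_000_000L;  // records in the table
        long tail = 500_000_000L;     // records left when only one worker is still scanning
        long fullRate = 4_000_000L;   // entries/s while all workers are scanning
        long tailRate = 200_000L;     // entries/s once a single worker is left

        System.out.printf("balanced: %.1f min%n", seconds(total, fullRate) / 60.0);
        System.out.printf("skewed:   %.1f min%n", skewedSeconds(total, tail, fullRate, tailRate) / 60.0);
    }
}
```

Under these assumptions the skewed run comes out at about 50 minutes, which falls inside the reported 30 to 60 minute range, and almost all of that time is spent in the single-worker tail. The evenly loaded case comes out at roughly 10 minutes, below the observed 17-18 minutes, presumably because the peak rate is not sustained for the whole scan.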