Hey Sebastian,

I've fixed the issue in this branch: https://github.com/rmetzger/flink/tree/flink1589
    Configuration c = new Configuration();
    c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
    final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);

I'll also backport the fix to the release-0.8 branch to make it available in the 0.8.2 release. Maybe you can cherry-pick the commit onto your 0.8.1 Flink build.

Best,
Robert

On Fri, Feb 20, 2015 at 12:11 PM, Robert Metzger <rmetz...@apache.org> wrote:
> Hi Sebastian,
>
> Looks like you've found a limitation of Flink. I've already filed two
> JIRAs to resolve the issue:
> https://issues.apache.org/jira/browse/FLINK-1588
> https://issues.apache.org/jira/browse/FLINK-1589
>
> I don't know your setup, but if you use Flink only as a dependency,
> without a checked-out source tree, there is currently no way to change
> the configuration settings. In that case you have to start a local
> cluster yourself (./bin/start-local.sh, with your settings in
> conf/flink-conf.yaml). You can then submit your job either with
> ./bin/flink or via the RemoteExecutionEnvironment
> (ExecutionEnvironment.createRemoteEnvironment()).
>
> If you have the Flink source checked out, you can also hard-code the
> configuration values in org.apache.flink.client.LocalExecutor.
>
> By the way, Flink 0.8.1 is now available on Maven Central (I suspect you
> had to build it yourself yesterday evening). But given these issues, that
> doesn't matter for you anymore ;)
>
> Best,
> Robert
>
>
> On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ssc.o...@googlemail.com> wrote:
>> I'm running Flink from my IDE. How do I change this setting in that
>> context?
>>
>>
>> On 20.02.2015 11:41, Fabian Hueske wrote:
>>> Have you tried to increase the heap size by shrinking the TM-managed
>>> memory?
>>>
>>> Reduce the fraction (taskmanager.memory.fraction) or fix the amount of
>>> TM memory (taskmanager.memory.size) in flink-conf.yaml [1].
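[For readers finding this thread in the archive: Fabian's two settings go into conf/flink-conf.yaml. The values below are illustrative, not recommendations; as far as I know, the fixed size takes precedence over the fraction when both are set.]

```yaml
# Illustrative values only -- tune for your own setup.
# Give the TaskManager's managed memory a smaller share of the heap:
taskmanager.memory.fraction: 0.5
# ...or pin the managed memory to a fixed amount (in MB) instead:
taskmanager.memory.size: 512
```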
>>>
>>> Cheers,
>>> Fabian
>>>
>>> [1] http://flink.apache.org/docs/0.8/config.html
>>>
>>>
>>> On 20 Feb 2015, at 11:30, Sebastian <ssc.o...@googlemail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I get a strange out-of-memory error from the serialization code when I
>>>> try to run the following program:
>>>>
>>>> def compute(trackingGraphFile: String, domainIndexFile: String,
>>>>             outputPath: String) = {
>>>>
>>>>   implicit val env = ExecutionEnvironment.getExecutionEnvironment
>>>>
>>>>   val edges = GraphUtils.readEdges(trackingGraphFile)
>>>>   val domains = GraphUtils.readVertices(domainIndexFile)
>>>>
>>>>   val domainsByCompany = DomainsByCompany.mapping
>>>>   val companyEdges = edges.filter { edge =>
>>>>       domainsByCompany.contains(edge.src.toInt) }
>>>>     .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
>>>>     .distinct
>>>>
>>>>   val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>>>>     domainsByCompany: Iterator[(String, Int)] =>
>>>>
>>>>       var company = ""
>>>>       val seenAt = new util.BitSet(42889800)
>>>>
>>>>       for ((name, domain) <- domainsByCompany) {
>>>>         company = name
>>>>         seenAt.set(domain)
>>>>       }
>>>>
>>>>       company -> seenAt
>>>>   }
>>>>
>>>>   companyBitMaps.print()
>>>>
>>>>   env.execute()
>>>> }
>>>>
>>>> The error looks as follows:
>>>>
>>>> 2015-02-20 11:22:54 INFO JobClient:345 - java.lang.OutOfMemoryError: Java heap space
>>>>   at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>>>>   at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>>>>   at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>>>   at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>>>   at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>>>>   at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>>>>   at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>>>>   at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>>>>   at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>>>   at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>>>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>>>>   at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>>>>   at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>>>   at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>>>   at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>>>>   at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>>>>   at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>>>>   at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>>>>   at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>>>>   at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>>>>   at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>>   at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I run the job locally, giving 2 GB of RAM to the VM. The code will
>>>> produce fewer than 10 groups, and the bitsets used internally should
>>>> not be larger than a few megabytes.
>>>>
>>>> Any tips on how to fix this?
>>>>
>>>> Best,
>>>> Sebastian
>>>>
>>>> PS: Still waiting for a reduceGroup that gives me the key ;)
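[Archive note: the numbers in the thread can be sanity-checked. A java.util.BitSet is backed by a long[] (64 bits per word), so a bitset sized for ~42.9 million bits occupies roughly 5 MB on the heap, matching Sebastian's "a few megabytes" estimate. The stack trace, however, ends in Output.writeBoolean inside chill's BitSetSerializer, which suggests that version of the serializer wrote one boolean, i.e. about one byte, per bit, inflating each record to roughly 40 MB before DataOutputSerializer buffer resizing; this interpretation of the trace is an assumption, not confirmed in the thread.]

```java
import java.util.BitSet;

public class BitSetFootprint {
    public static void main(String[] args) {
        int bits = 42_889_800; // size used in Sebastian's job

        // BitSet rounds its backing long[] up to whole 64-bit words,
        // so size() reports slightly more than the requested bit count.
        BitSet seenAt = new BitSet(bits);
        long heapBytes = seenAt.size() / 8L;
        System.out.println("heap:       ~" + heapBytes / (1024 * 1024) + " MB");

        // Rough serialized size IF the serializer writes one byte per bit
        // (assumption based on the writeBoolean frame in the stack trace):
        long serializedBytes = bits;
        System.out.println("serialized: ~" + serializedBytes / (1024 * 1024) + " MB");
    }
}
```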