Hey Sebastian,

I've fixed the issue in this branch:
https://github.com/rmetzger/flink/tree/flink1589

With the fix, you can pass a Configuration to the local environment:

Configuration c = new Configuration();
c.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(c);
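
As a rough illustration of what the fraction changes (a simplified sketch, not
Flink's actual memory accounting): assuming the managed memory is simply the
given fraction of the JVM heap, lowering it from 0.7 (assumed here to be the
default) to 0.5 on a 2 GB heap leaves about 1 GB for user code instead of
about 0.6 GB. HeapSplit and its methods are hypothetical names used only for
this example.

```java
public class HeapSplit {

    // Bytes reserved as managed memory, ASSUMING a plain fraction of the heap.
    // Flink's real computation may differ (e.g. network buffers are not modeled).
    static long managedBytes(long heapBytes, float fraction) {
        return (long) (heapBytes * (double) fraction);
    }

    // Bytes left over for user code and serialization buffers.
    static long userBytes(long heapBytes, float fraction) {
        return heapBytes - managedBytes(heapBytes, fraction);
    }

    public static void main(String[] args) {
        long heap = 2L << 30; // 2 GB, as in the report quoted below
        for (float fraction : new float[] {0.7f, 0.5f}) {
            System.out.println("fraction " + fraction
                    + ": managed=" + managedBytes(heap, fraction)
                    + " bytes, user=" + userBytes(heap, fraction) + " bytes");
        }
    }
}
```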


I'll also backport the fix to the release-0.8 branch to make it available
in the 0.8.2 release.

In the meantime, you can probably cherry-pick the commit into your 0.8.1
Flink build.


Best,
Robert

On Fri, Feb 20, 2015 at 12:11 PM, Robert Metzger <rmetz...@apache.org>
wrote:

> Hi Sebastian,
>
> Looks like you've found a limitation of Flink.
> I've already filed two JIRAs to resolve the issue (
> https://issues.apache.org/jira/browse/FLINK-1588,
> https://issues.apache.org/jira/browse/FLINK-1589).
>
> I don't know your setup. If you use Flink just as a dependency, without the
> source checked out, there is probably no way right now to change the
> configuration settings.
> In that case, you have to start a local cluster yourself
> (./bin/start-local.sh, with your settings in conf/flink-conf.yaml). You can
> then submit your job either with ./bin/flink or through the
> RemoteExecutionEnvironment (ExecutionEnvironment.createRemoteEnvironment()).
>
> If you have the Flink source checked out, you can also hard-code the
> configuration values into org.apache.flink.client.LocalExecutor.
>
>
> By the way, Flink 0.8.1 is now available on maven central (I suspect you
> had to build it yourself yesterday evening).
> But given these issues here, it doesn't matter for you anymore ;)
>
>
> Best,
> Robert
>
>
>
> On Fri, Feb 20, 2015 at 11:48 AM, Sebastian <ssc.o...@googlemail.com>
> wrote:
>
>> I'm running Flink from my IDE. How do I change this setting in that context?
>>
>>
>> On 20.02.2015 11:41, Fabian Hueske wrote:
>>
>>> Have you tried to increase the heap size by shrinking the TM-managed
>>> memory?
>>>
>>> Reduce the fraction (taskmanager.memory.fraction) or fix the amount of
>>> TM memory (taskmanager.memory.size) in flink-conf.yaml [1].
>>>
>>> Cheers, Fabian
>>>
>>> [1] http://flink.apache.org/docs/0.8/config.html
>>>
>>>
>>>  On 20 Feb 2015, at 11:30, Sebastian <ssc.o...@googlemail.com> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I get a strange out of memory error from the serialization code when I
>>>> try to run the following program:
>>>>
>>>> def compute(trackingGraphFile: String, domainIndexFile: String,
>>>>   outputPath: String) = {
>>>>
>>>> implicit val env = ExecutionEnvironment.getExecutionEnvironment
>>>>
>>>> val edges = GraphUtils.readEdges(trackingGraphFile)
>>>> val domains = GraphUtils.readVertices(domainIndexFile)
>>>>
>>>> val domainsByCompany = DomainsByCompany.mapping
>>>> val companyEdges = edges
>>>>   .filter { edge => domainsByCompany.contains(edge.src.toInt) }
>>>>   .map { edge => domainsByCompany(edge.src.toInt) -> edge.target.toInt }
>>>>   .distinct
>>>>
>>>> val companyBitMaps = companyEdges.groupBy(0).reduceGroup {
>>>>     domainsByCompany: Iterator[(String,Int)] =>
>>>>
>>>>     var company = ""
>>>>     val seenAt = new util.BitSet(42889800)
>>>>
>>>>     for ((name, domain) <- domainsByCompany) {
>>>>       company = name
>>>>       seenAt.set(domain)
>>>>     }
>>>>
>>>>     company -> seenAt
>>>>   }
>>>>
>>>>   companyBitMaps.print()
>>>>
>>>>   env.execute()
>>>>
>>>> }
>>>>
>>>>
>>>> The error looks as follows:
>>>>
>>>>
>>>> 2015-02-20 11:22:54 INFO  JobClient:345 - java.lang.OutOfMemoryError: Java heap space
>>>>         at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.resize(DataOutputSerializer.java:249)
>>>>         at org.apache.flink.runtime.io.network.serialization.DataOutputSerializer.write(DataOutputSerializer.java:93)
>>>>         at org.apache.flink.api.java.typeutils.runtime.DataOutputViewStream.write(DataOutputViewStream.java:39)
>>>>         at com.esotericsoftware.kryo.io.Output.flush(Output.java:163)
>>>>         at com.esotericsoftware.kryo.io.Output.require(Output.java:142)
>>>>         at com.esotericsoftware.kryo.io.Output.writeBoolean(Output.java:613)
>>>>         at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:42)
>>>>         at com.twitter.chill.java.BitSetSerializer.write(BitSetSerializer.java:29)
>>>>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
>>>>         at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.serialize(KryoSerializer.java:155)
>>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:91)
>>>>         at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
>>>>         at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:51)
>>>>         at org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:76)
>>>>         at org.apache.flink.runtime.io.network.api.RecordWriter.emit(RecordWriter.java:82)
>>>>         at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:88)
>>>>         at org.apache.flink.api.scala.GroupedDataSet$$anon$2.reduce(GroupedDataSet.scala:262)
>>>>         at org.apache.flink.runtime.operators.GroupReduceDriver.run(GroupReduceDriver.java:124)
>>>>         at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:493)
>>>>         at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
>>>>         at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> I run the job locally, giving 2 GB of RAM to the VM. The code will
>>>> produce fewer than 10 groups, and the bitsets used internally should not
>>>> be larger than a few megabytes.
>>>>
>>>> Any tips on how to fix this?
>>>>
>>>> Best,
>>>> Sebastian
>>>>
>>>> PS: Still waiting for a reduceGroup that gives me the key ;)
>>>>
>>>>
>>>>
>>>>
>>>
>
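
A rough sketch of why the job quoted above can exhaust the heap even though
each bitset is only a few megabytes in memory: the stack trace shows
BitSetSerializer.write calling Output.writeBoolean, which suggests the bitset
is written one boolean per bit. Assuming one byte per boolean (an assumption,
not checked against the chill source), a bitset whose highest set bit is near
42889800 serializes to roughly eight times its in-memory size, and the output
buffer has to grow to hold it. BitSetFootprint and its methods are
hypothetical names used only for this example.

```java
import java.util.BitSet;

public class BitSetFootprint {

    // Bytes held by the BitSet's backing long[] (size() is the allocated bit count).
    static long inMemoryBytes(BitSet bits) {
        return bits.size() / 8L;
    }

    // ASSUMED serialized size: one byte per bit, up to the highest set bit.
    static long perBitSerializedBytes(BitSet bits) {
        return bits.length();
    }

    public static void main(String[] args) {
        BitSet seenAt = new BitSet(42889800);
        seenAt.set(42889799); // worst case: the highest domain id is set
        System.out.println("in memory:  " + inMemoryBytes(seenAt) + " bytes");
        System.out.println("serialized: " + perBitSerializedBytes(seenAt) + " bytes");
    }
}
```

With fewer than 10 groups this is still only a few hundred megabytes in
total, so the estimate explains the pressure on the heap rather than proving
the exact cause.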
