> Why the older version does not hit the limit and the newer version does is not quite clear, but it could just be expected resource usage differences between versions.

There were some changes to how network buffers are assigned. But my best guess is that the cause is the change of the default parallelism from 1 to the number of available cores. Higher parallelism consumes more network buffers, which then exceeds the default.
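To illustrate why raising the default parallelism matters, the sketch below uses the rule of thumb from older Flink documentation for sizing network buffers: slots per TaskManager squared, times the number of TaskManagers, times 4. It assumes a local cluster is a single TaskManager with one slot per unit of parallelism. The class and method names are hypothetical, and the numbers show relative growth only, not Flink's exact buffer accounting.

```java
/**
 * Hypothetical helper: rough network buffer estimate using the rule of thumb
 * from older Flink docs, buffers ~ slotsPerTaskManager^2 * numTaskManagers * 4.
 * This is an approximation, not the exact accounting Flink performs.
 */
final class NetworkBufferRuleOfThumb {

    private NetworkBufferRuleOfThumb() {}

    /** Rule-of-thumb buffer count for a given cluster shape. */
    static long estimatedBuffers(int slotsPerTaskManager, int numTaskManagers) {
        if (slotsPerTaskManager < 1 || numTaskManagers < 1) {
            throw new IllegalArgumentException("slots and TaskManagers must be >= 1");
        }
        long slots = slotsPerTaskManager;
        // The quadratic term: every slot may exchange data with every other slot.
        return slots * slots * numTaskManagers * 4L;
    }

    /**
     * Assumption for this sketch: a local cluster is one TaskManager with one
     * slot per unit of parallelism.
     */
    static long estimatedLocalBuffers(int parallelism) {
        return estimatedBuffers(parallelism, 1);
    }
}
```

With parallelism 1 the estimate is 4 buffers; with 8 cores it is 256, a 64x increase. The quadratic term is what makes the jump from a parallelism of 1 to "all cores" noticeable.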

> I can see how differentiating between the two kinds of configs is important. If FLINK_CONF_DIR is a de facto standard for Flink, that does seem like the right solution, but I think this could be better documented on the Flink runner page.

It should be transparent to the user because the Flink scripts set FLINK_CONF_DIR. The FlinkPipelineOptions are for job-scoped settings. Local execution is the exception because it brings up a new cluster.

Thinking about it more, we might actually add a configuration option that is used solely for local execution. That would unblock the problem you are seeing and also allow users to test their production config locally.
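A local-only option could be resolved with a simple precedence order: an explicit pipeline option first, then FLINK_CONF_DIR, then Flink's built-in defaults. This is a minimal sketch; `localFlinkConfDir` is a hypothetical option name, not an existing FlinkPipelineOptions setting, and the resolver class is illustrative only.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical resolver for a local-only config directory.
 * "localFlinkConfDir" is a made-up option name used for illustration.
 */
final class LocalConfDirResolver {

    private LocalConfDirResolver() {}

    static Optional<Path> resolve(String localFlinkConfDir, Map<String, String> env) {
        // 1. An explicit, pipeline-level option wins (hypothetical option).
        if (localFlinkConfDir != null && !localFlinkConfDir.isEmpty()) {
            return Optional.of(Paths.get(localFlinkConfDir));
        }
        // 2. Fall back to the environment variable the Flink scripts set.
        String fromEnv = env.get("FLINK_CONF_DIR");
        if (fromEnv != null && !fromEnv.isEmpty()) {
            return Optional.of(Paths.get(fromEnv));
        }
        // 3. Nothing configured: the caller uses Flink's built-in defaults.
        return Optional.empty();
    }
}
```

In a real pipeline the environment map would come from `System.getenv()`; passing it in as a parameter keeps the sketch easy to test.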

Thanks,
Max

On 13.01.19 05:57, Mike Pedersen wrote:
Hi Max.

Ah, that explains it. Great to see that it has already been fixed.

Currently we are using an older version of Beam which does not run out of network buffers. Why the older version does not hit the limit and the newer version does is not quite clear, but it could just be expected resource usage differences between versions. We can use that until the 2.10.0 release.

I can see how differentiating between the two kinds of configs is important. If FLINK_CONF_DIR is a de facto standard for Flink, that does seem like the right solution, but I think this could be better documented on the Flink runner page.

Thanks a lot for the response,
Mike

On Sun, 13 Jan 2019 at 02:02, Maximilian Michels <m...@apache.org> wrote:

    Hi Mike,

    Thank you for your message. What you have done is correct, but you have run
    into a bug which was present for local execution in 2.9.0. It has since been
    fixed for the upcoming 2.10.0 release.

    If you look at the 2.9.0 branch, you will see that the configuration is not
    passed to the local cluster:
    https://github.com/apache/beam/blob/release-2.9.0/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L63

    It is confusing that the config is indeed loaded but it won't be passed to
    the local cluster.

    Regarding the environment variable you mentioned, this is how Flink picks up
    the configuration file. If you use the Flink CLI or scripts this will work
    fine. But keep in mind that the memory settings are only read upon cluster
    startup, so changing this value for a Beam job won't do anything to existing
    non-local clusters.
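For illustration, the lookup Max describes amounts to reading `flink-conf.yaml` from the directory named by FLINK_CONF_DIR. The sketch below parses the flat `key: value` lines that file uses; it is a simplified stand-in, not Flink's own GlobalConfiguration loader, and the class name is hypothetical. As noted above, for a non-local cluster these values only take effect when the cluster starts.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified reader for flink-conf.yaml style files. */
final class FlinkConfSketch {

    private FlinkConfSketch() {}

    /** Parses flat "key: value" lines, ignoring comments and malformed lines. */
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : lines) {
            // Drop everything after '#' and surrounding whitespace.
            String content = line.split("#", 2)[0].trim();
            if (content.isEmpty()) {
                continue;
            }
            String[] kv = content.split(": ", 2);
            if (kv.length != 2) {
                continue; // not a "key: value" line; skip it
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (!key.isEmpty() && !value.isEmpty()) {
                conf.put(key, value);
            }
        }
        return conf;
    }

    /** Loads $FLINK_CONF_DIR/flink-conf.yaml, or returns an empty map. */
    static Map<String, String> loadFromEnv() throws IOException {
        String dir = System.getenv("FLINK_CONF_DIR");
        if (dir == null) {
            return new LinkedHashMap<>();
        }
        Path file = Paths.get(dir, "flink-conf.yaml");
        if (!Files.exists(file)) {
            return new LinkedHashMap<>();
        }
        return parse(Files.readAllLines(file, StandardCharsets.UTF_8));
    }
}
```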

    We could add an option to FlinkPipelineOptions to allow arbitrary Flink
    options to be passed. The main reason we hesitated to do that was to
    avoid confusion about the different types of configuration settings and
    their scope.

    Please let us know if you have further questions.

    Best,
    Max


    On January 9, 2019 8:27:36 AM EST, Mike Pedersen <m...@mikepedersen.dk> wrote:

        So I have a Beam job that I want to run with Flink locally. The problem is, I
        get the following error:

         > java.io.IOException: Insufficient number of network buffers: required
        32, but only 24 available. The total number of network buffers is
        currently set to 32768 of 32768 bytes each. You can increase this number
        by setting the configuration keys 'taskmanager.network.memory.fraction',
        'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.

        So I create a config file with taskmanager.network.memory.max set to 5gb
        and taskmanager.network.memory.fraction set to 0.2. I also set the
        FLINK_CONF_DIR path to the dir with the config file (undocumented
        feature) and set the --flinkMaster path to "[local]" as it seems like
        the default "[auto]" ignores the config file:
        https://github.com/apache/beam/blob/1e41220977d6c45d293b86f2e581daec3513c66e/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L76-L82

        Now, it seems like configs are loaded ok. I get the following log
        message during start:

         > Jan 09, 2019 10:54:43 AM
        org.apache.flink.configuration.GlobalConfiguration loadYAMLResource
        INFO: Loading configuration property: taskmanager.network.memory.max, 5gb

        But the error at the top of the post still appears. 32768 buffers * 32768 bytes
        = 1 GiB, which is the default value of taskmanager.network.memory.max, so
        it seems like the config is ignored.
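The arithmetic behind this observation can be checked directly. Flink sizes network memory as a fraction of memory, clamped between `taskmanager.network.memory.min` and `taskmanager.network.memory.max`, and splits it into segments of 32 KiB by default. The sketch below is a simplified version of that rule: the exact memory base Flink applies the fraction to differs by version and deployment mode. The class and method names are hypothetical, and the 20 GiB figure used below is an arbitrary example.

```java
/** Hypothetical, simplified model of Flink's network buffer sizing. */
final class NetworkMemorySketch {

    private NetworkMemorySketch() {}

    static final long KIB = 1024L;
    static final long MIB = 1024L * KIB;
    static final long GIB = 1024L * MIB;

    /** Network memory: fraction of the memory base, clamped to [min, max]. */
    static long networkMemoryBytes(long totalMemoryBytes, double fraction,
                                   long minBytes, long maxBytes) {
        long byFraction = (long) (fraction * totalMemoryBytes);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    /** Number of buffers = network memory / memory segment size. */
    static long numberOfBuffers(long networkMemoryBytes, long segmentSizeBytes) {
        return networkMemoryBytes / segmentSizeBytes;
    }
}
```

With the defaults (fraction 0.1, min 64 MiB, max 1 GiB), any memory base of roughly 10 GiB or more is capped at 1 GiB, which is exactly 32768 buffers of 32768 bytes, matching the error message. With Mike's settings (fraction 0.2, max 5 GiB) and a 20 GiB base, the same sketch yields 131072 buffers, so seeing 32768 again suggests the file was never applied to the local cluster.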

        Any ideas what might cause this problem? Am I adjusting the wrong
        parameter or something?
