I have a pyflink job that starts using the Datastream api and converts to
the Table API. In the datastream portion, there is a MapFunction. I am
getting the following error:

flink run -py sample.py

java.lang.IllegalArgumentException: The configured managed memory fraction
for Python worker process must be within (0, 1], was: %s. It may be because
the consumer type "Python" was missing or set to 0 for the config option
"taskmanager.memory.managed.consumer-weights".0.0
    at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:138)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at
org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:237)
~[blob_p-96a992be07a80c2f2104b54a9c2509bd6cf8e8bc-845e415af5e71e7433bf0c5cd5936782:1.12.1]
    at
org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.open(AbstractPythonFunctionOperator.java:113)
~[blob_p-96a992be07a80c2f2104b54a9c2509bd6cf8e8bc-845e415af5e71e7433bf0c5cd5936782:1.12.1]
    at
org.apache.flink.streaming.api.operators.python.OneInputPythonFunctionOperator.open(OneInputPythonFunctionOperator.java:107)
~[blob_p-96a992be07a80c2f2104b54a9c2509bd6cf8e8bc-845e415af5e71e7433bf0c5cd5936782:1.12.1]
    at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:426)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565)
~[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
[flink-dist_2.12-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
[flink-dist_2.12-1.12.1.jar:1.12.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]

I have tried setting the configuration setting mentioned in the error, as
well as a few others.

In flink-conf.yaml:
taskmanager.memory.managed.consumer-weights: DATAPROC:70,PYTHON:30
taskmanager.memory.managed.fraction: 0.4

In python table api:
t_env.get_config().get_configuration().set_string("taskmanager.memory.process.size",
'500m')
t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.size",
'500m')
t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.fraction",
'.4')
t_env.get_config().get_configuration().set_string("taskmanager.memory.managed.consumer-weights",
'DATAPROC:70,PYTHON:30')

Flink version 1.12.1

Any help is appreciated. Thanks

Reply via email to