Hi, xingbo.
在检查之后,并没有发现什么 beam 的其他版本,flink-python 版本是 2.11-1.11.1。不管是 pip install
apache-beam==2.19.0 还是没有,都是一样的问题。
用 udf 的示例代码也不通过。本地 python demo.py 是可以正常的。
只有是 flink run -m localhost:8081 -py demo.py 是一直在出现这种问题。
pyflink-shell.sh remote localhost 8081 也试过了。一样的结果。

示例代码如下:

import logging
import sys
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
DataTypes
from pyflink.table.udf import udf


def word_count():
    content = "line Licensed to the Apache Software Foundation ASF under one
" \
              "line or more contributor license agreements See the NOTICE
file " \
              "line distributed with this work for additional information "
\
              "line regarding copyright ownership The ASF licenses this file
" \
              "to you under the Apache License Version the " \
              "License you may not use this file except in compliance " \
              "with the License"
    t_env = StreamTableEnvironment.create(
        StreamExecutionEnvironment.get_execution_environment(),
       
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build()
    )
   
t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed",
True)
    sink_ddl = """
        create table Results(word VARCHAR,`count` BIGINT) with ( 'connector'
= 'print')
        """
    add = udf(lambda i: i + 1024, [DataTypes.BIGINT()], DataTypes.BIGINT())
    t_env.register_function("add_test", add)
    t_env.sql_update(sink_ddl)
    elements = [(word, 1) for word in content.split(" ")]
    t_env.from_elements(elements, ["word", "count"]) \
        .group_by("word") \
        .select("word, add_test(count(1)) as count") \
        .insert_into("Results")
    t_env.execute("word_count")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO,
format="%(message)s")
    word_count()


环境是基于官方的 docker 镜像 flink:1.11.1-scala_2.11,JobManager 和 TaskManager 都正常,在没有
udf 的时候作业都是正常的,jar 包只装了 jdbc,kafka,es 的connector,还有 csv 的 jar 包。

这个情况下需要装什么东西吗,还是需要改配置。

日志上提示是:

2020-11-03 09:24:05,792 ERROR org.apache.flink.client.python.PythonDriver       
          
[] - Run python process failed
java.lang.RuntimeException: Python process exits with code: 1
        at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:88)
~[flink-python_2.11-1.11.1.jar:1.11.1]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_265]
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_265]
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_265]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
[flink-dist_2.11-1.11.1.jar:1.11.1]
2020-11-03 09:24:05,798 ERROR org.apache.flink.client.cli.CliFrontend           
          
[] - Fatal error while running command line interface.
org.apache.flink.client.program.ProgramAbortException: null
        at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
~[flink-python_2.11-1.11.1.jar:1.11.1]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_265]
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_265]
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_265]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_265]
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
[flink-dist_2.11-1.11.1.jar:1.11.1]
org.apache.flink.client.program.ProgramAbortException
        at
org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
        at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
        at
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
        at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
        at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
        at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
        at
org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
        at
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)





--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复