Software versions

flink-1.13.6
python3.6.8
The local Win10 machine also has python3.6.8 installed, with python added to the PATH, and
$ python -m pip install apache-flink==1.13.6 completed successfully.
The cluster is deployed in standalone-session mode with one JM and two TMs; python3.6.8 and pyflink-1.13.6 are installed on all 3 nodes.

Questions:

1. Calling the Python UDF fails with the following error:

Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot instantiate user-defined function 'myhive.tetris.myreplace'.] with root cause
java.lang.RuntimeException: Python callback server start failed!
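One thing worth checking: the stack trace comes from the client JVM (the servlet container running the Java driver), so it is that machine, not the JM/TMs, that must be able to start a Python interpreter while the SQL is validated and the UDF is instantiated. "Python callback server start failed!" typically means that launch failed. A sketch of the relevant configuration, assuming python3 is an interpreter with apache-flink==1.13.6 installed on the client machine (the value is a placeholder for your actual path):

```
# flink-conf.yaml, or set programmatically via
# tableEnv.getConfig().getConfiguration().setString(...):
python.client.executable: python3   # interpreter used on the client during compilation/validation
python.executable: python3          # interpreter used by the TaskManagers at runtime

# Alternatively, as an environment variable on the client machine:
# PYFLINK_CLIENT_EXECUTABLE=python3
```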

2. Is there a rule for how the class path after AS in CREATE FUNCTION should be written for a Python UDF?
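On question 2, my understanding (worth confirming against the Flink docs) is that the string after AS for LANGUAGE PYTHON follows ordinary Python "module.attribute" resolution: everything before the last dot names the module (the .py file name without extension, which must be importable, e.g. shipped via python.files), and the final segment is an attribute of that module. A stdlib-only sketch of that resolution rule, with a throwaway module file standing in for the uploaded myReplace.py:

```python
import importlib
import os
import sys
import tempfile
import textwrap


def resolve(qualified_name):
    """Split 'pkg.mod.attr' into module path and attribute, then look it up."""
    module_name, _, attr = qualified_name.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr)


# Demo: create a throwaway myReplace.py, analogous to the uploaded UDF file.
tmp = tempfile.mkdtemp()
with open(os.path.join(tmp, 'myReplace.py'), 'w') as f:
    f.write(textwrap.dedent('''
        class MyReplace:
            def eval(self, s):
                return s.replace('buy', 'sale')
    '''))
sys.path.insert(0, tmp)  # make the module importable, like python.files does

cls = resolve('myReplace.MyReplace')
print(cls().eval('buy now'))  # -> sale now
```

This is only the name-resolution rule; what kind of object the name must point at (a plain class vs. a udf-wrapped object) is a separate question, see the note in the UDF file below.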

Contents of the Python UDF file myReplace.py (with the missing ScalarFunction import added; the original `from pyflink.table.expressions import call` was unused and has been dropped):

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class MyReplace(ScalarFunction):
  def __init__(self):
    self.factor = 12

  def eval(self, s):
    return s.replace('buy', 'sale')

# As far as I understand, the AS clause of CREATE FUNCTION ... LANGUAGE PYTHON
# should reference a udf-wrapped object rather than the raw class, e.g.
# 'myReplace.my_replace' pointing at:
my_replace = udf(MyReplace(), result_type=DataTypes.STRING())
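The eval logic itself can be sanity-checked locally without a cluster or pyflink installed, using a plain-Python stand-in for the UDF class:

```python
# Minimal stand-in for the UDF's core logic, testable without pyflink.
class MyReplace:
    def __init__(self):
        self.factor = 12

    def eval(self, s):
        # Same substitution the UDF performs.
        return s.replace('buy', 'sale')


print(MyReplace().eval('buy 3 apples'))  # -> sale 3 apples
```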

Obtaining the remote cluster environment, where catalogName=myhive and defaultDatabase=tetris:

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port);
env.setStateBackend(new HashMapStateBackend());
env.enableCheckpointing(1000);
env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
        CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
tableEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-reader", true);
tableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
if (!catalogName.equals(tableEnv.getCurrentCatalog())) {
    HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
    tableEnv.registerCatalog(catalogName, hiveCatalog);
}
tableEnv.useCatalog(catalogName);
tableEnv.useDatabase(defaultDatabase);
List<String> jars = new ArrayList<>();
List<String> pys = new ArrayList<>();
log.info("Start loading UDFs from HDFS");
String prefix = "/file/function/flink/";
try {
    EnvUtil.registerFactory(new FsUrlStreamHandlerFactory());
    for (String hdf : EnvUtil.listHdfs(prefix, configuration)) {
        if (hdf.endsWith(".jar")) {
            jars.add(hdf);
            EnvUtil.loadJar(URLUtil.url(hdf));
        } else if (hdf.endsWith(".py")) {
            pys.add(hdf);
        }
    }
    tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
    tableEnv.getConfig().getConfiguration().setString("python.files", StringUtils.join(pys, ","));
    log.info("Finished loading UDFs from HDFS");
} catch (Exception e) {
    e.printStackTrace();
}

The Python files are stored under the specified path on HDFS.

After uploading the .py file, the following was executed via tableEnv.executeSql:

CREATE FUNCTION IF NOT EXISTS myhive.tetris.myReplace AS 'myReplace.MyReplace' LANGUAGE PYTHON

Thanks in advance to the Flink team for their hard work.



799590...@qq.com
