Software versions: flink-1.13.6, python 3.6.8. Python 3.6.8 is also installed on my local Win10 machine, the Python environment variable is set, and `python -m pip install apache-flink==1.13.6` ran successfully. The cluster is deployed in standalone-session mode with one JM and two TMs; Python 3.6.8 and pyflink-1.13.6 are installed on all three nodes.
Questions:

1. Calling the Python UDF fails with the following error:

    Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.apache.flink.table.api.ValidationException: SQL validation failed. Cannot instantiate user-defined function 'myhive.tetris.myreplace'.] with root cause java.lang.RuntimeException: Python callback server start failed!

2. Are there any rules for how to write the class path after AS in CREATE FUNCTION when the function is implemented in Python?

Contents of the Python UDF file myReplace.py:

    from pyflink.table.udf import ScalarFunction

    class MyReplace(ScalarFunction):
        def __init__(self):
            self.factor = 12

        def eval(self, s):
            return s.replace('buy', 'sale')

Obtaining the remote cluster environment, where catalogName=myhive and defaultDatabase=tetris:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(host, port);
    env.setStateBackend(new HashMapStateBackend());
    env.enableCheckpointing(1000);
    env.getCheckpointConfig().setCheckpointStorage(new JobManagerCheckpointStorage());
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
    env.getCheckpointConfig().setExternalizedCheckpointCleanup(
            CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION
    );

    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
    tableEnv.getConfig().getConfiguration().setBoolean("table.exec.hive.fallback-mapred-reader", true);
    tableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true);
    if (!catalogName.equals(tableEnv.getCurrentCatalog())) {
        HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog(catalogName, hiveCatalog);
    }
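For a quick local sanity check of the eval logic without a cluster, here is a minimal sketch that runs without pyflink installed. The ScalarFunction stub below is only a stand-in for pyflink.table.udf.ScalarFunction, not the real base class:

```python
# Stub standing in for pyflink.table.udf.ScalarFunction (assumption: only
# eval() matters for this check; the real base class has more hooks).
class ScalarFunction:
    def open(self, function_context):
        pass


class MyReplace(ScalarFunction):
    def __init__(self):
        self.factor = 12

    def eval(self, s):
        # Same logic as in myReplace.py
        return s.replace('buy', 'sale')


print(MyReplace().eval('buy 3 shares'))  # sale 3 shares
```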
    tableEnv.useCatalog(catalogName);
    tableEnv.useDatabase(defaultDatabase);

    List<String> jars = new ArrayList<>();
    List<String> pys = new ArrayList<>();
    log.info("Start loading UDFs from HDFS");
    String prefix = "/file/function/flink/";
    try {
        EnvUtil.registerFactory(new FsUrlStreamHandlerFactory());
        for (String hdf : EnvUtil.listHdfs(prefix, configuration)) {
            if (hdf.endsWith(".jar")) {
                jars.add(hdf);
                EnvUtil.loadJar(URLUtil.url(hdf));
            } else if (hdf.endsWith(".py")) {
                pys.add(hdf);
            }
        }
        tableEnv.getConfig().getConfiguration().set(PipelineOptions.CLASSPATHS, jars);
        tableEnv.getConfig().getConfiguration().setString("python.files", StringUtils.join(pys, ","));
        log.info("Finished loading UDFs from HDFS");
    } catch (Exception e) {
        e.printStackTrace();
    }

The Python files are stored under the HDFS path given above. After uploading the .py file, I executed the following via tableEnv.executeSql:

    CREATE FUNCTION IF NOT EXISTS myhive.tetris.myReplace AS 'myReplace.MyReplace' LANGUAGE PYTHON

Thanks in advance to the Flink team for all your hard work.

799590...@qq.com
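On question 2, my understanding (an assumption, not confirmed by the docs quoted here) is that the name after AS follows the usual Python "module.attribute" convention: the module is the .py file name without its extension, and the attribute is the class or function defined in it. A pyflink-free illustration of that resolution rule, using a temporary file and importlib (all names here are made up for the sketch):

```python
import importlib
import os
import sys
import tempfile

# Hypothetical illustration of resolving 'myReplace.MyReplace':
# module name = file name without .py, class name = attribute in that module.
with tempfile.TemporaryDirectory() as d:
    with open(os.path.join(d, "myReplace.py"), "w") as f:
        f.write(
            "class MyReplace:\n"
            "    def eval(self, s):\n"
            "        return s.replace('buy', 'sale')\n"
        )
    sys.path.insert(0, d)
    mod_name, cls_name = "myReplace.MyReplace".rsplit(".", 1)
    cls = getattr(importlib.import_module(mod_name), cls_name)
    print(cls().eval("buy now"))  # sale now
    sys.path.remove(d)
```

If Flink cannot import the module this way on the worker side (e.g. the file was not shipped via python.files, or the Python interpreter is wrong), instantiation of the UDF fails.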