yanbiao created FLINK-27138:
-------------------------------

             Summary: Flink 1.14.0 standalone deployment: job submitted via SQL fails with a "not supported" error
                 Key: FLINK-27138
                 URL: https://issues.apache.org/jira/browse/FLINK-27138
             Project: Flink
          Issue Type: Bug
          Components: Client / Job Submission, Table SQL / Client
    Affects Versions: 1.14.0
         Environment: CentOS-7
                      flink 1.14.0 Release
            Reporter: yanbiao
         Attachments: 3个or正常.png, webUI报错信息.png, 部署目录.png

Flink 1.14.0 was downloaded from the official website and deployed standalone on a single machine. Jobs are submitted to the web UI through the REST interface.

REST endpoint: /jars/<jarName>/run

Problem: when the WHERE clause of the processing SQL contains more than 3 consecutive OR conditions (exactly 3 works), the submission fails.

The error message is:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Currently Flink doesn't support individual window table-valued function TUMBLE(time_col=[ts], size=[10 min]).
Please use window table-valued function with the following computations:
1. aggregate using window_start and window_end as group keys.
2. topN using window_start and window_end as partition key.
3. join with join condition contains window starts equality of input tables and window ends equality of input tables.

The submitted SQL is:

    CREATE TABLE source22 (
        `timestamp` VARCHAR,
        `logLevel` VARCHAR,
        `threadName` VARCHAR,
        `componentId` VARCHAR,
        `stackTrace` VARCHAR,
        `logType` VARCHAR,
        `eventType` VARCHAR,
        `subType` VARCHAR,
        `operateType` VARCHAR,
        `operateTag` VARCHAR,
        `weight` INT,
        `operator` VARCHAR,
        `authRoles` VARCHAR,
        `sourceHost` VARCHAR,
        `restUri` VARCHAR,
        `restMethod` VARCHAR,
        `operateObj` VARCHAR,
        `operateResult` VARCHAR,
        `requestParams` VARCHAR,
        `triggerCondition` VARCHAR,
        `authType` VARCHAR,
        `dataSize` INT,
        `exceptionMsg` VARCHAR,
        ts AS TO_TIMESTAMP(`timestamp`, 'yyyy-MM-dd HH:mm:ss.SSS'),
        WATERMARK FOR ts AS ts - INTERVAL '10' second
    ) WITH (
        'connector' = 'kafka',
        'format' = 'json',
        'properties.bootstrap.servers' = '10.192.78.27:9092',
        'scan.startup.mode' = 'latest-offset',
        'topic' = 'logaudit_yf20220304',
        'properties.group.id' = 'groupId_22'
    )

    CREATE TABLE sink22 (
        `id` VARCHAR,
        `rule_key` VARCHAR,
        `rule_name` VARCHAR,
        `metric_threshold` INT,
        `audit_status` INT,
        `audit_comment_num` INT,
        `window_start` TIMESTAMP(3),
        `window_end` TIMESTAMP(3),
        `metric_count` BIGINT,
        PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
        'connector' = 'elasticsearch-7',
        'hosts' = 'http://10.192.78.27:39200',
        'index' = 'logaudit_rule_22'
    )

    INSERT INTO sink22
    SELECT
        uuid() AS id,
        '22' AS rule_key,
        '4个or测试' AS rule_name,
        2 AS metric_threshold,
        0 AS audit_status,
        0 AS audit_comment_num,
        window_start,
        window_end,
        count(*) AS metric_count
    FROM TABLE(TUMBLE(TABLE source22, DESCRIPTOR(ts), INTERVAL '10' Second))
    WHERE logType = 'operation'
      AND (componentId = 'a' OR componentId = 'b' OR componentId = 'c' OR componentId = 'd')
    GROUP BY window_start, window_end
    HAVING count(*) > 2

The core code of the actual jar is:

    public class AuditRuleJob {

        public static void main(String[] args) {
            final ParameterTool params = ParameterTool.fromArgs(args);

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.getConfig().setGlobalJobParameters(params);
            env.setParallelism(1);
            env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 300000));
            env.enableCheckpointing(60000);
            env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

            // RETAIN_ON_CANCELLATION: checkpoint data is kept after cancel, so the job can be restored from a chosen checkpoint.
            // DELETE_ON_CANCELLATION: checkpoint data is deleted after cancel; checkpoints are only kept when the job fails.
            env.getCheckpointConfig().enableExternalizedCheckpoints(
                    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            if (!params.has("source") || !params.has("sink") || !params.has("transform")) {
                throw new RuntimeException("source or sink or transform sql parameter missing");
            }

            // Each SQL statement arrives as a Base64-encoded UTF-8 string.
            String sourceBase64 = params.get("source");
            String source = new String(
                    Base64.getDecoder().decode(sourceBase64.getBytes(StandardCharsets.UTF_8)),
                    StandardCharsets.UTF_8);

            String sinkBase64 = params.get("sink");
            String sink = new String(
                    Base64.getDecoder().decode(sinkBase64.getBytes(StandardCharsets.UTF_8)),
                    StandardCharsets.UTF_8);

            String transformBase64 = params.get("transform");
            String transform = new String(
                    Base64.getDecoder().decode(transformBase64.getBytes(StandardCharsets.UTF_8)),
                    StandardCharsets.UTF_8);

            tableEnv.executeSql(source);
            tableEnv.executeSql(sink);
            tableEnv.executeSql(transform);
        }
    }
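
For anyone trying to reproduce this: the job reads its three SQL statements from Base64-encoded `source`, `sink`, and `transform` parameters. The report does not show how the submitting client builds them, so the following is only a minimal sketch under that assumption. The class `SqlArgCodec` and its methods `encode`, `decode`, and `buildArgs` are hypothetical names and are not part of the reporter's jar or of Flink. It uses only the JDK, and `decode` mirrors the decoding done in `AuditRuleJob`.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper (not from the report): prepares program arguments
// in the shape that AuditRuleJob reads via params.get(...).
class SqlArgCodec {

    // Encode a SQL statement as standard Base64 over its UTF-8 bytes.
    public static String encode(String sql) {
        return Base64.getEncoder().encodeToString(sql.getBytes(StandardCharsets.UTF_8));
    }

    // Inverse of encode; same decoding as in the job's main method.
    public static String decode(String base64) {
        byte[] raw = Base64.getDecoder().decode(base64.getBytes(StandardCharsets.UTF_8));
        return new String(raw, StandardCharsets.UTF_8);
    }

    // Build "--key value" pairs for the three parameters the job requires.
    public static String[] buildArgs(String source, String sink, String transform) {
        return new String[] {
            "--source", encode(source),
            "--sink", encode(sink),
            "--transform", encode(transform)
        };
    }

    public static void main(String[] args) {
        // Placeholder statements; substitute the DDL and INSERT from the report.
        String[] programArgs = buildArgs(
                "CREATE TABLE source22 (...)",
                "CREATE TABLE sink22 (...)",
                "INSERT INTO sink22 SELECT ...");
        System.out.println(String.join(" ", programArgs));
    }
}
```

Decoding the `transform` argument with `decode` before submitting is a quick way to confirm that the statement reaching `tableEnv.executeSql` is byte-for-byte the SQL shown above, which helps rule out an encoding problem when comparing the 3-OR and 4-OR cases.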