[ https://issues.apache.org/jira/browse/FLINK-19777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu updated FLINK-19777:
----------------------------
    Component/s: Table SQL / Runtime

> Fix NullPointerException for WindowOperator.close()
> ---------------------------------------------------
>
>                 Key: FLINK-19777
>                 URL: https://issues.apache.org/jira/browse/FLINK-19777
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>        Environment: jdk 1.8.0_262
> flink 1.11.1
>            Reporter: frank wang
>            Priority: Major
>
> I use Flink SQL to run a job. The SQL and metadata are as follows:
>
> Metadata:
> 1> Source: Kafka
>
>   create table metric_source_window_table (
>     `metricName` String,
>     `namespace` String,
>     `timestamp` BIGINT,
>     `doubleValue` DOUBLE,
>     `longValue` BIGINT,
>     `metricsValue` String,
>     `tags` MAP<String, String>,
>     `meta` MAP<String, String>,
>     t AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss')),
>     WATERMARK FOR t AS t
>   ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'ai-platform',
>     'properties.bootstrap.servers' = 'xxx',
>     'properties.group.id' = 'metricgroup',
>     'scan.startup.mode' = 'earliest-offset',
>     'format' = 'json',
>     'json.fail-on-missing-field' = 'false',
>     'json.ignore-parse-errors' = 'true'
>   )
>
> 2> Sink: ClickHouse (the ClickHouse connector was developed in-house)
>
>   create table flink_metric_window_table (
>     `timestamp` BIGINT,
>     `longValue` BIGINT,
>     `metricName` String,
>     `metricsValueSum` DOUBLE,
>     `metricsValueMin` DOUBLE,
>     `metricsValueMax` DOUBLE,
>     `tag_record_id` String,
>     `tag_host_ip` String,
>     `tag_instance` String,
>     `tag_job_name` String,
>     `tag_ai_app_name` String,
>     `tag_namespace` String,
>     `tag_ai_type` String,
>     `tag_host_name` String,
>     `tag_alarm_domain` String
>   ) WITH (
>     'connector.type' = 'clickhouse',
>     'connector.property-version' = '1',
>     'connector.url' = 'jdbc:clickhouse://xxx:8123/dataeye',
>     'connector.cluster' = 'ck_cluster',
>     'connector.write.flush.max-rows' = '6000',
>     'connector.write.flush.interval' = '1000',
>     'connector.table' = 'flink_metric_table_all'
>   )
>
> My SQL is:
>
>   insert into hive.temp_vipflink.flink_metric_window_table
>   select
>     cast(HOP_ROWTIME(t, INTERVAL '60' SECOND, INTERVAL '15' MINUTE) AS BIGINT) AS `timestamps`,
>     sum(COALESCE(`longValue`, 0)) AS longValue,
>     metricName,
>     sum(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueSum,
>     min(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueMin,
>     max(IF(IS_DIGIT(metricsValue), cast(metricsValue AS DOUBLE), 0)) AS metricsValueMax,
>     tags['record_id'],
>     tags['host_ip'],
>     tags['instance'],
>     tags['job_name'],
>     tags['ai_app_name'],
>     tags['namespace'],
>     tags['ai_type'],
>     tags['host_name'],
>     tags['alarm_domain']
>   from
>     hive.temp_vipflink.metric_source_window_table
>   group by
>     metricName,
>     tags['record_id'],
>     tags['host_ip'],
>     tags['instance'],
>     tags['job_name'],
>     tags['ai_app_name'],
>     tags['namespace'],
>     tags['ai_type'],
>     tags['host_name'],
>     tags['alarm_domain'],
>     HOP(t, INTERVAL '60' SECOND, INTERVAL '15' MINUTE)
>
> After this job has been running for several hours, the following exception appears:
>
> [2020-10-22 20:54:52.089] [ERROR] [GroupWindowAggregate(groupBy=[metricName, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9], window=[SlidingGroupWindow('w$, t, 900000, 60000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[metricName, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, SUM($f11) AS longValue, SUM($f12) AS metricsValueSum, MIN($f12) AS metricsValueMin, MAX($f12) AS metricsValueMax, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime]) -> Calc(select=[CAST(CAST(w$rowtime)) AS timestamps, longValue, metricName, metricsValueSum, metricsValueMin, metricsValueMax, $f1 AS EXPR$6, $f2 AS EXPR$7, $f3 AS EXPR$8, $f4 AS EXPR$9, $f5 AS EXPR$10, $f6 AS EXPR$11, $f7 AS EXPR$12, $f8 AS EXPR$13, $f9 AS EXPR$14]) -> SinkConversionToTuple2 -> Sink: JdbcUpsertTableSink(timestamp, longValue, metricName, metricsValueSum, metricsValueMin, metricsValueMax, tag_record_id, tag_host_ip, tag_instance, tag_job_name, tag_ai_app_name, tag_namespace, tag_ai_type, tag_host_name, tag_alarm_domain) (23/44)] [org.apache.flink.streaming.runtime.tasks.StreamTask] >>> Error during disposal of stream operator.
> java.lang.NullPointerException: null
>   at org.apache.flink.table.runtime.operators.window.WindowOperator.dispose(WindowOperator.java:318) ~[flink-table-blink_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:729) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:645) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:549) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546) [flink-dist_2.11-1.11-SNAPSHOT.jar:1.11-SNAPSHOT]
>   at java.lang.Thread.run(Thread.java:748) [?:1.8.0_262]
>
> After this exception, the job goes into an error state and eventually fails.
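>
> The stack trace shows the NullPointerException being thrown from WindowOperator.dispose() while the task is already being torn down, which suggests that some internal resource of the operator (for example the window aggregate function handle) was never initialized because the task failed before open() completed. Below is a minimal sketch of the kind of defensive null guard in dispose() that would avoid the secondary NPE; the class and field names here are illustrative assumptions, not the exact Flink internals:
>
>   // Illustrative sketch only; names are assumptions, not the actual code in
>   // org.apache.flink.table.runtime.operators.window.WindowOperator.
>   public class WindowOperatorSketch {
>
>       // May still be null if the task fails before open() finishes.
>       private AutoCloseable windowFunction;
>
>       public void open() throws Exception {
>           // The real operator builds its aggregate handle here; a failure
>           // earlier in task startup means this assignment never runs.
>           windowFunction = () -> { };
>       }
>
>       // Buggy disposal: unconditional close() -> NPE when open() never ran.
>       public void disposeBuggy() throws Exception {
>           windowFunction.close();
>       }
>
>       // Defensive disposal: skip resources that were never created, so the
>       // original failure surfaces instead of a secondary NullPointerException.
>       public void disposeFixed() throws Exception {
>           if (windowFunction != null) {
>               windowFunction.close();
>           }
>       }
>   }
>
> With such a guard, disposing a partially initialized operator no longer throws, and the root-cause exception that triggered the teardown stays visible in the logs rather than being masked by the disposal error.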
--
This message was sent by Atlassian Jira
(v8.3.4#803005)