[ https://issues.apache.org/jira/browse/FLINK-23420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jingsong Lee updated FLINK-23420: --------------------------------- Component/s: (was: Table SQL / API) Table SQL / Runtime > sql stream mode lag function java.lang.NullPointerException > ----------------------------------------------------------- > > Key: FLINK-23420 > URL: https://issues.apache.org/jira/browse/FLINK-23420 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.13.1 > Reporter: xiechenling > Priority: Major > > flink 1.13.1 BlinkPlanner StreamingMode EXACTLY_ONCE > log > {code:java} > 2021-07-15 21:07:46,328 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering > checkpoint 1 (type=CHECKPOINT) @ 1626354466304 for job > fd3c2294afe74778cb6ce3bd5d42f0c0. > 2021-07-15 21:07:46,774 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph [] - > OverAggregate(partitionBy=[targetId], orderBy=[lastDt ASC], window=[ RANG > BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[displayId, mmsi, > latitude, longitude, course, heading, speed, len, minLen, maxLen, wid, id, > province, nationality, lastTm, status, vesselName, sClass, targetId, lastDt, > $20, LAG(displayId) AS w0$o0, LAG(mmsi) AS w0$o1, LAG($20) AS w0$o2, > LAG(latitude) AS w0$o3, LAG(longitude) AS w0$o4, LAG(course) AS w0$o5, > LAG(heading) AS w0$o6, LAG(speed) AS w0$o7, LAG(len) AS w0$o8, LAG(minLen) AS > w0$o9, LAG(maxLen) AS w0$o10, LAG(wid) AS w0$o11, LAG(id) AS w0$o12, > LAG(province) AS w0$o13, LAG(nationality) AS w0$o14, LAG(lastTm) AS w0$o15, > LAG(status) AS w0$o16, LAG(vesselName) AS w0$o17, LAG(sClass) AS w0$o18, > LAG(targetId) AS w0$o19, LAG(lastDt) AS w0$o20]) -> Calc(select=[displayId, > mmsi, $20 AS state, latitude, longitude, course, heading, speed, len, minLen, > maxLen, wid, id, province, nationality, lastTm, status, vesselName, sClass, > targetId, lastDt, w0$o0 AS previous_displayId, w0$o1 AS previous_mmsi, w0$o2 > AS previous_state, w0$o3 AS previous_latitude, w0$o4 AS previous_longitude, > w0$o5 AS previous_course, w0$o6 AS previous_heading, w0$o7 AS previous_speed, > w0$o8 AS previous_len, w0$o9 AS previous_minLen, w0$o10 AS previous_maxLen, > w0$o11 AS previous_wid, w0$o12 AS previous_id, w0$o13 AS previous_province, > w0$o14 AS previous_nationality, w0$o15 AS previous_lastTm, w0$o16 AS > previous_status, w0$o17 AS previous_vesselName, w0$o18 AS previous_sClass, > w0$o19 AS previous_targetId, CAST(w0$o20) AS previous_lastDt], where=[(w0$o1 > <> mmsi)]) -> TableToDataSteam(type=ROW<`displayId` INT, `mmsi` INT, `state` > TINYINT, `latitude` DOUBLE, `longitude` DOUBLE, `course` FLOAT, `heading` > FLOAT, `speed` FLOAT, `len` INT, `minLen` INT, `maxLen` INT, `wid` INT, `id` > STRING, `province` STRING, `nationality` STRING, `lastTm` BIGINT, `status` > STRING, `vesselName` STRING, `sClass` STRING, `targetId` STRING, `lastDt` > TIMESTAMP(3), `previous_displayId` INT, `previous_mmsi` INT, `previous_state` > TINYINT, `previous_latitude` DOUBLE, `previous_longitude` DOUBLE, > `previous_course` FLOAT, `previous_heading` FLOAT, `previous_speed` FLOAT, > `previous_len` INT, `previous_minLen` INT, `previous_maxLen` INT, > `previous_wid` INT, `previous_id` STRING, `previous_province` STRING, > `previous_nationality` STRING, `previous_lastTm` BIGINT, `previous_status` > STRING, `previous_vesselName` STRING, `previous_sClass` STRING, > `previous_targetId` STRING, `previous_lastDt` TIMESTAMP(3)> NOT NULL, > rowtime=false) (3/3) (34f17a50932ba7852cff00dabecae88e) switched from RUNNING > to FAILED on container_1625646226467_0291_01_000005 @ hadoop-15 > (dataPort=38082). > java.lang.NullPointerException: null > at > org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:67) > ~[hlx_bigdata_flink.jar:?] > at > org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:30) > ~[hlx_bigdata_flink.jar:?] > at > org.apache.flink.table.runtime.typeutils.LinkedListSerializer.serialize(LinkedListSerializer.java:114) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.table.runtime.typeutils.LinkedListSerializer.serialize(LinkedListSerializer.java:39) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.util.InstantiationUtil.serializeToByteArray(InstantiationUtil.java:558) > ~[hlx_bigdata_flink.jar:?] > at > org.apache.flink.table.data.binary.BinaryRawValueData.materialize(BinaryRawValueData.java:113) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.table.data.binary.LazyBinaryFormat.ensureMaterialized(LazyBinaryFormat.java:126) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.copy(RawValueDataSerializer.java:60) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.copy(RawValueDataSerializer.java:36) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:287) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:267) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:141) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:213) > ~[flink-table-blink_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:211) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) > ~[flink-dist_2.11-1.13.1.jar:1.13.1] > at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_251] > 2021-07-15 21:07:46,804 INFO > org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy > [] - Calculating tasks to restart to recover the failed task > 244ba2b64f33b53ee2f9b4b8d233e03c_2. > {code} > table sql > {code:java} > |CREATE TABLE unionTargetJs ( > | `displayId` INT, > | `mmsi` INT, > | `state` TINYINT, > | `latitude` DOUBLE, > | `longitude` DOUBLE, > | `course` FLOAT, > | `heading` FLOAT, > | `speed` FLOAT, > | `len` INT, > | `minLen` INT, > | `maxLen` INT, > | `wid` INT, > | `id` STRING, > | `province` STRING, > | `nationality` STRING, > | `lastTm` BIGINT, > | `status` STRING, > | `vesselName` STRING, > | `sClass` STRING, > | `targetId` AS TargetIdFunction(`displayId`, `province`, `id`), > | `lastDt` AS TO_TIMESTAMP(FROM_UNIXTIME(lastTm / 1000, 'yyyy-MM-dd > HH:mm:ss')), > | WATERMARK FOR `lastDt` AS `lastDt` - INTERVAL '1' SECOND > |) WITH ( > | 'connector' = 'kafka', > | 'topic' = 'unionTargetJs', > | 'properties.bootstrap.servers' = '$kafkaAddress', > | 'properties.group.id' = 'aisChangeOrClose', > | 'scan.startup.mode' = 'group-offsets', > | 'format' = 'json', > | 'properties.auto.offset.reset' = 'latest' > |) > {code} > select sql > {code:java} > |SELECT > | `displayId`, > | `mmsi`, > | `state`, > | `latitude`, > | `longitude`, > | `course`, > | `heading`, > | `speed`, > | `len`, > | `minLen`, > | `maxLen`, > | `wid`, > | `id`, > | `province`, > | `nationality`, > | `lastTm`, > | `status`, > | `vesselName`, > | `sClass`, > | `targetId`, > | `lastDt`, > | lag( displayId ) OVER w AS `previous_displayId`, > | lag( mmsi ) OVER w AS `previous_mmsi`, > | lag( state ) OVER w AS `previous_state`, > | lag( latitude ) OVER w AS `previous_latitude`, > | lag( longitude ) OVER w AS `previous_longitude`, > | lag( course ) OVER w AS `previous_course`, > | lag( heading ) OVER w AS `previous_heading`, > | lag( speed ) OVER w AS `previous_speed`, > | lag( len ) OVER w AS `previous_len`, > | lag( minLen ) OVER w AS `previous_minLen`, > | lag( maxLen ) OVER w AS `previous_maxLen`, > | lag( wid ) OVER w AS `previous_wid`, > | lag( id ) OVER w AS `previous_id`, > | lag( province ) OVER w AS `previous_province`, > | lag( nationality ) OVER w AS `previous_nationality`, > | lag( lastTm ) OVER w AS `previous_lastTm`, > | lag( status ) OVER w AS `previous_status`, > | lag( vesselName ) OVER w AS `previous_vesselName`, > | lag( sClass ) OVER w AS `previous_sClass`, > | lag( targetId ) OVER w AS `previous_targetId`, > | CAST(lag( lastDt ) OVER w AS TIMESTAMP(3)) AS `previous_lastDt` > |FROM > | unionTargetJs > |WHERE > | province <> 'CeShi' > | AND state = 1 > | AND status <> 'DELETED' > |WINDOW w AS ( > | PARTITION BY targetId > | ORDER BY lastDt) > {code} > udf > {code:java} > class TargetIdFunction extends ScalarFunction { > def eval(displayId: Integer, province: String, id: String): String = { > displayId + ProvinceUtil.getProvinceEnumValue(province) + id > } > } > public class ProvinceUtil { > private static final Map<String, String> PROVINCE_MAP = new HashMap<>(); > static { > PROVINCE_MAP.put("CeShi", ProvinceEnum.CS.getValue()); > PROVINCE_MAP.put("HaiNan", ProvinceEnum.HI.getValue()); > PROVINCE_MAP.put("ShanDong", ProvinceEnum.SD.getValue()); > PROVINCE_MAP.put("FuJian", ProvinceEnum.FJ.getValue()); > PROVINCE_MAP.put("HeBei", ProvinceEnum.HE.getValue()); > PROVINCE_MAP.put("GuangDong", ProvinceEnum.GD.getValue()); > PROVINCE_MAP.put("GuangXi", ProvinceEnum.GX.getValue()); > PROVINCE_MAP.put("ZheJiang", ProvinceEnum.ZJ.getValue()); > } > public static String getProvinceEnumValue(String key) { > return PROVINCE_MAP.get(key); > } > }{code} > > -- This message was sent by Atlassian Jira (v8.3.4#803005)