[ 
https://issues.apache.org/jira/browse/FLINK-23420?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jingsong Lee updated FLINK-23420:
---------------------------------
    Priority: Critical  (was: Major)

> sql stream mode lag function java.lang.NullPointerException
> -----------------------------------------------------------
>
>                 Key: FLINK-23420
>                 URL: https://issues.apache.org/jira/browse/FLINK-23420
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.13.1
>            Reporter: xiechenling
>            Priority: Critical
>
> Environment: Flink 1.13.1, Blink planner, streaming mode, EXACTLY_ONCE checkpointing.
> Log output:
> {code:java}
> 2021-07-15 21:07:46,328 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering 
> checkpoint 1 (type=CHECKPOINT) @ 1626354466304 for job 
> fd3c2294afe74778cb6ce3bd5d42f0c0.
> 2021-07-15 21:07:46,774 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - 
> OverAggregate(partitionBy=[targetId], orderBy=[lastDt ASC], window=[ RANG 
> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[displayId, mmsi, 
> latitude, longitude, course, heading, speed, len, minLen, maxLen, wid, id, 
> province, nationality, lastTm, status, vesselName, sClass, targetId, lastDt, 
> $20, LAG(displayId) AS w0$o0, LAG(mmsi) AS w0$o1, LAG($20) AS w0$o2, 
> LAG(latitude) AS w0$o3, LAG(longitude) AS w0$o4, LAG(course) AS w0$o5, 
> LAG(heading) AS w0$o6, LAG(speed) AS w0$o7, LAG(len) AS w0$o8, LAG(minLen) AS 
> w0$o9, LAG(maxLen) AS w0$o10, LAG(wid) AS w0$o11, LAG(id) AS w0$o12, 
> LAG(province) AS w0$o13, LAG(nationality) AS w0$o14, LAG(lastTm) AS w0$o15, 
> LAG(status) AS w0$o16, LAG(vesselName) AS w0$o17, LAG(sClass) AS w0$o18, 
> LAG(targetId) AS w0$o19, LAG(lastDt) AS w0$o20]) -> Calc(select=[displayId, 
> mmsi, $20 AS state, latitude, longitude, course, heading, speed, len, minLen, 
> maxLen, wid, id, province, nationality, lastTm, status, vesselName, sClass, 
> targetId, lastDt, w0$o0 AS previous_displayId, w0$o1 AS previous_mmsi, w0$o2 
> AS previous_state, w0$o3 AS previous_latitude, w0$o4 AS previous_longitude, 
> w0$o5 AS previous_course, w0$o6 AS previous_heading, w0$o7 AS previous_speed, 
> w0$o8 AS previous_len, w0$o9 AS previous_minLen, w0$o10 AS previous_maxLen, 
> w0$o11 AS previous_wid, w0$o12 AS previous_id, w0$o13 AS previous_province, 
> w0$o14 AS previous_nationality, w0$o15 AS previous_lastTm, w0$o16 AS 
> previous_status, w0$o17 AS previous_vesselName, w0$o18 AS previous_sClass, 
> w0$o19 AS previous_targetId, CAST(w0$o20) AS previous_lastDt], where=[(w0$o1 
> <> mmsi)]) -> TableToDataSteam(type=ROW<`displayId` INT, `mmsi` INT, `state` 
> TINYINT, `latitude` DOUBLE, `longitude` DOUBLE, `course` FLOAT, `heading` 
> FLOAT, `speed` FLOAT, `len` INT, `minLen` INT, `maxLen` INT, `wid` INT, `id` 
> STRING, `province` STRING, `nationality` STRING, `lastTm` BIGINT, `status` 
> STRING, `vesselName` STRING, `sClass` STRING, `targetId` STRING, `lastDt` 
> TIMESTAMP(3), `previous_displayId` INT, `previous_mmsi` INT, `previous_state` 
> TINYINT, `previous_latitude` DOUBLE, `previous_longitude` DOUBLE, 
> `previous_course` FLOAT, `previous_heading` FLOAT, `previous_speed` FLOAT, 
> `previous_len` INT, `previous_minLen` INT, `previous_maxLen` INT, 
> `previous_wid` INT, `previous_id` STRING, `previous_province` STRING, 
> `previous_nationality` STRING, `previous_lastTm` BIGINT, `previous_status` 
> STRING, `previous_vesselName` STRING, `previous_sClass` STRING, 
> `previous_targetId` STRING, `previous_lastDt` TIMESTAMP(3)> NOT NULL, 
> rowtime=false) (3/3) (34f17a50932ba7852cff00dabecae88e) switched from RUNNING 
> to FAILED on container_1625646226467_0291_01_000005 @ hadoop-15 
> (dataPort=38082).
> java.lang.NullPointerException: null
>       at 
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:67)
>  ~[hlx_bigdata_flink.jar:?]
>       at 
> org.apache.flink.api.common.typeutils.base.IntSerializer.serialize(IntSerializer.java:30)
>  ~[hlx_bigdata_flink.jar:?]
>       at 
> org.apache.flink.table.runtime.typeutils.LinkedListSerializer.serialize(LinkedListSerializer.java:114)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.table.runtime.typeutils.LinkedListSerializer.serialize(LinkedListSerializer.java:39)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.util.InstantiationUtil.serializeToByteArray(InstantiationUtil.java:558)
>  ~[hlx_bigdata_flink.jar:?]
>       at 
> org.apache.flink.table.data.binary.BinaryRawValueData.materialize(BinaryRawValueData.java:113)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.table.data.binary.LazyBinaryFormat.ensureMaterialized(LazyBinaryFormat.java:126)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.copy(RawValueDataSerializer.java:60)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.table.runtime.typeutils.RawValueDataSerializer.copy(RawValueDataSerializer.java:36)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:170)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:131)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:48)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.runtime.state.heap.CopyOnWriteStateMap.get(CopyOnWriteStateMap.java:287)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:267) 
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:141) 
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.table.runtime.operators.over.AbstractRowTimeUnboundedPrecedingOver.onTimer(AbstractRowTimeUnboundedPrecedingOver.java:213)
>  ~[flink-table-blink_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:91)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:70)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:302)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:180)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:626)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:211)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:196)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:105)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:136)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
>  ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) 
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) 
> ~[flink-dist_2.11-1.13.1.jar:1.13.1]
>       at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_251]
> 2021-07-15 21:07:46,804 INFO  
> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy
>  [] - Calculating tasks to restart to recover the failed task 
> 244ba2b64f33b53ee2f9b4b8d233e03c_2.
> {code}
> Table DDL:
> {code:java}
> |CREATE TABLE unionTargetJs (
> |  `displayId` INT,
> |  `mmsi` INT,
> |  `state` TINYINT,
> |  `latitude` DOUBLE,
> |  `longitude` DOUBLE,
> |  `course` FLOAT,
> |  `heading` FLOAT,
> |  `speed` FLOAT,
> |  `len` INT,
> |  `minLen` INT,
> |  `maxLen` INT,
> |  `wid` INT,
> |  `id` STRING,
> |  `province` STRING,
> |  `nationality` STRING,
> |  `lastTm` BIGINT,
> |  `status` STRING,
> |  `vesselName` STRING,
> |  `sClass` STRING,
> |  `targetId` AS TargetIdFunction(`displayId`, `province`, `id`),
> |  `lastDt` AS TO_TIMESTAMP(FROM_UNIXTIME(lastTm / 1000, 'yyyy-MM-dd 
> HH:mm:ss')),
> |  WATERMARK FOR `lastDt` AS `lastDt` - INTERVAL '1' SECOND
> |) WITH (
> |  'connector' = 'kafka',
> |  'topic' = 'unionTargetJs',
> |  'properties.bootstrap.servers' = '$kafkaAddress',
> |  'properties.group.id' = 'aisChangeOrClose',
> |  'scan.startup.mode' = 'group-offsets',
> |  'format' = 'json',
> |  'properties.auto.offset.reset' = 'latest'
> |)
> {code}
> SELECT query:
> {code:java}
> |SELECT
> |  `displayId`,
> |  `mmsi`,
> |  `state`,
> |  `latitude`,
> |  `longitude`,
> |  `course`,
> |  `heading`,
> |  `speed`,
> |  `len`,
> |  `minLen`,
> |  `maxLen`,
> |  `wid`,
> |  `id`,
> |  `province`,
> |  `nationality`,
> |  `lastTm`,
> |  `status`,
> |  `vesselName`,
> |  `sClass`,
> |  `targetId`,
> |  `lastDt`,
> | lag( displayId ) OVER w AS `previous_displayId`,
> |  lag( mmsi ) OVER w AS `previous_mmsi`,
> |  lag( state ) OVER w AS `previous_state`,
> |  lag( latitude ) OVER w AS `previous_latitude`,
> |  lag( longitude ) OVER w AS `previous_longitude`,
> |  lag( course ) OVER w AS `previous_course`,
> |  lag( heading ) OVER w AS `previous_heading`,
> |  lag( speed ) OVER w AS `previous_speed`,
> |  lag( len ) OVER w AS `previous_len`,
> |  lag( minLen ) OVER w AS `previous_minLen`,
> |  lag( maxLen ) OVER w AS `previous_maxLen`,
> |  lag( wid ) OVER w AS `previous_wid`,
> |  lag( id ) OVER w AS `previous_id`,
> |  lag( province ) OVER w AS `previous_province`,
> |  lag( nationality ) OVER w AS `previous_nationality`,
> |  lag( lastTm ) OVER w AS `previous_lastTm`,
> |  lag( status ) OVER w AS `previous_status`,
> |  lag( vesselName ) OVER w AS `previous_vesselName`,
> |  lag( sClass ) OVER w AS `previous_sClass`,
> |  lag( targetId ) OVER w AS `previous_targetId`,
> | CAST(lag( lastDt ) OVER w AS TIMESTAMP(3)) AS `previous_lastDt`
> |FROM
> |  unionTargetJs
> |WHERE
> |  province <> 'CeShi'
> |  AND state = 1
> | AND status <> 'DELETED'
> |WINDOW w AS (
> |   PARTITION BY targetId
> |   ORDER BY lastDt)
> {code}
> UDF definitions:
> {code:java}
> class TargetIdFunction extends ScalarFunction {
>   def eval(displayId: Integer, province: String, id: String): String = {
>     displayId + ProvinceUtil.getProvinceEnumValue(province) + id
>   }
> }
> public class ProvinceUtil {
>     private static final Map<String, String> PROVINCE_MAP = new HashMap<>();
>     static {
>         PROVINCE_MAP.put("CeShi", ProvinceEnum.CS.getValue());
>         PROVINCE_MAP.put("HaiNan", ProvinceEnum.HI.getValue());
>         PROVINCE_MAP.put("ShanDong", ProvinceEnum.SD.getValue());
>         PROVINCE_MAP.put("FuJian", ProvinceEnum.FJ.getValue());
>         PROVINCE_MAP.put("HeBei", ProvinceEnum.HE.getValue());
>         PROVINCE_MAP.put("GuangDong", ProvinceEnum.GD.getValue());
>         PROVINCE_MAP.put("GuangXi", ProvinceEnum.GX.getValue());
>         PROVINCE_MAP.put("ZheJiang", ProvinceEnum.ZJ.getValue());
>     }
>     public static String getProvinceEnumValue(String key) {
>         return PROVINCE_MAP.get(key);
>     }
> }{code}
>  
>   



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to