Hi All, I am using Flink1.12, I am trying to read realtime data from Kafka topic and as per the requirement I need to implement windowing LAG function. Approach I followed is below: DataStream vData = env.addSource(...)vData.keyBy(Id) createTemperoryViewthen apply flink sql. My sample data is like below, vTime field contains the timestamp when the even was generated and vNumSeq is the unique number for particular group Id. I tried Lag function by ordering by vSeq field (long datatype), Job failed with "OVER windows' ordering in stream mode must be defined on a time attribute". I even tried by using vTime field (eventTS is also long datatype). I tried converting this field to sql.Timestamp, still no luck Job failed with above error. When I referred few documents solution provided was to use proctime/rowtime. So I modified the query to use proctime() Job succeeded but with wrong results. Kindly help with simple example badly stuck. I am ok to use even Datastream API to implement lag functionality. Lag Query:select vdata.f0 as id, vdata.f1 as name, vdata.f2 as vTime, vdata.f3 as vSeq, vdata.f4 as currentSal, LAG(vdata.f4,1,0) OVER ( partition BY vdata.f0 ORDER BY proctime()) AS prevSal from VData vData
Wrong output : Expected: Regards,Sunitha.