Hi Dawid, Yes, looks like it. Thanks! Is there an ETA on 1.12.2 yet?
On Mon, Feb 15, 2021 at 9:48 AM Dawid Wysakowicz <dwysakow...@apache.org> wrote: > Hey Yuval, > > Could it be that you are hitting this bug[1], which has been fixed > recently? > > Best, > > Dawid > > [1] https://issues.apache.org/jira/browse/FLINK-21013 > On 15/02/2021 08:20, Yuval Itzchakov wrote: > > Hi, > > I have a source that generates events with timestamps. These flow nicely, > until encountering a conversion from Table -> DataStream[Row]: > > def toRowRetractStream(implicit ev: TypeInformation[Row]): > DataStream[Row] = > table > .toRetractStream[Row] > .flatMap { (row, collector: Collector[Row]) => > if (row._1) > collector.collect(row._2) > } > > The transformation causes a SinkConversion to be generated with the > following code: > > @Override > public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception { > org.apache.flink.table.data.RowData in1 = > (org.apache.flink.table.data.RowData) element.getValue(); > > Object[] fields$12 = new Object[2]; > fields$12[0] = > org.apache.flink.table.data.util.RowDataUtil.isAccumulateMsg(in1); > fields$12[1] = (org.apache.flink.types.Row) > converter$9.toExternal((org.apache.flink.table.data.RowData) in1); > scala.Tuple2 result$10 = (scala.Tuple2) > serializer$11.createInstance(fields$12); > output.collect(outElement.replace(result$10)); > } > > The code receives an element of type StreamRecord, which does have a > timestamp attached to it, but fails to forward it to the new element > (outElement) which is initialized as: > > private final > org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = > new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); > > Am I missing anything in the Table -> DataStream[Row] conversion that > should make the timestamp follow through? or is this a bug? > > -- > Best Regards, > Yuval Itzchakov. > > -- Best Regards, Yuval Itzchakov.