Hi, I found that if I convert a DataStream into a Table and back into a DataStream, the stream's watermarks are lost. In the program below, the TestOperator placed before the conversion has its processWatermark() method invoked and prints the watermark value, but the one placed after the conversion does not.
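Is my observation correct? If so, is this the expected behavior of the conversion API? My current work requires converting a Table into a DataStream and applying a window operation to it, but this issue blocks me from creating the window.

Regards,
Yunfeng

```java
// Assumed: IteratorUtils comes from Apache Commons Collections.
import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SimpleTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Row> input = env.fromElements(Row.of(1));

        // Operator "0" runs before the conversion and does see watermarks.
        input = input.transform(
                "TestOperator",
                new RowTypeInfo(
                        new TypeInformation[] {TypeInformation.of(Integer.class)},
                        new String[] {"f0"}),
                new TestOperator("0"));

        // Round trip: DataStream -> Table -> DataStream.
        input = tEnv.toDataStream(tEnv.fromDataStream(input));

        // Operator "1" runs after the conversion; its processWatermark() is never called.
        input = input.transform(
                "TestOperator",
                new RowTypeInfo(
                        new TypeInformation[] {TypeInformation.of(Integer.class)},
                        new String[] {"f0"}),
                new TestOperator("1"));

        System.out.println(IteratorUtils.toList(input.executeAndCollect()));
    }

    /** Pass-through operator that prints every record and watermark it receives. */
    private static class TestOperator extends AbstractStreamOperator<Row>
            implements OneInputStreamOperator<Row, Row> {

        private final String prefix;

        private TestOperator(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public void processElement(StreamRecord<Row> streamRecord) throws Exception {
            System.out.println(prefix + streamRecord.getValue());
            output.collect(streamRecord);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            System.out.println(prefix + mark.toString());
        }
    }
}
```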