Hi,

I found that if I convert a DataStream into a Table and back into a
DataStream, the stream's watermarks are lost. In the program below, the
TestOperator before the conversion has its processWatermark() method
invoked and prints the watermark value, but the one after the conversion
does not.

Is my observation correct? If so, is this the expected behavior of the
conversion API? My current work requires converting a Table into a
DataStream and applying a window operation to it, but this problem prevents
me from creating the window.

Regards,
Yunfeng

```java
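import org.apache.commons.collections.IteratorUtils; // assumed: Apache Commons Collections 3.x
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class SimpleTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        DataStream<Row> input = env.fromElements(Row.of(1));

        // Operator "0" runs before the conversion; its processWatermark() is called.
        input = input.transform(
                "TestOperator",
                new RowTypeInfo(
                        new TypeInformation[] {TypeInformation.of(Integer.class)},
                        new String[] {"f0"}),
                new TestOperator("0"));

        // Round trip: DataStream -> Table -> DataStream.
        input = tEnv.toDataStream(tEnv.fromDataStream(input));

        // Operator "1" runs after the conversion; its processWatermark() is never called.
        input = input.transform(
                "TestOperator",
                new RowTypeInfo(
                        new TypeInformation[] {TypeInformation.of(Integer.class)},
                        new String[] {"f0"}),
                new TestOperator("1"));

        System.out.println(IteratorUtils.toList(input.executeAndCollect()));
    }

    /** Forwards every record unchanged and prints each record and watermark it sees. */
    private static class TestOperator extends AbstractStreamOperator<Row>
            implements OneInputStreamOperator<Row, Row> {
        private final String prefix;

        private TestOperator(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public void processElement(StreamRecord<Row> streamRecord) throws Exception {
            System.out.println(prefix + streamRecord.getValue());
            output.collect(streamRecord);
        }

        @Override
        public void processWatermark(Watermark mark) throws Exception {
            super.processWatermark(mark);
            System.out.println(prefix + mark);
        }
    }
}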
```
