Hi Timo,

Thanks for the information. Since it is confirmed that toDataStream forwards watermarks correctly, and I can avoid this problem by not using fromValues in my implementation, I have enough information for my current work and don't need to rediscuss fromDataStream's behavior.
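For the archive, here is the kind of workaround I have in mind (a sketch only, untested; building the input with env.fromElements and Types.ROW is my own assumption, not something confirmed in this thread):

```java
// Sketch: build the bounded input as a DataStream instead of using
// tEnv.fromValues. A bounded DataStream source emits a final
// MAX_WATERMARK when it finishes, and toDataStream forwards
// watermarks, so an event-time window over the whole stream can fire.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStream<Row> rows = env
        .fromElements(
                Row.of(1., "a", 1., 1., 1., 2, "l1"),
                Row.of(2., "d", 1., 1., 0., 1, "l0"))
        .returns(Types.ROW(Types.DOUBLE, Types.STRING, Types.DOUBLE,
                Types.DOUBLE, Types.DOUBLE, Types.INT, Types.STRING));

Table inputTable = tEnv.fromDataStream(rows);
DataStream<Row> input = tEnv.toDataStream(inputTable);
// ... then windowAll(EndOfStreamWindows.get()).reduce(...) as in the tests below
```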
Best regards,
Yunfeng

On Tue, Dec 7, 2021 at 12:42 AM Timo Walther <twal...@apache.org> wrote:
> Hi Yunfeng,
>
> It seems this is a deeper issue with the fromValues implementation.
> Under the hood, it still uses the deprecated InputFormat stack, and as
> far as I can see, we don't emit a final MAX_WATERMARK there. I will
> definitely forward this.
>
> But toDataStream forwards watermarks correctly.
>
> I hope this helps. Or do you think we should also rediscuss the
> fromDataStream watermark behavior?
>
> Regards,
> Timo
>
> On 06.12.21 10:26, Yunfeng Zhou wrote:
> > Hi Timo,
> >
> > Thanks for your response. I encountered another problem that might be
> > relevant to the watermark behavior we discussed above.
> >
> > In the test cases shown below, I create a table from some data,
> > convert it to a DataStream, and call windowAll().reduce() on it. If we
> > needed to explicitly specify a `rowtime` metadata column in order to
> > make the table pass timestamps to the converted DataStream, then both
> > test cases should print empty lists. In fact, one of them prints a
> > list with some data. The only difference between them is the value of
> > some input data. This behavior can be reproduced under Flink ML's
> > latest Java environment and configuration.
> >
> > Is this the expected behavior of `toDataStream`, or have I
> > accidentally encountered a bug?
> >
> > Best regards,
> > Yunfeng
> >
> > ```java
> > public class SimpleTest {
> >     @Test
> >     public void testSimple1() throws Exception {
> >         StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.createLocalEnvironment();
> >         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> >
> >         Table inputTable = tEnv.fromValues(
> >                 DataTypes.ROW(
> >                         DataTypes.FIELD("weight", DataTypes.DOUBLE()),
> >                         DataTypes.FIELD("f0", DataTypes.STRING()),
> >                         DataTypes.FIELD("f1", DataTypes.DOUBLE()),
> >                         DataTypes.FIELD("f2", DataTypes.DOUBLE()),
> >                         DataTypes.FIELD("f3", DataTypes.DOUBLE()),
> >                         DataTypes.FIELD("f4", DataTypes.INT()),
> >                         DataTypes.FIELD("label", DataTypes.STRING())
> >                 ),
> >                 Row.of(1., "a", 1., 1., 1., 2, "l1"),
> >                 Row.of(1., "a", 1., 1., 1., 2, "l1"),
> >                 Row.of(1., "b", 0., 1., 1., 3, "l1"),
> >                 Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
> >                 Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
> >                 Row.of(1., "a", 1., 1., 0., 1, "l0"),
> >                 Row.of(2., "d", 1., 1., 0., 1, "l0")
> >         );
> >
> >         DataStream<Row> input = tEnv.toDataStream(inputTable);
> >
> >         System.out.println(IteratorUtils.toList(input
> >                 .windowAll(EndOfStreamWindows.get())
> >                 .reduce((ReduceFunction<Row>) (row, t1) -> row)
> >                 .executeAndCollect()
> >         ));
> >     }
> >
> >     @Test
> >     public void testSimple2() throws Exception {
> >         StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.createLocalEnvironment();
> >         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> >
> >         Table inputTable = tEnv.fromValues(
> >                 DataTypes.ROW(
> >                         DataTypes.FIELD("weight", DataTypes.DOUBLE()),
> >                         DataTypes.FIELD("f0", DataTypes.STRING()),
> >                         DataTypes.FIELD("f1", DataTypes.DOUBLE()),
> >                         DataTypes.FIELD("f2", DataTypes.DOUBLE()),
> >                         DataTypes.FIELD("f3", DataTypes.DOUBLE()),
> >                         DataTypes.FIELD("f4", DataTypes.INT()),
> >                         DataTypes.FIELD("label", DataTypes.STRING())
> >                 ),
> >                 Row.of(1., "a", 1., 1., 1., 2, "l1"),
> >                 Row.of(1., "a", 1., 0., 1., 2, "l1"),
> >                 Row.of(1., "b", 0., 1., 1., 3, "l1"),
> >                 Row.of(1., "d", 0., 1., 1.5, 2, "l1"),
> >                 Row.of(2., "c", 1.5, 1., 0.5, 3, "l0"),
> >                 Row.of(1., "a", 1., 1.5, 0., 1, "l0"),
> >                 Row.of(2., "d", 1., 1., 0., 1, "l0")
> >         );
> >
> >         DataStream<Row> input = tEnv.toDataStream(inputTable);
> >
> >         System.out.println(IteratorUtils.toList(input
> >                 .windowAll(EndOfStreamWindows.get())
> >                 .reduce((ReduceFunction<Row>) (row, t1) -> row)
> >                 .executeAndCollect()
> >         ));
> >     }
> > }
> > ```
> >
> > ```java
> > /**
> >  * A {@link WindowAssigner} that assigns all elements of a bounded input
> >  * stream into one window pane. The results are emitted once the input
> >  * stream has ended.
> >  */
> > public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {
> >
> >     private static final EndOfStreamWindows INSTANCE = new EndOfStreamWindows();
> >
> >     private EndOfStreamWindows() {}
> >
> >     public static EndOfStreamWindows get() {
> >         return INSTANCE;
> >     }
> >
> >     @Override
> >     public Collection<TimeWindow> assignWindows(
> >             Object element, long timestamp, WindowAssignerContext context) {
> >         return Collections.singletonList(new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE));
> >     }
> >
> >     @Override
> >     public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
> >         return EventTimeTrigger.create();
> >     }
> >
> >     @Override
> >     public String toString() {
> >         return "EndOfStreamWindows()";
> >     }
> >
> >     @Override
> >     public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
> >         return new TimeWindow.Serializer();
> >     }
> >
> >     @Override
> >     public boolean isEventTime() {
> >         return true;
> >     }
> > }
> > ```
> >
> > On Fri, Nov 5, 2021 at 4:29 PM Timo Walther <twal...@apache.org
> > <mailto:twal...@apache.org>> wrote:
> >
> > Hi Yunfeng,
> >
> > By default, fromDataStream does not propagate watermarks into Table
> > API, because Table API needs a time attribute in the schema that
> > corresponds to the watermarking.
> > A time attribute will also be put back into the stream record during
> > toDataStream.
> >
> > Please take a look at:
> > https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromdatastream
> >
> > Esp. example 4 should solve your use case:
> >
> > ```java
> > // === EXAMPLE 4 ===
> >
> > // derive all physical columns automatically
> > // but access the stream record's timestamp for creating a rowtime
> > // attribute column
> > // also rely on the watermarks generated in the DataStream API
> >
> > // we assume that a watermark strategy has been defined for `dataStream`
> > // before (not part of this example)
> > Table table =
> >     tableEnv.fromDataStream(
> >         dataStream,
> >         Schema.newBuilder()
> >             .columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
> >             .watermark("rowtime", "SOURCE_WATERMARK()")
> >             .build());
> > ```
> >
> > I hope this helps.
> >
> > Regards,
> > Timo
> >
> > On 04.11.21 12:00, Yunfeng Zhou wrote:
> > > Hi,
> > >
> > > I found that if I convert a DataStream into a Table and back into a
> > > DataStream, the watermark of the stream will be lost. As shown in the
> > > program below, the TestOperator before the conversion will have its
> > > processWatermark() method triggered and the watermark value printed,
> > > but the one after the conversion will not.
> > >
> > > Is my observation correct? If so, is it the expected behavior of the
> > > conversion API? My current work requires me to convert a table into a
> > > datastream and do a window operation on it, but this problem blocks me
> > > from creating a window.
> > >
> > > Regards,
> > > Yunfeng
> > >
> > > ```java
> > > public class SimpleTest {
> > >     public static void main(String[] args) throws Exception {
> > >         StreamExecutionEnvironment env =
> > >                 StreamExecutionEnvironment.createLocalEnvironment();
> > >         env.setParallelism(1);
> > >         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> > >
> > >         DataStream<Row> input = env.fromElements(Row.of(1));
> > >
> > >         input = input.transform(
> > >                 "TestOperator",
> > >                 new RowTypeInfo(
> > >                         new TypeInformation[]{TypeInformation.of(Integer.class)},
> > >                         new String[]{"f0"}),
> > >                 new TestOperator("0")
> > >         );
> > >
> > >         input = tEnv.toDataStream(tEnv.fromDataStream(input));
> > >
> > >         input = input.transform(
> > >                 "TestOperator",
> > >                 new RowTypeInfo(
> > >                         new TypeInformation[]{TypeInformation.of(Integer.class)},
> > >                         new String[]{"f0"}),
> > >                 new TestOperator("1")
> > >         );
> > >
> > >         System.out.println(IteratorUtils.toList(input.executeAndCollect()));
> > >     }
> > >
> > >     private static class TestOperator extends AbstractStreamOperator<Row>
> > >             implements OneInputStreamOperator<Row, Row> {
> > >         private final String prefix;
> > >
> > >         private TestOperator(String prefix) {
> > >             this.prefix = prefix;
> > >         }
> > >
> > >         @Override
> > >         public void processElement(StreamRecord<Row> streamRecord) throws Exception {
> > >             System.out.println(prefix + streamRecord.getValue());
> > >             output.collect(streamRecord);
> > >         }
> > >
> > >         @Override
> > >         public void processWatermark(Watermark mark) throws Exception {
> > >             super.processWatermark(mark);
> > >             System.out.println(prefix + mark.toString());
> > >         }
> > >     }
> > > }
> > > ```
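P.S. For anyone who finds this thread later, here is a self-contained toy (plain Java, not Flink code; the class and method names are mine) of why a source that never emits the final MAX_WATERMARK keeps a window like EndOfStreamWindows from ever firing: EventTimeTrigger fires a pane only once the watermark reaches the window's maximum timestamp, which for a window ending at Long.MAX_VALUE is Long.MAX_VALUE - 1, i.e. only the final MAX_WATERMARK can reach it.

```java
// Toy model of event-time trigger semantics: a pane fires only when
// the watermark passes the window's maximum timestamp (the window end
// is exclusive, so maxTimestamp = end - 1, mirroring
// TimeWindow.maxTimestamp()).
class WatermarkToy {
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // Mirrors EventTimeTrigger: FIRE iff watermark >= maxTimestamp.
    static boolean fires(long watermark, long windowEnd) {
        return watermark >= maxTimestamp(windowEnd);
    }

    public static void main(String[] args) {
        long end = Long.MAX_VALUE; // EndOfStreamWindows: [Long.MIN_VALUE, Long.MAX_VALUE)
        // No ordinary watermark reaches Long.MAX_VALUE - 1 ...
        System.out.println(fires(1_000_000L, end));     // prints "false"
        // ... only the final MAX_WATERMARK (Long.MAX_VALUE) fires the pane.
        System.out.println(fires(Long.MAX_VALUE, end)); // prints "true"
    }
}
```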