Hi Pranav,

> I call inputStream.assignTimestampsAndWatermarks(watermarkStrategy).

I'm a little confused. It seems that you need proctime as the time
attribute, so why call assignTimestampsAndWatermarks(watermarkStrategy)?
assignTimestampsAndWatermarks is only needed when you use an event-time
attribute.

> Then, I call tableEnv.registerDatastream(). In this process, I don't see
> where I could set the proctime as the time attribute.

Please try the approach in the following demo, which runs successfully. The
key is:

    Table table = tableEnv.fromDataStream(ds, expr);
    tableEnv.createTemporaryView("name", table);
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/** The example executes a single Flink job. The results are written to stdout. */
public final class StreamSQLExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        final DataStream<Row> order =
                env.fromCollection(
                        Arrays.asList(
                                Row.of(2L, "pen", 3),
                                Row.of(2L, "rubber", 3),
                                Row.of(4L, "beer", 1)));

        // Declare a processing-time attribute "ts" while converting the stream.
        Table table =
                tableEnv.fromDataStream(
                        order, $("user"), $("product"), $("amount"), $("ts").proctime());
        tableEnv.createTemporaryView("orders", table);

        String query =
                "SELECT\n"
                        + "  CAST(TUMBLE_START(ts, INTERVAL '1' SECOND) AS STRING) window_start,\n"
                        + "  COUNT(*) order_num,\n"
                        + "  SUM(amount) total_amount,\n"
                        + "  COUNT(DISTINCT product) unique_products\n"
                        + "FROM orders\n"
                        + "GROUP BY TUMBLE(ts, INTERVAL '1' SECOND)";
        Table result = tableEnv.sqlQuery(query);

        tableEnv.toAppendStream(result, Row.class).print();
        env.execute("Streaming Window SQL Job");
    }
}

Best,
JING ZHANG

Pranav Patil <pranav.pa...@salesforce.com> wrote on Wed, Aug 4, 2021 at 6:29 AM:

> Thank you for the tips. Unfortunately, the guides don't cover the same
> case, since I'm not using SQL DDL and I don't convert from DataStream to
> Table; I register the DataStream as a table in the SQL API.
>
> I have a custom DataStream source, which I have as DataStream<Row>. The
> time characteristic is normalized time. I call
> inputStream.assignTimestampsAndWatermarks(watermarkStrategy). Then, I
> call tableEnv.registerDatastream().
> In this process, I don't see where I could set the proctime as the time
> attribute.
>
> Also, I'm currently using TimeIndicatorTypeInfo.ROWTIME_INDICATOR as the
> type, so I'm unsure how to cast it to TIMESTAMP(3).
>
> On Mon, Aug 2, 2021 at 7:18 PM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Pranav,
>> Yes, the root cause is that `timecol` is not a time attribute column.
>> If you use processing time as the time attribute, please refer to [1]
>> for more information.
>> If you use event time as the time attribute, please refer to [2] for
>> more information. assignTimestampsAndWatermarks is needed only if you
>> choose event time.
>> If the problem still exists after you update the program based on the
>> demo in the documentation, please let me know and provide your program.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/time_attributes/#processing-time
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/time_attributes/#event-time
>>
>> Best,
>> JING ZHANG
>>
>> Pranav Patil <pranav.pa...@salesforce.com> wrote on Tue, Aug 3, 2021 at 8:51 AM:
>>
>>> Hi,
>>>
>>> I'm upgrading a repository from Flink 1.11 to Flink 1.13. I have a
>>> Flink SQL command that used to do tumbling windows using the following
>>> in the GROUP BY clause:
>>>
>>> SELECT ... FROM ... GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE)
>>>
>>> However, now it gives me the error:
>>>
>>> org.apache.flink.table.api.TableException: Window aggregate can only be
>>> defined over a time attribute column, but TIMESTAMP(9) encountered.
>>>
>>> I'm not sure why this isn't a time attribute column anymore.
>>> Thinking it's a syntax change, I try:
>>>
>>> Table(TUMBLE(Table inputStream, DESCRIPTOR(proctime), INTERVAL '1' MINUTE))
>>>
>>> This too doesn't work, with the error:
>>>
>>> org.apache.flink.table.api.ValidationException: The window function
>>> TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval)
>>> requires the timecol is a time attribute type, but is TIMESTAMP(9).
>>>
>>> So I'm realizing the problem is actually that it isn't a time attribute
>>> column. However, I'm confused because this wasn't a problem in previous
>>> versions. In the DataStream API file source,
>>> assignTimestampsAndWatermarks is called on the source stream, which I
>>> believed to be enough. I then call registerDatastream to access it from
>>> Flink SQL. Are there additional steps that must be taken in Flink 1.13?
>>>
>>> Thank you.
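Since the original question is about event time rather than proctime, here is a minimal sketch of the event-time variant of the demo, assuming a DataStream<Row> whose first field holds an epoch-millis timestamp (the field names, the 5-second out-of-orderness bound, and the helper class name are illustrative, not from the thread). The point is that assignTimestampsAndWatermarks on the stream plus `.rowtime()` in fromDataStream together expose the record timestamp as an event-time attribute:

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/** Illustrative sketch; class and field names are assumptions. */
public final class EventTimeSketch {

    static void register(StreamTableEnvironment tableEnv, DataStream<Row> order) {
        // 1. Assign timestamps and watermarks on the DataStream. This is the
        //    step that makes event time available to the Table planner.
        DataStream<Row> withTs =
                order.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Row>forBoundedOutOfOrderness(
                                        Duration.ofSeconds(5))
                                .withTimestampAssigner(
                                        (row, ts) -> (long) row.getField(0)));

        // 2. Append an event-time attribute "ts" with .rowtime() instead of
        //    .proctime(); it is backed by the stream-record timestamps set above.
        Table table =
                tableEnv.fromDataStream(
                        withTs, $("user"), $("product"), $("amount"), $("ts").rowtime());
        tableEnv.createTemporaryView("orders", table);
    }
}
```

With the view registered this way, `ts` is a proper time attribute and can be used in TUMBLE just like the proctime column in the demo above.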
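On the second error in the thread: the 1.13 windowing table-valued functions go in the FROM clause, and the grouping must include window_start and window_end. Assuming a registered table named orders with a time attribute column ts, a sketch of that form would be:

    -- Sketch of the Flink 1.13 windowing-TVF syntax; "orders" and "ts"
    -- are assumed names matching the demo above.
    SELECT
      window_start,
      COUNT(*)    AS order_num,
      SUM(amount) AS total_amount
    FROM TABLE(
      TUMBLE(TABLE orders, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY window_start, window_end;

This still requires ts to be a real time attribute, so the fix in the reply above applies either way.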