Hi Pranav,
> I call inputStream.assignTimestampsAndWatermarks(watermarkStrategy).
I'm a little confused. If you want processing time as the time
attribute, why do you call
assignTimestampsAndWatermarks(watermarkStrategy)?
assignTimestampsAndWatermarks is only needed when you want an
event-time attribute.
> Then, I call tableEnv.registerDatastream(). In this process, I don't see
where I could set the proctime as the time attribute.
Please try the approach in the following demo, which runs successfully. The
key is `Table table = tableEnv.fromDataStream(ds, expr);
tableEnv.createTemporaryView("name", table);`

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/** The example executes a single Flink job. The results are written to stdout. */
public final class StreamSQLExample {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        final DataStream<Row> order =
                env.fromCollection(
                        Arrays.asList(
                                Row.of(2L, "pen", 3),
                                Row.of(2L, "rubber", 3),
                                Row.of(4L, "beer", 1)));

        // $("ts").proctime() appends a processing-time attribute column.
        Table table =
                tableEnv.fromDataStream(
                        order, $("user"), $("product"), $("amount"), $("ts").proctime());
        tableEnv.createTemporaryView("orders", table);

        String query =
                "SELECT\n"
                        + "  CAST(TUMBLE_START(ts, INTERVAL '1' SECOND) AS STRING) window_start,\n"
                        + "  COUNT(*) order_num,\n"
                        + "  SUM(amount) total_amount,\n"
                        + "  COUNT(DISTINCT product) unique_products\n"
                        + "FROM orders\n"
                        + "GROUP BY TUMBLE(ts, INTERVAL '1' SECOND)";
        Table result = tableEnv.sqlQuery(query);
        tableEnv.toAppendStream(result, Row.class).print();

        env.execute("Streaming Window SQL Job");
    }
}
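If you do want event time instead, here is a minimal sketch of the same setup
with a rowtime attribute. It assumes (hypothetically) that each Row carries an
epoch-millisecond timestamp as its last field; adapt the timestamp assigner to
your own schema. The `$("rt").rowtime()` expression turns the stream-record
timestamp set by `assignTimestampsAndWatermarks` into an event-time attribute
column:

```java
import java.time.Duration;
import java.util.Arrays;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public final class EventTimeSketch {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Assumption for this sketch: the last Row field is an epoch-millis timestamp.
        DataStream<Row> orders =
                env.fromCollection(
                                Arrays.asList(
                                        Row.of(2L, "pen", 3, 1000L),
                                        Row.of(2L, "rubber", 3, 1500L)))
                        // Here assignTimestampsAndWatermarks IS needed: it sets the
                        // record timestamps and watermarks used for event time.
                        .assignTimestampsAndWatermarks(
                                WatermarkStrategy.<Row>forBoundedOutOfOrderness(
                                                Duration.ofSeconds(5))
                                        .withTimestampAssigner(
                                                (row, ts) -> (long) row.getField(3)));

        // $("rt").rowtime() exposes the record timestamp as an event-time
        // attribute column, usable in TUMBLE(rt, INTERVAL '1' SECOND).
        Table table =
                tableEnv.fromDataStream(
                        orders, $("user"), $("product"), $("amount"), $("rt").rowtime());
        tableEnv.createTemporaryView("orders", table);
    }
}
```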


Best,
JING ZHANG

Pranav Patil <pranav.pa...@salesforce.com> 于2021年8月4日周三 上午6:29写道:

> Thank you for the tips. Unfortunately, the guides don't follow the same
> case since I'm not using SQL DDL and I don't convert from DataStream to
> Table, I register the DataStream as a table in the SQL API.
>
> I have a custom DataStream source, which I have as DataStream<Row>. The
> time characteristic is normalized time. I call inputStream
> .assignTimestampsAndWatermarks(watermarkStrategy). Then, I call
> tableEnv.registerDatastream(). In this process, I don't see where I could
> set the proctime as the time attribute.
>
> Also, I'm currently using TimeIndicatorTypeInfo.ROWTIME_INDICATOR as the
> type, so I'm unsure how to cast it to TIMESTAMP(3).
>
>
>
> On Mon, Aug 2, 2021 at 7:18 PM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Pranav,
>> Yes, The root cause is the `timecol` is not a time attribute column.
>> If you use processing time as time attribute, please refer [1] for more
>> information.
>> If you use event time as time attribute, please refer [2] for more
>> information. Only if you choose event time is `assignTimestampsAndWatermarks`
>> needed.
>> If the problem still exists after you update the program based on the
>> demo in the document, please let me know and provide your program.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/time_attributes/#processing-time
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/time_attributes/#event-time
>>
>> Best,
>> JING ZHANG
>>
>>
>> Pranav Patil <pranav.pa...@salesforce.com> 于2021年8月3日周二 上午8:51写道:
>>
>>> Hi,
>>>
>>> I'm upgrading a repository from Flink 1.11 to Flink 1.13. I have Flink
>>> SQL command that used to do tumbling windows using the following in the
>>> GROUP BY clause:
>>>
>>> SELECT ... FROM ... GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE)
>>>
>>> However now, it gives me the error:
>>>
>>> org.apache.flink.table.api.TableException: Window aggregate can only be 
>>> defined over a time attribute column, but TIMESTAMP(9) encountered.
>>>
>>> I'm not sure why this isn't a time attribute column anymore. Thinking it's 
>>> a syntax change, I try:
>>>
>>> Table(TUMBLE(Table inputStream,DESCRIPTOR(proctime),INTERVAL '1' MINUTE))
>>>
>>> This too doesn't work, with the error:
>>>
>>> org.apache.flink.table.api.ValidationException: The window function 
>>> TUMBLE(TABLE table_name, DESCRIPTOR(timecol), datetime interval) requires 
>>> the timecol is a time attribute type, but is TIMESTAMP(9).
>>>
>>> So I'm realizing the problem is actually that it isn't a time attribute 
>>> column. However, I'm confused because this wasn't a problem in previous 
>>> versions. In the DataStream API file source, assignTimestampsAndWatermarks 
>>> is called on the source stream, which I believed to be enough. I then call 
>>> registerDatastream to access it from Flink SQL. Are there additional steps 
>>> that must be taken in Flink 1.13?
>>>
>>> Thank you.
>>>
>>>
