Thanks.

That works like a charm.

Niels

On Thu, Oct 14, 2021 at 5:17 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> To implement the renaming of fields with the new API, try this:
>
> tableEnv.createTemporaryView(
>         "AgentStream",
>         inputStream,
>         Schema.newBuilder()
>                 .columnByExpression("useragent", "f0")
>                 .columnByExpression("expectedDeviceClass", "f1")
>                 .columnByExpression("expectedAgentNameVersionMajor", "f2")
>                 .build());
>
> You can think of inputStream as a source table whose column names are f0,
> f1, and f2 (because inputStream is a stream of Tuple3). To create a
> temporary view from that "source table" in SQL, you would write
>
> CREATE TEMPORARY VIEW AgentStream AS SELECT f0 AS useragent, f1 AS
> expectedDeviceClass, f2 AS expectedAgentNameVersionMajor FROM inputStream
>
> Converting this SQL to Java code gives you the result above.
>
> For more information, see the documentation of
> StreamTableEnvironment#fromDataStream [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.14/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromDataStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema-
>
> On Wed, Oct 13, 2021 at 11:05 PM Niels Basjes <ni...@basjes.nl> wrote:
>
>> To test a Flink Table UDF I wrote a while ago I created this code to test
>> it:
>>
>> (Full link:
>>
>> https://github.com/nielsbasjes/yauaa/blob/v6.0/udfs/flink-table/src/test/java/nl/basjes/parse/useragent/flink/table/TestTableFunction.java#L80
>> )
>>
>> // The base execution environment
>> StreamExecutionEnvironment senv =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>
>> // The table environment
>> StreamTableEnvironment tableEnv =
>>     StreamTableEnvironment.create(senv);
>>
>> // The demo input stream
>> DataStream<Tuple3<String, String, String>> inputStream =
>>     getTestAgentStream(senv);
>>
>> tableEnv.createTemporaryView(
>>     "AgentStream",
>>     inputStream,
>>     $("useragent"),
>>     $("expectedDeviceClass"),
>>     $("expectedAgentNameVersionMajor"));
>>
>>
>> This method (with the array of Expressions) was deprecated in 1.14, so
>> I'm looking to refactor it to the new way of doing the same.
>> Essentially, I create a streaming table with a few columns, apply my
>> function to it using SQL, and then verify the result.
>>
>> Looking through the Flink codebase I expected this would do the trick
>>
>> tableEnv.createTemporaryView(
>>     "AgentStream",
>>     inputStream,
>>     Schema
>>         .newBuilder()
>>         .column("useragent", DataTypes.STRING())
>>         .column("expectedDeviceClass", DataTypes.STRING())
>>         .column("expectedAgentNameVersionMajor", DataTypes.STRING())
>>         .build()
>> );
>>
>> I was wrong, as this call to createTemporaryView fails with:
>>
>> org.apache.flink.table.api.ValidationException:
>> Unable to find a field named 'useragent' in the physical data type derived 
>> from the given type information for schema declaration.
>> Make sure that the type information is not a generic raw type.
>> Currently available fields are: [f0, f1, f2]
>>
>> I suspect I'm doing something wrong regarding the mentioned "generic raw 
>> type" and the way I'm trying to define the Schema.
>>
>> What I essentially am looking for is the correct way to give the 3 provided 
>> columns a new name and type.
>>
>> How do I do this correctly in the new API?
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
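
For readers of the archive: an alternative way to get the same renaming is
to convert the stream to a Table first and rename its columns positionally
with Table#as. The sketch below is not taken from the thread. It assumes
Flink 1.14 with the Table API Java bridge and a planner on the classpath,
and the class name, method name, and sample tuple are made up for
illustration.

```java
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical example class, not part of the code discussed in the thread.
public class RenameTupleFields {

    // Registers the view "AgentStream" and returns its column names.
    static List<String> renamedColumns() {
        StreamExecutionEnvironment senv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(senv);

        // Stand-in for getTestAgentStream(senv); the values are made up.
        DataStream<Tuple3<String, String, String>> inputStream =
                senv.fromElements(
                        Tuple3.of("Mozilla/5.0 (example)", "Desktop", "Example 1"));

        // The Tuple3 stream is exposed with the physical columns f0, f1, f2;
        // as(...) renames them by position.
        Table renamed = tableEnv
                .fromDataStream(inputStream)
                .as("useragent", "expectedDeviceClass", "expectedAgentNameVersionMajor");

        tableEnv.createTemporaryView("AgentStream", renamed);

        return tableEnv.from("AgentStream").getResolvedSchema().getColumnNames();
    }

    public static void main(String[] args) {
        System.out.println(renamedColumns());
    }
}
```

With this variant the view should expose only the three renamed columns,
which may be convenient when a test runs SELECT * against it.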

-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
