Thanks. That works like a charm.
Niels On Thu, Oct 14, 2021 at 5:17 AM Caizhi Weng <tsreape...@gmail.com> wrote: > Hi! > > To implement the renaming of fields with the new API, try this: > > tableEnv.createTemporaryView( > "AgentStream", > inputStream, > Schema.newBuilder() > .columnByExpression("useragent", "f0") > .columnByExpression("expectedDeviceClass", "f1") > .columnByExpression("expectedAgentNameVersionMajor", "f2") > .build()); > > You can see the inputStream as some sort of source table whose column > names are f0, f1, and f2 (because inputStream is a stream of Tuple3). To > create a temporary view from that "source table", what you'll write in SQL > should be > > CREATE TEMPORARY VIEW AgentStream SELECT f0 AS useragent, f1 AS > expectedDeviceClass, f2 AS expectedAgentNameVersionMajor FROM inputStream > > Converting this SQL to java code will give you the result above. > > For more information, see documentation of > StreamTableEnvironment#fromDataStream [1] > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.14/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromDataStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema- > > Niels Basjes <ni...@basjes.nl> 于2021年10月13日周三 下午11:05写道: > >> To test a Flink Table UDF I wrote a while ago I created this code to test >> it: >> >> (Full link: >> >> https://github.com/nielsbasjes/yauaa/blob/v6.0/udfs/flink-table/src/test/java/nl/basjes/parse/useragent/flink/table/TestTableFunction.java#L80 >> ) >> >> // The base execution environment >> StreamExecutionEnvironment senv = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> >> // The table environment >> StreamTableEnvironment tableEnv = >> StreamTableEnvironment.create(senv); >> >> // The demo input stream >> DataStream<Tuple3<String, String, String>> inputStream = >> getTestAgentStream(senv); >> >> tableEnv.createTemporaryView( >> >> "AgentStream", >> inputStream, >> $("useragent"), >> $("expectedDeviceClass"), >> $("expectedAgentNameVersionMajor")); >> >> >> This method (with the array of Expression s) was deprecated in 1.14 so >> I'm looking to refactor it to the new way of doing the same. >> Essentially create a streaming table with a few columns on which I then >> apply my function using SQL and then verify the result. >> >> Looking through the Flink codebase I expected this would do the trick >> >> tableEnv.createTemporaryView( >> "AgentStream", >> inputStream, >> Schema >> .newBuilder() >> .column("useragent", DataTypes.STRING()) >> .column("expectedDeviceClass", DataTypes.STRING()) >> .column("expectedAgentNameVersionMajor", DataTypes.STRING()) >> .build() >> ); >> >> I was wrong as this call to createTemporaryView fails with >> >> org.apache.flink.table.api.ValidationException: >> Unable to find a field named 'useragent' in the physical data type derived >> from the given type information for schema declaration. >> Make sure that the type information is not a generic raw type. >> Currently available fields are: [f0, f1, f2] >> >> I suspect I'm doing something wrong regarding the mentioned "generic raw >> type" and the way I'm trying to define the Schema. >> >> What I essentially am looking for is the correct way to give the 3 provided >> columns a new name and type. >> >> How do I do this correctly in the new API? >> >> -- >> Best regards / Met vriendelijke groeten, >> >> Niels Basjes >> > -- Best regards / Met vriendelijke groeten, Niels Basjes