Hi! To implement the renaming of fields with the new API, try this:
tableEnv.createTemporaryView( "AgentStream", inputStream, Schema.newBuilder() .columnByExpression("useragent", "f0") .columnByExpression("expectedDeviceClass", "f1") .columnByExpression("expectedAgentNameVersionMajor", "f2") .build()); You can see the inputStream as some sort of source table whose column names are f0, f1, and f2 (because inputStream is a stream of Tuple3). To create a temporary view from that "source table", what you'll write in SQL should be CREATE TEMPORARY VIEW AgentStream SELECT f0 AS useragent, f1 AS expectedDeviceClass, f2 AS expectedAgentNameVersionMajor FROM inputStream Converting this SQL to java code will give you the result above. For more information, see documentation of StreamTableEnvironment#fromDataStream [1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.14/api/java/org/apache/flink/table/api/bridge/java/StreamTableEnvironment.html#fromDataStream-org.apache.flink.streaming.api.datastream.DataStream-org.apache.flink.table.api.Schema- Niels Basjes <ni...@basjes.nl> 于2021年10月13日周三 下午11:05写道: > To test a Flink Table UDF I wrote a while ago I created this code to test > it: > > (Full link: > > https://github.com/nielsbasjes/yauaa/blob/v6.0/udfs/flink-table/src/test/java/nl/basjes/parse/useragent/flink/table/TestTableFunction.java#L80 > ) > > // The base execution environment > StreamExecutionEnvironment senv = > StreamExecutionEnvironment.getExecutionEnvironment(); > > // The table environment > StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(senv); > > // The demo input stream > DataStream<Tuple3<String, String, String>> inputStream = > getTestAgentStream(senv); > > tableEnv.createTemporaryView( > > "AgentStream", > inputStream, > $("useragent"), > $("expectedDeviceClass"), > $("expectedAgentNameVersionMajor")); > > > This method (with the array of Expression s) was deprecated in 1.14 so I'm > looking to refactor it to the new way of doing the same. > Essentially create a streaming table with a few columns on which I then > apply my function using SQL and then verify the result. > > Looking through the Flink codebase I expected this would do the trick > > tableEnv.createTemporaryView( > "AgentStream", > inputStream, > Schema > .newBuilder() > .column("useragent", DataTypes.STRING()) > .column("expectedDeviceClass", DataTypes.STRING()) > .column("expectedAgentNameVersionMajor", DataTypes.STRING()) > .build() > ); > > I was wrong as this call to createTemporaryView fails with > > org.apache.flink.table.api.ValidationException: > Unable to find a field named 'useragent' in the physical data type derived > from the given type information for schema declaration. > Make sure that the type information is not a generic raw type. > Currently available fields are: [f0, f1, f2] > > I suspect I'm doing something wrong regarding the mentioned "generic raw > type" and the way I'm trying to define the Schema. > > What I essentially am looking for is the correct way to give the 3 provided > columns a new name and type. > > How do I do this correctly in the new API? > > -- > Best regards / Met vriendelijke groeten, > > Niels Basjes >