Eric Xiao created FLINK-29837:
---------------------------------

             Summary: SQL API does not expose the RowKind of the Row for processing Changelogs
                 Key: FLINK-29837
                 URL: https://issues.apache.org/jira/browse/FLINK-29837
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.16.0
            Reporter: Eric Xiao

When working with {{ChangeLog}} data in the SQL API, it is a bit misleading that the {{op}} column appears^[1]^ in the table schema of the printed results but is not available for use in SQL queries:

{code:java}
val tableEnv = StreamTableEnvironment.create(env)

val dataStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
  Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))

// interpret the DataStream as a Table
val table = tableEnv.fromChangelogStream(
  dataStream,
  Schema.newBuilder().primaryKey("f0").build(),
  ChangelogMode.upsert())

// register the table under a name and run a query that filters on the op column
tableEnv.createTemporaryView("InputTable", table)
tableEnv
  .sqlQuery("SELECT * FROM InputTable where op = '+I'")
  .execute()
  .print()
{code}

The error log:

{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 32 to line 1, column 33: Column 'op' not found in any table
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:184)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:675)
	at com.shopify.trickle.pipelines.IteratorPipeline$.delayedEndpoint$com$shopify$trickle$pipelines$IteratorPipeline$1(IteratorPipeline.scala:32)
	at com.shopify.trickle.pipelines.IteratorPipeline$delayedInit$body.apply(IteratorPipeline.scala:11)
	at scala.Function0.apply$mcV$sp(Function0.scala:39)
	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
	at scala.App.$anonfun$main$1$adapted(App.scala:80)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at scala.App.main(App.scala:80)
	at scala.App.main$(App.scala:78)
	at com.shopify.trickle.pipelines.IteratorPipeline$.main(IteratorPipeline.scala:11)
	at com.shopify.trickle.pipelines.IteratorPipeline.main(IteratorPipeline.scala)
{code}

It would be nice to expose the {{op}} column in the Flink SQL API, as it is in the DataStream API.

[1] [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
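
For comparison, a minimal sketch of how the row kind can be read on the DataStream side. This is not a fix for the issue, only an illustration of the asymmetry described above. The object name {{RowKindFilterSketch}} and the choice to filter on {{RowKind.INSERT}} are assumptions made for the example. It assumes the Flink 1.16 Scala bridge is on the classpath, and that filtering individual changelog records (rather than the materialized table) is acceptable for the use case.

{code:scala}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Schema
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

// Hypothetical object name, used only for this sketch.
object RowKindFilterSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Same input as in the report above.
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
      Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
      Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
    )(Types.ROW(Types.STRING, Types.INT))

    val table = tableEnv.fromChangelogStream(
      dataStream,
      Schema.newBuilder().primaryKey("f0").build(),
      ChangelogMode.upsert())

    // Convert the table back into a changelog stream. Each Row carries
    // its RowKind, so the filter that SQL rejects can be applied here.
    val insertsOnly = tableEnv
      .toChangelogStream(table)
      .filter(row => row.getKind == RowKind.INSERT)

    insertsOnly.print()
    env.execute()
  }
}
{code}

Note that the kinds seen after {{toChangelogStream}} are those emitted by the planner for the table, which may differ from the kinds of the original input records.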