Hi, Unfortunately support for consuming upsert stream is not supported yet. It's not as easy as adding the type information there as you suggested. Even if you do that it will still be considered to be an append message internally by the planner. There is an ongoing effort (FLIP-95[1]) to support it in Flink 1.11.
Best, Dawid [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces On 13/05/2020 01:03, Jiahui Jiang wrote: > Hello Flink friends, I have a retract stream in the format of > 'DataStream<CRow>' that I want to register into my table environment, > and also expose processing time column in the table. > > For a regular datastream, I have being > doing 'tableEnvironment.createTemporaryView(path, dataStream, > 'field1,field2, ..,__processing_time_column.proctime')'. with no > issue. But for this retract stream, I was getting an error > "org.apache.flink.table.api.ValidationException: Too many fields > referenced from an atomic type." > > Digging a little bit deeper, in TypeInfoUtils#extractFieldInformation > <https://github.com/hequn8128/flink/blob/master/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java#L250>, > it doesn't handle CRowTypeInfo as a known case. Looking at the > behavior of > > Since it's a standard CompositeType, instead of only handling > 'if (inputType instanceof PojoTypeInfo)', can we just add CRowTypeInfo > here too? Is there any risk that I'm not aware of? > > Thank you!
signature.asc
Description: OpenPGP digital signature