Below is a basic unit test of what we are trying to achieve. In short, we want to feed a retracting stream into a RetractStreamTableSink, which is easily done with the CRow from the original Flink planner but seems to be very difficult with the Blink planner.

The below fails with:

org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.sink2 do not match.
Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
Sink schema: [key: STRING, id: STRING]

but will succeed if you uncomment the CRow lines of code and run with the original table planner.

Any thoughts on how we can accomplish this?

@Test
public void retractStream() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

    Row row1 = new Row(2);
    row1.setField(0, "1");
    row1.setField(1, "1");
    SingleOutputStreamOperator<Row> source =
        executionEnvironment.fromCollection(ImmutableList.of(row1)).setParallelism(1);

    tableEnvironment.createTemporaryView("table1", source, "key, id");
    Table outputTable = tableEnvironment.sqlQuery("select id, key from table1");
    RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(outputTable, rowTypeInfo);

    // This code block below works on Flink planner but fails on Blink planner because Blink treats all non-tuples
    // as POJOs
    // SingleOutputStreamOperator<CRow> tuple2DataStream = tableEnvironment
    //     .toRetractStream(outputTable, rowTypeInfo)
    //     .map(value -> new CRow(value.f1, value.f0))
    //     .returns(new CRowTypeInfo(rowTypeInfo));

    tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);

    // Create a retracting table sink
    TableSchema.Builder schemaBuilder = TableSchema.builder();
    schemaBuilder.field("key", DataTypes.STRING());
    schemaBuilder.field("id", DataTypes.STRING());
    TableSchema schema = schemaBuilder.build();
    RetractSink retractTableSink = new RetractSink(new CollectingTableSink(schema));
    tableEnvironment.registerTableSink("sink2", retractTableSink);

    // Wire up the output to the sink
    tableEnvironment.from("outputTable").insertInto("sink2");
    executionEnvironment.execute();
}

private static class RetractSink implements RetractStreamTableSink<Row> {

    private final AppendStreamTableSink<Row> delegate;

    RetractSink(AppendStreamTableSink<Row> delegate) {
        this.delegate = delegate;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return delegate.getOutputType();
    }

    @Override
    public TableSchema getTableSchema() {
        return delegate.getTableSchema();
    }

    @Override
    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN(), getRecordType());
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        consumeDataStream(dataStream);
    }

    @Override
    public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        // Drop retraction messages and unwrap the Row before handing off to the append sink
        DataStream<Row> filteredAndMapped =
            dataStream.flatMap(new TupleMapper()).returns(getRecordType());
        return delegate.consumeDataStream(filteredAndMapped);
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[] fieldNames, TypeInformation<?>[] fieldTypes) {
        throw new UnsupportedOperationException();
    }
}

private static final class TupleMapper implements FlatMapFunction<Tuple2<Boolean, Row>, Row> {
    @Override
    public void flatMap(Tuple2<Boolean, Row> value, Collector<Row> out) {
        // f0 is the change flag: true for an add message, false for a retraction
        if (value.f0) {
            out.collect(value.f1);
        }
    }
}

On Thu, Jun 18, 2020 at 10:21 AM John Mathews <jmathews3...@gmail.com> wrote:

> So the difference between Tuple2<Boolean, Row> and CRow is that CRow has a
> special TypeInformation defined here:
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala#L32
>
> that returns the TypeInfo of the underlying row, whereas the
> TypeInformation for Tuple2 will return type information that contains the
> boolean for the retraction + a nested type info for the row. So all
> downstream operations that rely on seeing just the row type info now break
> for us.
>
> On Wed, Jun 17, 2020 at 9:23 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi John,
>>
>> Maybe I misunderstand something, but CRow doesn't have the `getSchema()`
>> method. You can call getSchema() on the Table; this also works if you
>> convert the table into Tuple2<Boolean, Row>.
>> Actually, there is no big difference between CRow and Tuple2<Boolean,
>> Row>; they both wrap the change flag and the Row.
>>
>> Best,
>> Jark
>>
>> On Thu, 18 Jun 2020 at 06:39, John Mathews <jmathews3...@gmail.com>
>> wrote:
>>
>>> Hello Godfrey,
>>>
>>> Thanks for the response!
>>>
>>> I think the problem with Tuple2 is that, if my understanding of how
>>> CRow worked is correct, when CRow's getSchema() was called it would return
>>> the underlying schema of the row it contained. Tuple2 doesn't do that, so
>>> it changes/breaks a lot of our downstream code that is relying on the
>>> TableSchema to return the underlying row's schema, and not a Tuple schema.
>>>
>>> Any thoughts on that issue?
>>>
>>> On Wed, Jun 17, 2020 at 12:16 AM godfrey he <godfre...@gmail.com> wrote:
>>>
>>>> hi John,
>>>>
>>>> You can use Tuple2[Boolean, Row] to replace CRow; the
>>>> StreamTableEnvironment#toRetractStream method returns
>>>> DataStream[(Boolean, T)].
>>>>
>>>> the code looks like:
>>>>
>>>> tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R] {
>>>>   override def map(value: (Boolean, Row)): R = ...
>>>> })
>>>>
>>>> Bests,
>>>> Godfrey
>>>>
>>>> John Mathews <jmathews3...@gmail.com> wrote on Wed, Jun 17, 2020 at 12:13 PM:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am working on migrating from the flink table-planner to the new
>>>>> blink one, and one problem I am running into is that it doesn't seem like
>>>>> Blink has a concept of a CRow, unlike the original table-planner.
>>>>>
>>>>> I am therefore struggling to figure out how to properly convert a
>>>>> retracting stream to a SingleOutputStreamOperator when using just the
>>>>> Blink planner libraries.
>>>>>
>>>>> E.g. in the old planner I could do something like this:
>>>>> SingleOutputStreamOperator<CRow> stream =
>>>>>     tableEnvironment.toRetractStream(table, typeInfo)
>>>>>         .map(value -> new CRow(value.f1, value.f0));
>>>>>
>>>>> but without the CRow, I'm not sure how to accomplish this.
>>>>>
>>>>> Any suggestions?
>>>>>
>>>>> Thanks!
>>>>> John
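
For anyone following the thread, the schema difference discussed above can be illustrated with a toy model in plain Java. This is only a sketch with no Flink dependencies: the class RetractSchemaSketch and its methods are hypothetical names made up for illustration, and the comparison in matchesSink is a simplifying assumption, not Flink's actual validation logic. It shows how a (flag, row) wrapper exposes two top-level fields, while a wrapper whose type information delegates to the row (as described for CRowTypeInfo) exposes the row's own fields.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model (not Flink code) of the two ways a change flag plus a row can expose its field types. */
public class RetractSchemaSketch {

    /** Tuple2-style wrapper: the top-level fields are the flag and one nested row. */
    public static List<String> tupleStyleTypes(List<String> rowTypes) {
        String nestedRow = "ROW<" + String.join(", ", rowTypes) + ">";
        return Arrays.asList("BOOLEAN", nestedRow);
    }

    /** CRow-style wrapper as described in the thread: the top-level fields are the row's own fields. */
    public static List<String> rowStyleTypes(List<String> rowTypes) {
        return new ArrayList<>(rowTypes);
    }

    /** Simplified check (an assumption): field types must match position by position. */
    public static boolean matchesSink(List<String> queryTypes, List<String> sinkTypes) {
        return queryTypes.equals(sinkTypes);
    }

    public static void main(String[] args) {
        List<String> rowTypes = Arrays.asList("STRING", "STRING");  // key, id
        List<String> sinkTypes = Arrays.asList("STRING", "STRING"); // sink2 schema

        List<String> tupleStyle = tupleStyleTypes(rowTypes);
        List<String> rowStyle = rowStyleTypes(rowTypes);

        System.out.println("tuple-style " + tupleStyle + " matches sink: " + matchesSink(tupleStyle, sinkTypes));
        System.out.println("row-style   " + rowStyle + " matches sink: " + matchesSink(rowStyle, sinkTypes));
    }
}
```

In this model the tuple-style schema has the same shape as the "Query schema" in the exception above, which is why it cannot line up with a two-column STRING sink, while the row-style schema can.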