Below is a basic unit test of what we are trying to achieve, but basically,
we are trying to convert from a retracting stream to a
RetractingStreamTableSink, which is easily done with the CRow from the
original flink planner, but seems to be very difficult to do with the blink
planner.

The below fails with:
org.apache.flink.table.api.ValidationException: Field types of query result
and registered TableSink default_catalog.default_database.sink2 do not
match.
Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
Sink schema: [key: STRING, id: STRING]

but will succeed if you uncomment the CRow lines of code and run with the
original table planner.

Any thoughts on how we can accomplish this?

@Test
public void retractStream() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    StreamExecutionEnvironment executionEnvironment =
StreamExecutionEnvironment.createLocalEnvironment();
    StreamTableEnvironment tableEnvironment =
StreamTableEnvironment.create(executionEnvironment, settings);

    Row row1 = new Row(2);
    row1.setField(0, "1");
    row1.setField(1, "1");

    SingleOutputStreamOperator<Row> source =
            
executionEnvironment.fromCollection(ImmutableList.of(row1)).setParallelism(1);

    tableEnvironment.createTemporaryView("table1", source, "key, id");
    Table outputTable = tableEnvironment.sqlQuery("select id, key from table1");

    RowTypeInfo rowTypeInfo = new
RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
    DataStream<Tuple2<Boolean, Row>> tuple2DataStream =
tableEnvironment.toRetractStream(outputTable, rowTypeInfo);

    // This code block below works on Flink planner but fails on Blink
planner because Blink treats all non-tuples
    // as POJOs
    // SingleOutputStreamOperator<CRow> tuple2DataStream = tableEnvironment
    //         .toRetractStream(outputTable, rowTypeInfo)
    //         .map(value -> new CRow(value.f1, value.f0))
    //         .returns(new CRowTypeInfo(rowTypeInfo));

    tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);
    // Create a retracting table sink
    TableSchema.Builder schemaBuilder = TableSchema.builder();
    schemaBuilder.field("key", DataTypes.STRING());
    schemaBuilder.field("id", DataTypes.STRING());
    TableSchema schema = schemaBuilder.build();
    RetractSink retractTableSink = new RetractSink(new
CollectingTableSink(schema));
    tableEnvironment.registerTableSink("sink2", retractTableSink);
    // Wire up the output to the sink
    tableEnvironment.from("outputTable").insertInto("sink2");
    executionEnvironment.execute();
}

private static class RetractSink implements RetractStreamTableSink<Row> {

    private final AppendStreamTableSink<Row> delegate;

    RetractSink(AppendStreamTableSink<Row> delegate) {
        this.delegate = delegate;
    }

    @Override
    public TypeInformation<Row> getRecordType() {
        return delegate.getOutputType();
    }

    @Override
    public TableSchema getTableSchema() {
        return delegate.getTableSchema();
    }

    @Override
    public TupleTypeInfo<Tuple2<Boolean, Row>> getOutputType() {
        return new TupleTypeInfo<>(Types.BOOLEAN(), getRecordType());
    }

    @Override
    public void emitDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        consumeDataStream(dataStream);
    }

    @Override
    public DataStreamSink<?>
consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
        DataStream<Row> filteredAndMapped =
                dataStream.flatMap(new TupleMapper()).returns(getRecordType());

        return delegate.consumeDataStream(filteredAndMapped);
    }

    @Override
    public TableSink<Tuple2<Boolean, Row>> configure(String[]
fieldNames, TypeInformation<?>[] fieldTypes) {
        throw new UnsupportedOperationException();
    }
}

private static final class TupleMapper implements
FlatMapFunction<Tuple2<Boolean, Row>, Row> {
    @Override
    public void flatMap(Tuple2<Boolean, Row> value, Collector<Row> out) {
        if (value.f0) {
            out.collect(value.f1);
        }
    }
}


On Thu, Jun 18, 2020 at 10:21 AM John Mathews <jmathews3...@gmail.com>
wrote:

> So the difference between Tuple2<Boolean, Row> and CRow is that CRow has a
> special TypeInformation defined here:
> https://github.com/apache/flink/blob/master/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/runtime/types/CRowTypeInfo.scala#L32
>
>
> that returns the TypeInfo of the underlying row, whereas the
> TypeInformation for Tuple2 will return type information that contains the
> boolean for the retraction + a nested type info for the row. So all
> downstream operations that rely on seeing just the row type info now break
> for us.
>
> On Wed, Jun 17, 2020 at 9:23 PM Jark Wu <imj...@gmail.com> wrote:
>
>> Hi John,
>>
>> Maybe I misunderstand something, but CRow doesn't have the `getSchema()`
>> method. You can getSchema() on the Table, this also works if you convert
>> the table into Tuple2<Boolean, Row>.
>> Actually, there is no big difference between CRow and Tuple2<Boolean,
>> Row>, they both wrap the change flag and the Row.
>>
>> Best,
>> Jark
>>
>>
>>
>> On Thu, 18 Jun 2020 at 06:39, John Mathews <jmathews3...@gmail.com>
>> wrote:
>>
>>> Hello Godfrey,
>>>
>>> Thanks for the response!
>>>
>>> I think the problem with Tuple2, is that if my understanding is correct
>>> of how CRow worked, when CRow's getSchema() was returned it would return
>>> the underlying schema of the row it contained. Tuple2 doesn't do that, so
>>> it changes/breaks a lot of our downstream code that is relying on the
>>> TableSchema to return the underlying row's schema, and not a Tuple schema.
>>>
>>> Any thoughts on that issue?
>>>
>>>
>>> On Wed, Jun 17, 2020 at 12:16 AM godfrey he <godfre...@gmail.com> wrote:
>>>
>>>> hi John,
>>>>
>>>> You can use Tuple2[Boolean, Row] to replace CRow, the
>>>> StreamTableEnvironment#toRetractStream method return DataStream[(Boolean,
>>>> T)].
>>>>
>>>> the code looks like:
>>>>
>>>> tEnv.toRetractStream[Row](table).map(new MapFunction[(Boolean, Row), R]
>>>> {
>>>>       override def map(value: (Boolean, Row)): R = ...
>>>>     })
>>>>
>>>> Bests,
>>>> Godfrey
>>>>
>>>> John Mathews <jmathews3...@gmail.com> 于2020年6月17日周三 下午12:13写道:
>>>>
>>>>> Hello,
>>>>>
>>>>> I am working on migrating from the flink table-planner to the new
>>>>> blink one, and one problem I am running into is that it doesn't seem like
>>>>> Blink has a concept of a CRow, unlike the original table-planner.
>>>>>
>>>>> I am therefore struggling to figure out how to properly convert a
>>>>> retracting stream to a SingleOutputStreamOperator when using just the 
>>>>> Blink
>>>>> planner libraries.
>>>>>
>>>>> E.g. in the old planner I could do something like this:
>>>>> SingleOutputStreamOperator<CRow> stream =
>>>>> tableEnvironment.toRetractStream(table, typeInfo)
>>>>>                     .map(value -> new CRow(value.f1, value.f0);
>>>>>
>>>>> but without the CRow, I'm not sure how to accomplish this.
>>>>>
>>>>> Any suggestions?
>>>>>
>>>>> Thanks!
>>>>> John
>>>>>
>>>>>
>>>>>

Reply via email to