Hello,

I'm using Flink 1.11.2, where I have a SQL-backed `Table` that I'm
trying to write to Kafka.  I'm using `KafkaTableSourceSinkFactory`,
which ends up instantiating a table sink of type `KafkaTableSink`.
Since this sink is an `AppendStreamTableSink`, I cannot write to it
from a generic table that might produce update/delete records.
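Concretely, the write fails at submission time.  A minimal sketch of
what I'm doing (`kafka_sink` is a hypothetical stand-in for my actual
DDL-registered Kafka table, and the quoted error is paraphrased):

```
// Inserting a table that produces update/delete records into the
// append-only Kafka sink fails with an error along the lines of:
// "AppendStreamTableSink doesn't support consuming update changes ..."
table.executeInsert("kafka_sink");
```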

What are my options to still write to Kafka?  I don't mind
introducing an extra boolean (or similar) field in the Kafka output
records to carry the row kind or similar info.

The first workaround that I tried was converting the table to a
DataStream, but that ran into a second issue, described below.  I'm
happy to consider other alternatives, ideally ones that can be
achieved at the Table API level.

When converting to the DataStream API, the
`table.getSchema().toRowType()` call below (`TableSchema.toRowType()`)
may fail when the underlying `DataType` is not convertible to a
`TypeInformation`; for example, I get the following error:

```
TableException: Unsupported conversion from data type 'INTERVAL
SECOND(3) NOT NULL' (conversion class: java.time.Duration) to type
information. Only data types that originated from type information
fully support a reverse conversion.
```
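For reference, any table with such a column reproduces this; a toy
example (the real query is more involved, and `source_table` is just
a stand-in):

```
// Hypothetical repro: a day-time interval column has no legacy
// TypeInformation counterpart, so toRowType() throws a TableException
// like the one quoted above.
var table = tableEnvironment.sqlQuery(
    "SELECT INTERVAL '5' SECOND AS gap FROM source_table");
var rowType = table.getSchema().toRowType();  // throws here
```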

Table to DataStream conversion and write to Kafka --
```
// Fails here for data types without a TypeInformation counterpart
// (see the error above).
var rowType = table.getSchema().toRowType();
// Helper that splices an extra field into the RowTypeInfo at index 0.
var kafkaRecordType = insertFieldAtIndex(
    (RowTypeInfo) rowType, 0, "__row_kind", Types.BOOLEAN);
// The retract stream yields Tuple2<Boolean, Row>; the mapper copies
// the retract flag into the new leading "__row_kind" field.
var outputStream =
    tableEnvironment.toRetractStream(table, rowType).map(
            new PrependChangeTypeToRowMapper(), kafkaRecordType);
var serializationSchema = JsonRowSerializationSchema.builder()
                              .withTypeInfo(kafkaRecordType)
                              .build();
var sink = new FlinkKafkaProducer<>(
    kafkaTopic, serializationSchema, kafkaProperties);
outputStream.addSink(sink).name(sinkName);
```
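In case it helps, here is roughly what the two helpers referenced
above do (simplified sketches, not the exact code):

```
// Builds a new RowTypeInfo with an extra field spliced in at `index`.
static RowTypeInfo insertFieldAtIndex(
        RowTypeInfo rowType, int index, String name,
        TypeInformation<?> type) {
    var oldTypes = rowType.getFieldTypes();
    var oldNames = rowType.getFieldNames();
    var types = new TypeInformation<?>[oldTypes.length + 1];
    var names = new String[oldNames.length + 1];
    System.arraycopy(oldTypes, 0, types, 0, index);
    System.arraycopy(oldNames, 0, names, 0, index);
    types[index] = type;
    names[index] = name;
    System.arraycopy(oldTypes, index, types, index + 1,
        oldTypes.length - index);
    System.arraycopy(oldNames, index, names, index + 1,
        oldNames.length - index);
    return new RowTypeInfo(types, names);
}

// Copies the retract flag (true = insert/update-after, false =
// retract) into field 0 of the output row.
public static class PrependChangeTypeToRowMapper
        implements MapFunction<Tuple2<Boolean, Row>, Row> {
    @Override
    public Row map(Tuple2<Boolean, Row> change) {
        var in = change.f1;
        var out = new Row(in.getArity() + 1);
        out.setField(0, change.f0);
        for (int i = 0; i < in.getArity(); i++) {
            out.setField(i + 1, in.getField(i));
        }
        return out;
    }
}
```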

Thanks,
Abhishek
