Hello, I'm using Flink 1.11.2 where I have a SQL backed `Table` which I'm trying to write to Kafka. I'm using `KafkaTableSourceSinkFactory` which ends up instantiating a table sink of type `KafkaTableSink`. Since this sink is an `AppendStreamTableSink`, I cannot write to it using a generic table which might produce update/delete records.
What are my options to still write to Kafka? I don't mind introducing another boolean/etc field in the Kafka output records containing the row kind or similar info. The first workaround that I tried is to convert the table to a DataStream, but that ran into a second issue as indicated below. I'm happy to consider other alternatives, potentially which can be achieved at Table API level. When converting to DataStream API, the `table.getSchema().toRowType()` call below (`TableSchema.toRowType()`) may fail when the underlying `DataType` is not convertible to a `TypeInformation`, e.g. I get the following error: ``` TableException: Unsupported conversion from data type 'INTERVAL SECOND(3) NOT NULL' (conversion class: java.time.Duration) to type information. Only data types that originated from type information fully support a reverse conversion. ``` Table to DataStream conversion and write to Kafka -- ``` var rowType = table.getSchema().toRowType(); var kafkaRecordType = insertFieldAtIndex( (RowTypeInfo)rowType, 0, "__row_kind", Types.BOOLEAN); var outputStream = tableEnvironment.toRetractStream(table, rowType).map( new PrependChangeTypeToRowMapper(), kafkaRecordType); var serializationSchema = JsonRowSerializationSchema.builder() .withTypeInfo(kafkaRecordType) .build(); var sink = new FlinkKafkaProducer<>( kafkaTopic, serializationSchema, kafkaProperties); outputStream.addSink(sink).name(sinkName); ``` Thanks, Abhishek