[ 
https://issues.apache.org/jira/browse/FLINK-35726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske reassigned FLINK-35726:
-------------------------------------

    Assignee: Santwana Verma

> Data Stream to Table API converts Map to RAW 'java.util.Map'
> ------------------------------------------------------------
>
>                 Key: FLINK-35726
>                 URL: https://issues.apache.org/jira/browse/FLINK-35726
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.2
>            Reporter: David Perkins
>            Assignee: Santwana Verma
>            Priority: Major
>
> We have a use case where we convert from the Table API to a Data Stream with 
> a class, perform some operations, and then convert back to the Table API. 
> When the data contains a Map, the conversion back to the Table API converts 
> the Map to {{RAW('java.util.Map', '...')}}. This causes an 'Incompatible
> types for sink column' exception.
> In this particular case, the Map contains the Kafka headers, which we need to 
> preserve and write to the output topic. Both topics/table definitions use the 
> same schema. We have set a {{DataTypeHint}} annotation on the Map field in 
> the Java class. We are currently working around this issue by using a UDF to 
> simply perform a type conversion from the RAW Java Map to the Table API Map.
> Note that if no operations are performed on the stream, it works
> correctly. But adding a simple identity map causes the exception.
> Here's a simple example to reproduce the problem.
> CREATE TABLE Source (
>     id STRING,
>     headers MAP<STRING, BYTES> METADATA
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'source',
>     'properties.bootstrap.servers' = 'kafka-bootstrap-server',
>     'format' = 'json'
> );
> CREATE TABLE Target (
>     id STRING,
>     headers MAP<STRING, BYTES> METADATA
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'target',
>     'properties.bootstrap.servers' = 'kafka-bootstrap-server',
>     'format' = 'json'
> );
> public class MyRecord {
>     private String id;
>     @DataTypeHint(value = "MAP<STRING, BYTES>")
>     private Map<String,byte[]> headers;
>     ...
> }
> public class MyJob {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment streamEnv =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         final StreamTableEnvironment tableEnv =
>             StreamTableEnvironment.create(streamEnv);
>         Table sourceTable = tableEnv.from("Source");
>         var sourceStream = tableEnv.toDataStream(sourceTable, MyRecord.class);
>         var mappedStream = sourceStream.map(row -> row);
>         Table outputTable = tableEnv.fromDataStream(mappedStream);
>         tableEnv.createStatementSet()
>             .add(outputTable.insertInto("Target"))
>             .attachAsDataStream();
>         streamEnv.executeAsync("Table Datastream test");
>     }
> }



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
