[ https://issues.apache.org/jira/browse/FLINK-35726?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Fabian Hueske reassigned FLINK-35726:
-------------------------------------

    Assignee: Santwana Verma

> Data Stream to Table API converts Map to RAW 'java.util.Map'
> ------------------------------------------------------------
>
>                 Key: FLINK-35726
>                 URL: https://issues.apache.org/jira/browse/FLINK-35726
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.2
>            Reporter: David Perkins
>            Assignee: Santwana Verma
>            Priority: Major
>
> We have a use case where we convert a Table to a DataStream of a Java
> class, perform some operations, and then convert the stream back to the
> Table API. When the data contains a Map, the conversion back to the Table
> API turns the Map into {{RAW('java.util.Map', '...')}}. This causes an
> 'Incompatible types for sink column' exception.
>
> In this particular case, the Map contains the Kafka headers, which we need
> to preserve and write to the output topic. Both table definitions use the
> same schema. We have set a {{DataTypeHint}} annotation on the Map field in
> the Java class. We are currently working around this issue with a UDF that
> simply converts the RAW Java Map to a Table API {{MAP}}.
>
> Note that if no operations are performed on the stream, the round trip
> works correctly, but adding even a simple identity map causes the exception.
>
> Here's a simple example to reproduce the problem.
>
> CREATE TABLE Source (
>   id STRING,
>   headers MAP<STRING, BYTES> METADATA
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'source',
>   'properties.bootstrap.servers' = 'kafka-bootstrap-server',
>   'format' = 'json'
> );
>
> CREATE TABLE Target (
>   id STRING,
>   headers MAP<STRING, BYTES> METADATA
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'target',
>   'properties.bootstrap.servers' = 'kafka-bootstrap-server',
>   'format' = 'json'
> );
>
> import java.util.Map;
> import org.apache.flink.table.annotation.DataTypeHint;
>
> public class MyRecord {
>     private String id;
>
>     @DataTypeHint(value = "MAP<STRING, BYTES>")
>     private Map<String, byte[]> headers;
>
>     ...
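> }
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class MyJob {
>     public static void main(String[] args) throws Exception {
>         final StreamExecutionEnvironment streamEnv =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>         final StreamTableEnvironment tableEnv =
>             StreamTableEnvironment.create(streamEnv);
>
>         // Table -> DataStream<MyRecord>
>         Table sourceTable = tableEnv.from("Source");
>         var sourceStream = tableEnv.toDataStream(sourceTable, MyRecord.class);
>
>         // An identity map is enough to trigger the RAW('java.util.Map', ...) type
>         var mappedStream = sourceStream.map(row -> row);
>
>         // DataStream<MyRecord> -> Table, then insert into the sink table
>         Table outputTable = tableEnv.fromDataStream(mappedStream);
>         tableEnv.createStatementSet()
>             .add(outputTable.insertInto("Target"))
>             .attachAsDataStream();
>
>         streamEnv.executeAsync("Table Datastream test");
>     }
> }

--
This message was sent by Atlassian Jira
(v8.20.10#820010)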
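The description mentions working around the problem with a UDF that converts the RAW Java Map back into a Table API {{MAP}}. The reporter's UDF is not shown, so the following is only a minimal sketch of the conversion step itself, written in plain Java so it runs without Flink on the classpath. The class and method names ({{HeaderMapConversion}}, {{toHeaderMap}}) are hypothetical. Presumably the helper would be called from the {{eval}} method of a Flink {{ScalarFunction}} whose argument is annotated {{@DataTypeHint(inputGroup = InputGroup.ANY)}} (so it accepts the RAW column) and whose return type is declared as {{MAP<STRING, BYTES>}}; that wiring is an assumption and has not been checked against Flink 1.17.2.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical helper (not the reporter's code): the conversion a workaround UDF
 * could apply to a RAW('java.util.Map', ...) value before it is written to a
 * MAP<STRING, BYTES> column.
 */
public final class HeaderMapConversion {

    private HeaderMapConversion() {}

    /**
     * Copies an untyped Map into a Map<String, byte[]>.
     * Returns null for a null input, so a NULL column value stays NULL.
     * Throws IllegalArgumentException if the input is not a Map, a key is not a
     * String (including null keys), or a non-null value is not a byte[].
     * This is a shallow copy: the byte[] values are shared, not cloned.
     */
    public static Map<String, byte[]> toHeaderMap(Object raw) {
        if (raw == null) {
            return null;
        }
        if (!(raw instanceof Map)) {
            throw new IllegalArgumentException(
                "expected java.util.Map but got " + raw.getClass().getName());
        }
        Map<?, ?> source = (Map<?, ?>) raw;
        // LinkedHashMap preserves the source map's iteration order
        Map<String, byte[]> result = new LinkedHashMap<>();
        for (Map.Entry<?, ?> entry : source.entrySet()) {
            Object key = entry.getKey();
            Object value = entry.getValue();
            if (!(key instanceof String)) {
                throw new IllegalArgumentException("header key is not a String: " + key);
            }
            if (value != null && !(value instanceof byte[])) {
                throw new IllegalArgumentException(
                    "header value for '" + key + "' is not a byte[]");
            }
            result.put((String) key, (byte[]) value);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, byte[]> original = new LinkedHashMap<>();
        original.put("trace-id", "abc".getBytes(StandardCharsets.UTF_8));
        Object raw = original; // stands in for the value carried by the RAW column
        Map<String, byte[]> converted = toHeaderMap(raw);
        System.out.println(converted.keySet()); // prints [trace-id]
    }
}
```

Checking each entry and copying into a fresh map, rather than doing an unchecked cast of the RAW object to {{Map<String, byte[]>}}, means a malformed key or value fails at conversion time instead of surfacing later as a {{ClassCastException}} further downstream.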