maoxingda created FLINK-36428:
---------------------------------

Summary: DynamoDb Table API Sink fails when there is a null value in the RowData
Key: FLINK-36428
URL: https://issues.apache.org/jira/browse/FLINK-36428
Project: Flink
Issue Type: Bug
Components: Connectors / AWS
Affects Versions: 1.18.1
Reporter: maoxingda
Fix For: 1.19.1

DynamoDb Table API Sink fails when there are null values in the RowData.

package com.meican;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class SqlDynamodbSinkApp {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("create temporary view source as " +
                "select '1' as id, 'name1' as name, 18 as age union all " +
                "select '2' as id, 'name2' as name, 19 as age union all " +
                "select '3' as id, cast(null as string) as name, 20 as age"
        );

        tableEnv.executeSql("create table sink" +
                "(" +
                " id string," +
                " name string," +
                " age int" +
                ") partitioned by ( id )" +
                "with" +
                "(" +
                " 'connector' = 'dynamodb'," +
                " 'aws.region' = 'cn-northwest-1'," +
                " 'table-name' = 'bi-oltp-mydata'," +
                " 'ignore-nulls' = 'true'" +
                ")"
        );

        tableEnv.executeSql("insert into sink select * from source");
    }
}

java.lang.NullPointerException: null
    at org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:39) ~[flink-table-runtime-1.18.0.jar:1.18.0]
    at org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:27) ~[flink-table-runtime-1.18.0.jar:1.18.0]
    at org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.lambda$addAttribute$0(RowDataToAttributeValueConverter.java:88) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at software.amazon.awssdk.enhanced.dynamodb.internal.mapper.ResolvedImmutableAttribute.lambda$create$0(ResolvedImmutableAttribute.java:54) ~[dynamodb-enhanced-2.20.144.jar:?]
    at software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.lambda$itemToMap$5(StaticImmutableTableSchema.java:518) ~[dynamodb-enhanced-2.20.144.jar:?]
    at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]
    at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) ~[?:?]
    at software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.itemToMap(StaticImmutableTableSchema.java:516) ~[dynamodb-enhanced-2.20.144.jar:?]
    at software.amazon.awssdk.enhanced.dynamodb.mapper.WrappedTableSchema.itemToMap(WrappedTableSchema.java:67) ~[dynamodb-enhanced-2.20.144.jar:?]
    at org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.convertRowData(RowDataToAttributeValueConverter.java:53) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:56) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:35) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:328) ~[flink-connector-files-1.17.2.jar:1.17.2]
    at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-runtime-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-runtime-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [flink-runtime-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-runtime-1.18.0.jar:1.18.0]
    at java.lang.Thread.run(Thread.java:829) [?:?]

--
This message was sent by Atlassian Jira (v8.20.10#820010)