[ https://issues.apache.org/jira/browse/FLINK-36428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hong Liang Teoh reassigned FLINK-36428:
---------------------------------------

    Assignee: maoxingda

> DynamoDb Table API Sink fails when null value in the RowData
> ------------------------------------------------------------
>
>                 Key: FLINK-36428
>                 URL: https://issues.apache.org/jira/browse/FLINK-36428
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / AWS
>    Affects Versions: 1.18.1
>            Reporter: maoxingda
>            Assignee: maoxingda
>            Priority: Minor
>              Labels: pull-request-available, starter
>             Fix For: 1.19.1
>
>
> DynamoDb Table API Sink fails when there are null values in the RowData.
>
> package com.meican;
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>
> public class SqlDynamodbSinkApp {
>     public static void main(String[] args) {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>         tableEnv.executeSql("create temporary view source as " +
>                 "select '1' as id, 'name1' as name, 18 as age union all " +
>                 "select '2' as id, 'name2' as name, 19 as age union all " +
>                 "select '3' as id, cast(null as string) as name, 20 as age"
>         );
>         tableEnv.executeSql("create table sink" +
>                 "(" +
>                 " id string," +
>                 " name string," +
>                 " age int" +
>                 ") partitioned by ( id )" +
>                 "with" +
>                 "(" +
>                 " 'connector' = 'dynamodb'," +
>                 " 'aws.region' = 'cn-northwest-1'," +
>                 " 'table-name' = 'bi-oltp-mydata'," +
>                 " 'ignore-nulls' = 'true'" +
>                 ")"
>         );
>         tableEnv.executeSql("insert into sink select * from source");
>     }
> }
>
> java.lang.NullPointerException: null
>     at org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:39) ~[flink-table-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:27) ~[flink-table-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.lambda$addAttribute$0(RowDataToAttributeValueConverter.java:88) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
>     at software.amazon.awssdk.enhanced.dynamodb.internal.mapper.ResolvedImmutableAttribute.lambda$create$0(ResolvedImmutableAttribute.java:54) ~[dynamodb-enhanced-2.20.144.jar:?]
>     at software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.lambda$itemToMap$5(StaticImmutableTableSchema.java:518) ~[dynamodb-enhanced-2.20.144.jar:?]
>     at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]
>     at java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) ~[?:?]
>     at software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.itemToMap(StaticImmutableTableSchema.java:516) ~[dynamodb-enhanced-2.20.144.jar:?]
>     at software.amazon.awssdk.enhanced.dynamodb.mapper.WrappedTableSchema.itemToMap(WrappedTableSchema.java:67) ~[dynamodb-enhanced-2.20.144.jar:?]
>     at org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.convertRowData(RowDataToAttributeValueConverter.java:53) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
>     at org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:56) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
>     at org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:35) ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
>     at org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:328) ~[flink-connector-files-1.17.2.jar:1.17.2]
>     at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161) ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) ~[flink-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [flink-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-runtime-1.18.0.jar:1.18.0]
>     at java.lang.Thread.run(Thread.java:829) [?:?]
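Judging from the trace, the NullPointerException is thrown while RowDataToAttributeValueConverter's per-attribute lambda (lambda$addAttribute$0) converts the null name column through StringStringConverter.toExternal, i.e. the field is converted unconditionally and the sink's 'ignore-nulls' option never gets a chance to take effect. Below is a minimal, standalone sketch of the kind of RowData#isNullAt guard that avoids handing a null to the converter. It is not the connector's actual code: the class name and hard-coded field layout are made up to mirror the reproducer's schema (id STRING, name STRING, age INT), and it only assumes the standard Flink RowData API and the AWS SDK v2 AttributeValue model.

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

// Standalone illustration only -- not the connector's implementation.
// Each field is guarded with isNullAt, so a null column is simply omitted
// from the resulting item instead of being dereferenced during conversion.
public class NullSafeRowToItemSketch {

    static Map<String, AttributeValue> toItem(RowData row) {
        Map<String, AttributeValue> item = new HashMap<>();
        if (!row.isNullAt(0)) { // id STRING
            item.put("id", AttributeValue.builder().s(row.getString(0).toString()).build());
        }
        if (!row.isNullAt(1)) { // name STRING -- null for id '3' in the reproducer
            item.put("name", AttributeValue.builder().s(row.getString(1).toString()).build());
        }
        if (!row.isNullAt(2)) { // age INT
            item.put("age", AttributeValue.builder().n(Integer.toString(row.getInt(2))).build());
        }
        return item;
    }

    public static void main(String[] args) {
        // The row that triggers the NPE in the report: ('3', null, 20).
        RowData rowWithNullName = GenericRowData.of(StringData.fromString("3"), null, 20);
        // Prints an item that contains only "id" and "age"; the null "name" is skipped.
        System.out.println(toItem(rowWithNullName));
    }
}

In the connector itself the equivalent guard would presumably belong in the attribute getter that backs the enhanced-client TableSchema, so that null columns are left out of the generated DynamoDB item.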
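Until a connector release containing the fix is available, one possible stopgap is to remove the nulls in SQL before they reach the sink, for example with COALESCE. This writes a placeholder value instead of omitting the attribute, so it changes the stored item and is only a workaround; the empty-string default below is an assumption, not something the report prescribes. It would replace the final statement of SqlDynamodbSinkApp above:

// Hypothetical workaround for the reproducer: substitute a non-null placeholder
// for the nullable column so the converter never sees a null value.
tableEnv.executeSql("insert into sink " +
        "select id, coalesce(name, '') as name, age from source");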