[ 
https://issues.apache.org/jira/browse/FLINK-36428?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hong Liang Teoh reassigned FLINK-36428:
---------------------------------------

    Assignee: maoxingda

> DynamoDb Table API Sink fails when null value in the RowData
> ------------------------------------------------------------
>
>                 Key: FLINK-36428
>                 URL: https://issues.apache.org/jira/browse/FLINK-36428
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / AWS
>    Affects Versions: 1.18.1
>            Reporter: maoxingda
>            Assignee: maoxingda
>            Priority: Minor
>              Labels: pull-request-available, starter
>             Fix For: 1.19.1
>
>
> DynamoDb Table API Sink fails when there are null values in the RowData.
>  
> package com.meican;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> public class SqlDynamodbSinkApp {
> public static void main(String[] args) {
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
> tableEnv.executeSql("create temporary view source as " +
> "select '1' as id, 'name1' as name, 18 as age union all " +
> "select '2' as id, 'name2' as name, 19 as age union all " +
> "select '3' as id, cast(null as string) as name, 20 as age"
> );
> tableEnv.executeSql("create table sink" +
> "(" +
> " id string," +
> " name string," +
> " age int" +
> ") partitioned by ( id )" +
> "with" +
> "(" +
> " 'connector' = 'dynamodb'," +
> " 'aws.region' = 'cn-northwest-1'," +
> " 'table-name' = 'bi-oltp-mydata'," +
> " 'ignore-nulls' = 'true'" +
> ")"
> );
> tableEnv.executeSql("insert into sink select * from source");
> }
> }
>  
> java.lang.NullPointerException: null
>     at 
> org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:39)
>  ~[flink-table-runtime-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:27)
>  ~[flink-table-runtime-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.lambda$addAttribute$0(RowDataToAttributeValueConverter.java:88)
>  ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
>     at 
> software.amazon.awssdk.enhanced.dynamodb.internal.mapper.ResolvedImmutableAttribute.lambda$create$0(ResolvedImmutableAttribute.java:54)
>  ~[dynamodb-enhanced-2.20.144.jar:?]
>     at 
> software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.lambda$itemToMap$5(StaticImmutableTableSchema.java:518)
>  ~[dynamodb-enhanced-2.20.144.jar:?]
>     at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]
>     at 
> java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) 
> ~[?:?]
>     at 
> software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.itemToMap(StaticImmutableTableSchema.java:516)
>  ~[dynamodb-enhanced-2.20.144.jar:?]
>     at 
> software.amazon.awssdk.enhanced.dynamodb.mapper.WrappedTableSchema.itemToMap(WrappedTableSchema.java:67)
>  ~[dynamodb-enhanced-2.20.144.jar:?]
>     at 
> org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.convertRowData(RowDataToAttributeValueConverter.java:53)
>  ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
>     at 
> org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:56)
>  ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
>     at 
> org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:35)
>  ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
>     at 
> org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:328)
>  ~[flink-connector-files-1.17.2.jar:1.17.2]
>     at 
> org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161)
>  ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
>  ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
>  ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
>  ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>  ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
>  ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>  ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
>  ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
>  ~[flink-streaming-java-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
>  ~[flink-runtime-1.18.0.jar:1.18.0]
>     at 
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) 
> [flink-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) 
> [flink-runtime-1.18.0.jar:1.18.0]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
> [flink-runtime-1.18.0.jar:1.18.0]
>     at java.lang.Thread.run(Thread.java:829) [?:?]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to