maoxingda created FLINK-36428:
---------------------------------

             Summary: DynamoDb Table API Sink fails when null value in the 
RowData
                 Key: FLINK-36428
                 URL: https://issues.apache.org/jira/browse/FLINK-36428
             Project: Flink
          Issue Type: Bug
          Components: Connectors / AWS
    Affects Versions: 1.18.1
            Reporter: maoxingda
             Fix For: 1.19.1


DynamoDb Table API Sink fails when there are null values in the RowData.

 
package com.meican;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


public class SqlDynamodbSinkApp {
public static void main(String[] args) {
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.executeSql("create temporary view source as " +
"select '1' as id, 'name1' as name, 18 as age union all " +
"select '2' as id, 'name2' as name, 19 as age union all " +
"select '3' as id, cast(null as string) as name, 20 as age"
);

tableEnv.executeSql("create table sink" +
"(" +
" id string," +
" name string," +
" age int" +
") partitioned by ( id )" +
"with" +
"(" +
" 'connector' = 'dynamodb'," +
" 'aws.region' = 'cn-northwest-1'," +
" 'table-name' = 'bi-oltp-mydata'," +
" 'ignore-nulls' = 'true'" +
")"
);

tableEnv.executeSql("insert into sink select * from source");
}
}
 
java.lang.NullPointerException: null
    at 
org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:39)
 ~[flink-table-runtime-1.18.0.jar:1.18.0]
    at 
org.apache.flink.table.data.conversion.StringStringConverter.toExternal(StringStringConverter.java:27)
 ~[flink-table-runtime-1.18.0.jar:1.18.0]
    at 
org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.lambda$addAttribute$0(RowDataToAttributeValueConverter.java:88)
 ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at 
software.amazon.awssdk.enhanced.dynamodb.internal.mapper.ResolvedImmutableAttribute.lambda$create$0(ResolvedImmutableAttribute.java:54)
 ~[dynamodb-enhanced-2.20.144.jar:?]
    at 
software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.lambda$itemToMap$5(StaticImmutableTableSchema.java:518)
 ~[dynamodb-enhanced-2.20.144.jar:?]
    at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]
    at 
java.util.Collections$UnmodifiableCollection.forEach(Collections.java:1085) 
~[?:?]
    at 
software.amazon.awssdk.enhanced.dynamodb.mapper.StaticImmutableTableSchema.itemToMap(StaticImmutableTableSchema.java:516)
 ~[dynamodb-enhanced-2.20.144.jar:?]
    at 
software.amazon.awssdk.enhanced.dynamodb.mapper.WrappedTableSchema.itemToMap(WrappedTableSchema.java:67)
 ~[dynamodb-enhanced-2.20.144.jar:?]
    at 
org.apache.flink.connector.dynamodb.table.RowDataToAttributeValueConverter.convertRowData(RowDataToAttributeValueConverter.java:53)
 ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at 
org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:56)
 ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at 
org.apache.flink.connector.dynamodb.table.RowDataElementConverter.apply(RowDataElementConverter.java:35)
 ~[flink-connector-dynamodb-4.2.0-1.17.jar:4.2.0-1.17]
    at 
org.apache.flink.connector.base.sink.writer.AsyncSinkWriter.write(AsyncSinkWriter.java:328)
 ~[flink-connector-files-1.17.2.jar:1.17.2]
    at 
org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:161)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
 ~[flink-streaming-java-1.18.0.jar:1.18.0]
    at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) 
~[flink-streaming-java-1.18.0.jar:1.18.0]
    at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
 ~[flink-runtime-1.18.0.jar:1.18.0]
    at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) 
[flink-runtime-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) 
[flink-runtime-1.18.0.jar:1.18.0]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) 
[flink-runtime-1.18.0.jar:1.18.0]
    at java.lang.Thread.run(Thread.java:829) [?:?]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to