Hi Matt,

First of all, awesome that you are using the DynamoDB sink!

To resolve your serialization issue with the DDB sink: you are right, the problem only occurs when you create the AttributeValue object in a previous operator and send it to the sink. The root cause is the serialization of ImmutableMap. Kryo tries to call put() on the map, which is unsupported because the map is immutable. You can work around this by registering a specific serializer for it, like below:

    env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, ImmutableMapSerializer.class);

You can get ImmutableMapSerializer.class from a pre-packaged library like this one: https://github.com/magro/kryo-serializers

Just add the following to your pom.xml:

    <dependency>
        <groupId>de.javakaffee</groupId>
        <artifactId>kryo-serializers</artifactId>
        <version>0.45</version>
    </dependency>

Regarding resources, I find the following helpful:

* Article on serialization<https://alibaba-cloud.medium.com/data-types-and-serialization-flink-advanced-tutorials-b363241c8836>
* The Flink Forward YouTube channel has a couple of useful deep dives on Flink in general: https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/playlists

Hope the above helps.

A more general question on your use case: what is the reason you want to generate the AttributeValue in a previous operator rather than in the sink directly? Is it for some dynamic generation of objects to write into DDB?

Regards,
Hong

From: Matt Fysh <mattf...@gmail.com>
Date: Tuesday, 8 November 2022 at 14:04
To: User <user@flink.apache.org>
Subject: [EXTERNAL] How to write custom serializer for dynamodb connector
I'm attempting to use the dynamodb sink located at https://github.com/apache/flink-connector-aws

The example<https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java> in the repo is working as expected, however when I try to create a nested data structure, I receive a Kryo serialization error message:

    Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
        at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)

The value that cannot be serialized is produced by this code:

    import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

    AttributeValue.builder().m(
        ImmutableMap.of(
            "innerkey", AttributeValue.builder().s("innervalue").build()
        )
    ).build();

There are tests in the connector repo<https://github.com/apache/flink-connector-aws/blob/3798aabfcc6f78645bf3d7255dfd6c336cd497f0/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java#L70-L84> for nested map structures, but they do not test that the structure can be ser/de by Flink, which I believe occurs when the operator that produces the value is separate to the sink operator.

Given that this is a fairly simple data type, I should be able to register a custom serializer with Flink, but since I'm new to java I'm having trouble making sense of the docs<https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/types_serialization/> and was hoping to find someone more knowledgeable in this area for some pointers on what else I could start reading.

Thanks
Matt
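
For reference, the UnsupportedOperationException in the trace above is raised on the read path, which fits the explanation in the reply: a generic map reader fills the target map through put(), and an immutable map rejects that call. The sketch below is a toy illustration in plain Java only. It is not Kryo's or Flink's actual code. The class and method names (ImmutableMapReadDemo, fillInPlace, readAsImmutable) are made up for this example, and Collections.unmodifiableMap stands in for Guava's ImmutableMap so that no extra dependency is needed.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableMapReadDemo {

    // Mimics a generic map reader: it receives an existing map instance
    // and restores the entries one by one through put().
    // Returns false if the target map rejects the writes.
    static boolean fillInPlace(Map<String, String> target, Map<String, String> entries) {
        try {
            for (Map.Entry<String, String> e : entries.entrySet()) {
                target.put(e.getKey(), e.getValue());
            }
            return true;
        } catch (UnsupportedOperationException ex) {
            return false;
        }
    }

    // Mimics what a dedicated serializer can do instead: collect the entries
    // in a mutable buffer first, then build the read-only map in one step.
    static Map<String, String> readAsImmutable(Map<String, String> entries) {
        Map<String, String> buffer = new HashMap<String, String>(entries);
        return Collections.unmodifiableMap(buffer);
    }

    public static void main(String[] args) {
        Map<String, String> entries = new HashMap<String, String>();
        entries.put("innerkey", "innervalue");

        Map<String, String> readOnlyTarget =
                Collections.unmodifiableMap(new HashMap<String, String>());

        System.out.println("put() into read-only map worked: "
                + fillInPlace(readOnlyTarget, entries));
        System.out.println("put() into HashMap worked: "
                + fillInPlace(new HashMap<String, String>(), entries));
        System.out.println("buffer-then-wrap result: " + readAsImmutable(entries));
    }
}
```

Registering a serializer that knows how to build the immutable type, as suggested in the reply, corresponds to the second approach: the map is assembled in one step rather than mutated after creation.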