Hey Matt,

Thanks for the feedback. I have updated the SinkIntoDynamoDb [1] sample to avoid this in the future. We have also recently added support for @DynamoDbBean annotated POJOs, which you might find interesting: it removes the need to create a custom ElementConverter altogether, see SinkDynamoDbBeanIntoDynamoDb [2].
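If it helps, a @DynamoDbBean POJO is just a plain mutable bean with annotated getters from the AWS SDK v2 enhanced client. A minimal sketch (the Order class and its fields are illustrative only; see [2] for how it is wired into the sink):

    import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
    import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;

    // A plain mutable bean with a no-arg constructor and getter/setter pairs.
    // The annotations describe the DynamoDB item layout, so you no longer
    // need to hand-roll an ElementConverter (see [2]).
    @DynamoDbBean
    public class Order {
        private String id;
        private int quantity;

        @DynamoDbPartitionKey
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }

        public int getQuantity() { return quantity; }
        public void setQuantity(int quantity) { this.quantity = quantity; }
    }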
Thanks Hong for looking into this!

[1] https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java
[2] https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java

Thanks,

On Tue, Nov 8, 2022 at 9:06 PM Matt Fysh <mattf...@gmail.com> wrote:

> Thanks Hong, I moved the AttributeValue creation into the ElementConverter
> and it started working without any custom serde work!
>
> The reason for creating AttributeValue instances in a previous operator is
> that I was closely following the example code:
> https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java
>
> Thank you again for your help and sharing those resources.
>
> Cheers,
> Matt.
>
> On Wed, 9 Nov 2022 at 03:51, Teoh, Hong <lian...@amazon.co.uk> wrote:
>
>> Hi Matt,
>>
>> First of all, awesome that you are using the DynamoDB sink!
>>
>> To resolve your issue with serialization in the DDB sink, you are right:
>> the issue only happens when you create the AttributeValue object in a
>> previous operator and send it to the sink.
>>
>> The issue here is with the serialization of ImmutableMap. Kryo tries to
>> call put(), which is unsupported since it's immutable, so you can
>> register a specific serializer for it, like below:
>>
>> env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, ImmutableMapSerializer.class);
>>
>> You can get ImmutableMapSerializer.class from a pre-packaged library
>> such as https://github.com/magro/kryo-serializers
>> Just add the following to your pom.xml:
>>
>> <dependency>
>>     <groupId>de.javakaffee</groupId>
>>     <artifactId>kryo-serializers</artifactId>
>>     <version>0.45</version>
>> </dependency>
>>
>> Regarding resources, I find the following helpful:
>>
>> - Article on serialization
>> - The Flink Forward YouTube channel has a couple of useful deep dives
>>   on Flink in general: https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/playlists
>>
>> Hope the above helps.
>>
>> A more general question on your use case: what is the reason you want to
>> generate the AttributeValue in a previous operator rather than in the
>> sink directly? Is it for some dynamic generation of objects to write
>> into DDB?
>>
>> Regards,
>> Hong
>>
>> *From: *Matt Fysh <mattf...@gmail.com>
>> *Date: *Tuesday, 8 November 2022 at 14:04
>> *To: *User <user@flink.apache.org>
>> *Subject: *[EXTERNAL] How to write custom serializer for dynamodb connector
>>
>> I'm attempting to use the DynamoDB sink located at
>> https://github.com/apache/flink-connector-aws
>>
>> The example
>> <https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java>
>> in the repo is working as expected; however, when I try to create a nested
>> data structure, I receive a Kryo serialization error message:
>>
>> Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
>> Serialization trace:
>> m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>
>> The value that cannot be serialized is produced by this code:
>>
>> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
>>
>> AttributeValue.builder().m(
>>     ImmutableMap.of(
>>         "innerkey", AttributeValue.builder().s("innervalue").build()
>>     )
>> ).build();
>>
>> There are tests in the connector repo
>> <https://github.com/apache/flink-connector-aws/blob/3798aabfcc6f78645bf3d7255dfd6c336cd497f0/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java#L70-L84>
>> for nested map structures, but they do not test that the structure can be
>> ser/de'd by Flink, which I believe occurs when the operator that produces
>> the value is separate from the sink operator.
>>
>> Given that this is a fairly simple data type, I should be able to register
>> a custom serializer with Flink, but since I'm new to Java I'm having
>> trouble making sense of the docs
>> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/types_serialization/>
>> and was hoping to find someone more knowledgeable in this area for some
>> pointers on what else I could start reading.
>>
>> Thanks,
>> Matt
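For anyone who finds this thread in the archives, the fix Matt describes (building the AttributeValue inside the ElementConverter, so it never has to be serialized between operators) looks roughly like the sketch below. The Order type, its fields, and the item layout are illustrative, not from the original thread:

    import java.util.Map;

    import org.apache.flink.api.connector.sink2.SinkWriter;
    import org.apache.flink.connector.base.sink.writer.ElementConverter;
    import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
    import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;

    import com.google.common.collect.ImmutableMap;
    import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

    public class OrderElementConverter implements ElementConverter<Order, DynamoDbWriteRequest> {
        @Override
        public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
            // The nested AttributeValue (and its ImmutableMap) is created here,
            // inside the sink, so Kryo never sees it on an operator boundary.
            Map<String, AttributeValue> item = ImmutableMap.of(
                    "id", AttributeValue.builder().s(order.getId()).build(),
                    "details", AttributeValue.builder().m(
                            ImmutableMap.of("quantity",
                                    AttributeValue.builder()
                                            .n(String.valueOf(order.getQuantity()))
                                            .build())
                    ).build());
            return DynamoDbWriteRequest.builder()
                    .setType(DynamoDbWriteRequestType.PUT)
                    .setItem(item)
                    .build();
        }
    }

The converter is then passed to the sink builder via setElementConverter, as in the SinkIntoDynamoDb sample [1]. If the AttributeValue really does need to be produced in an upstream operator, registering the ImmutableMapSerializer as Hong describes above is the alternative.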