Hey Matt,

Thanks for the feedback, I have updated the SinkIntoDynamoDb [1] sample to
avoid this in future. We have recently added support for @DynamoDbBean
annotated POJOs which you might find interesting. This removes the need to
create a custom ElementConverter all together,
see SinkDynamoDbBeanIntoDynamoDb [2].

Thanks Hong for looking in to this!



On Tue, Nov 8, 2022 at 9:06 PM Matt Fysh <mattf...@gmail.com> wrote:

> Thanks Hong, I moved the AttributeValue creation into the ElementConverter
> and it started working without any custom serde work!
> The reason for creating AttributeValue instances in a previous operator is
> that I was closely following the example code:
> https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java
> Thank you again for your help and sharing those resources.
> Cheers,
> Matt.
> On Wed, 9 Nov 2022 at 03:51, Teoh, Hong <lian...@amazon.co.uk> wrote:
>> Hi Matt,
>> First of all, awesome that you are using the DynamoDB sink!
>> To resolve your issue with serialization in the DDB sink, you are right,
>> the issue only happens when you create the AttributeValue object in a
>> previous operator and send it to the sink.
>> The issue here is with serializing of ImmutableMap. Kryo tries to call
>> the put(), which is unsupported since its immutable, so you can register a
>> specific serializer for it. Like below:
>> env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class,
>> ImmutableMapSerializer.class);
>> You can get ImmutableMapSerializer.class from a pre-package library like
>> this: https://github.com/magro/kryo-serializers
>> Just add the following to your pom.xml
>> <dependency>
>>     <groupId>de.javakaffee</groupId>
>>     <artifactId>kryo-serializers</artifactId>
>>     <version>0.45</version>
>> </dependency>
>> Regarding resources, I find the following helpful:
>>    - Article on serialization
>>    - The FlinkForward youtube channel has a couple of useful deep dives
>>    on Flink in general :
>>    https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/playlists
>> Hope the above helps.
>> A more general question on your use case, what is the reason you want to
>> generate the AttributeValue in a previous operator rather than in the sink
>> directly? Is it for some dynamic generation of objects to write into DDB?
>> Regards,
>> Hong
>> *From: *Matt Fysh <mattf...@gmail.com>
>> *Date: *Tuesday, 8 November 2022 at 14:04
>> *To: *User <user@flink.apache.org>
>> *Subject: *[EXTERNAL] How to write custom serializer for dynamodb
>> connector
>> I'm attempting to use the dynamodb sink located at
>> https://github.com/apache/flink-connector-aws
>> The example
>> <https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java>
>> in the repo is working as expected, however when I try to create a nested
>> data structure, I receive a Kryo serialization error message:
>> Caused by: com.esotericsoftware.kryo.KryoException:
>> java.lang.UnsupportedOperationException
>> Serialization trace:
>> m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
>> at
>> com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>> The value that cannot be serialized is produced by this code:
>> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
>> AttributeValue.builder().m(
>>   ImmutableMap.of(
>>     "innerkey", AttributeValue.builder().s("innervalue").build()
>>   )
>> ).build();
>> There are tests in the connector repo
>> <https://github.com/apache/flink-connector-aws/blob/3798aabfcc6f78645bf3d7255dfd6c336cd497f0/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java#L70-L84>
>> for nested map structures, but they do not test that the structure can be
>> ser/de by Flink, which I believe occurs when the operator that produces the
>> value is separate to the sink operator.
>> Given that this is a fairly simple data type, I should be able to
>> register a custom serializer with Flink, but since I'm new to java I'm
>> having trouble making sense of the docs
>> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/types_serialization/>
>> and was hoping to find someone more knowledgeable in this area for some
>> pointers on what else I could start reading
>> Thanks
>> Matt

