Hey Matt,

Thanks for the feedback. I have updated the SinkIntoDynamoDb [1] sample to
avoid this in the future. We have recently added support for @DynamoDbBean
annotated POJOs, which you might find interesting. This removes the need to
create a custom ElementConverter altogether,
see SinkDynamoDbBeanIntoDynamoDb [2].
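
To give a flavour, here is a minimal sketch of what such a POJO could look
like. The class and field names are purely illustrative (not taken from the
sample):

import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;

@DynamoDbBean
public class Order {
    // The enhanced client expects a public no-arg constructor (implicit
    // here) and a getter/setter pair for each attribute.
    private String orderId;
    private int quantity;

    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public int getQuantity() { return quantity; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
}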

Thanks Hong for looking into this!

[1]
https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java
[2]
https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java

Thanks,

On Tue, Nov 8, 2022 at 9:06 PM Matt Fysh <mattf...@gmail.com> wrote:

> Thanks Hong, I moved the AttributeValue creation into the ElementConverter
> and it started working without any custom serde work!
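>
> In case it helps anyone else, the converter ended up looking roughly like
> the sketch below. The field names are my own, and I am assuming the
> DynamoDbWriteRequest builder methods as they appear in the connector:
>
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.flink.api.connector.sink2.SinkWriter;
> import org.apache.flink.connector.base.sink.writer.ElementConverter;
> import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
> import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
>
> public class MyElementConverter
>         implements ElementConverter<String, DynamoDbWriteRequest> {
>     @Override
>     public DynamoDbWriteRequest apply(String element, SinkWriter.Context context) {
>         // Build the AttributeValue here, inside the converter, so the AWS
>         // SDK types never have to travel between Flink operators.
>         Map<String, AttributeValue> item = new HashMap<>();
>         item.put("pk", AttributeValue.builder().s(element).build());
>         return DynamoDbWriteRequest.builder()
>                 .setType(DynamoDbWriteRequestType.PUT)
>                 .setItem(item)
>                 .build();
>     }
> }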
>
> The reason for creating AttributeValue instances in a previous operator is
> that I was closely following the example code:
> https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java
>
> Thank you again for your help and sharing those resources.
>
> Cheers,
> Matt.
>
>
> On Wed, 9 Nov 2022 at 03:51, Teoh, Hong <lian...@amazon.co.uk> wrote:
>
>> Hi Matt,
>>
>>
>>
>> First of all, awesome that you are using the DynamoDB sink!
>>
>>
>>
>> To resolve your issue with serialization in the DDB sink, you are right:
>> the issue only happens when you create the AttributeValue object in a
>> previous operator and send it to the sink.
>>
>> The issue here is with serializing the ImmutableMap. Kryo tries to call
>> put(), which is unsupported since the map is immutable, so you can
>> register a specific serializer for it, like below:
>>
>>
>>
>> env.getConfig().registerTypeWithKryoSerializer(
>>         ImmutableMap.class, ImmutableMapSerializer.class);
>>
>>
>>
>> You can get ImmutableMapSerializer.class from a pre-packaged library such
>> as this one: https://github.com/magro/kryo-serializers
>>
>> Just add the following to your pom.xml:
>>
>>
>>
>> <dependency>
>>     <groupId>de.javakaffee</groupId>
>>     <artifactId>kryo-serializers</artifactId>
>>     <version>0.45</version>
>> </dependency>
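>>
>> One caveat, based on my understanding of how Kryo resolves serializers:
>> ImmutableMap.of() actually returns a private subclass, so if the
>> exact-type registration does not take effect, you can register the
>> serializer as a default for the whole type hierarchy instead:
>>
>> env.getConfig().addDefaultKryoSerializer(
>>         ImmutableMap.class, ImmutableMapSerializer.class);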
>>
>>
>>
>> Regarding resources, I find the following helpful:
>>
>>    - Article on serialization
>>    - The Flink Forward YouTube channel has a couple of useful deep dives
>>    on Flink in general:
>>    https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/playlists
>>
>>
>>
>> Hope the above helps.
>>
>>
>>
>>
>>
>> A more general question on your use case: what is the reason you want to
>> generate the AttributeValue in a previous operator rather than directly in
>> the sink? Is it for some dynamic generation of objects to write into DDB?
>>
>>
>>
>> Regards,
>>
>> Hong
>>
>>
>>
>>
>>
>> *From: *Matt Fysh <mattf...@gmail.com>
>> *Date: *Tuesday, 8 November 2022 at 14:04
>> *To: *User <user@flink.apache.org>
>> *Subject: *[EXTERNAL] How to write custom serializer for dynamodb
>> connector
>>
>>
>>
>> I'm attempting to use the dynamodb sink located at
>> https://github.com/apache/flink-connector-aws
>>
>>
>>
>> The example
>> <https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java>
>> in the repo is working as expected; however, when I try to create a nested
>> data structure, I receive a Kryo serialization error message:
>>
>>
>>
>> Caused by: com.esotericsoftware.kryo.KryoException:
>> java.lang.UnsupportedOperationException
>> Serialization trace:
>> m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
>> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
>>
>>
>>
>> The value that cannot be serialized is produced by this code:
>>
>> import com.google.common.collect.ImmutableMap;
>> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
>>
>> AttributeValue.builder().m(
>>     ImmutableMap.of(
>>         "innerkey", AttributeValue.builder().s("innervalue").build()
>>     )
>> ).build();
>>
>>
>>
>> There are tests in the connector repo
>> <https://github.com/apache/flink-connector-aws/blob/3798aabfcc6f78645bf3d7255dfd6c336cd497f0/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java#L70-L84>
>> for nested map structures, but they do not test that the structure can be
>> serialized/deserialized by Flink, which I believe occurs when the operator
>> that produces the value is separate from the sink operator.
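>>
>> (Side note: assuming the ExecutionConfig API, it looks like fallbacks to
>> Kryo like this one can be made to fail early rather than at runtime:
>>
>> // Throw as soon as Flink would fall back to Kryo for a type.
>> env.getConfig().disableGenericTypes();
>>
>> which at least surfaces the problem immediately.)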
>>
>>
>>
>> Given that this is a fairly simple data type, I should be able to
>> register a custom serializer with Flink, but since I'm new to Java I'm
>> having trouble making sense of the docs
>> <https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/types_serialization/>
>> and was hoping to find someone more knowledgeable in this area for some
>> pointers on what else I could start reading.
>>
>>
>>
>> Thanks
>>
>> Matt
>>
>
