Hi Matt,

First of all, awesome that you are using the DynamoDB sink!

To resolve your issue with serialization in the DDB sink: you are right, the 
issue only happens when you create the AttributeValue object in a previous 
operator and send it to the sink.
The problem is serializing the ImmutableMap held inside the AttributeValue. 
Kryo tries to call put() on the map, which ImmutableMap does not support, so 
you can register a specific serializer for it, like below:

env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, ImmutableMapSerializer.class);

You can get ImmutableMapSerializer from a pre-packaged library such as 
https://github.com/magro/kryo-serializers
Just add the following to your pom.xml:

<dependency>
    <groupId>de.javakaffee</groupId>
    <artifactId>kryo-serializers</artifactId>
    <version>0.45</version>
</dependency>
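
Putting it together, a minimal sketch of the registration (assuming the 
dependency above; in that library the serializer lives in the 
de.javakaffee.kryoserializers.guava package, and I'm assuming the Guava 
ImmutableMap is the one used to build your AttributeValue):

import com.google.common.collect.ImmutableMap;
import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Register before building the pipeline, so Kryo uses the Guava-aware
// serializer whenever it hits an ImmutableMap (e.g. inside AttributeValue#m()).
env.getConfig().registerTypeWithKryoSerializer(ImmutableMap.class, ImmutableMapSerializer.class);

// ... then define the operator producing AttributeValue, the DDB sink, and call env.execute();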

Regarding resources, I find the following helpful:

  *   Article on serialization: 
https://alibaba-cloud.medium.com/data-types-and-serialization-flink-advanced-tutorials-b363241c8836
  *   The Flink Forward YouTube channel has a couple of useful deep dives on 
Flink in general: 
https://www.youtube.com/channel/UCY8_lgiZLZErZPF47a2hXMA/playlists

Hope the above helps.


A more general question on your use case: what is the reason you want to 
generate the AttributeValue in a previous operator rather than in the sink 
directly? Is it for some dynamic generation of objects to write into DDB?
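
To illustrate the alternative, here is a rough sketch of building the 
AttributeValue inside the sink's element converter, so that only your own 
record type travels between operators and Kryo never has to serialize 
AttributeValue or ImmutableMap. (MyRecord and its getters are placeholders for 
whatever your upstream operator emits; the converter and write-request types 
are the ones in the connector's org.apache.flink.connector.dynamodb.sink 
package, so double-check the exact names against the connector version you are 
on.)

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

// MyRecord is a placeholder POJO emitted by the upstream operator.
public class MyRecordElementConverter implements ElementConverter<MyRecord, DynamoDbWriteRequest> {

    @Override
    public DynamoDbWriteRequest apply(MyRecord record, SinkWriter.Context context) {
        // Build the nested structure here, inside the sink, using plain HashMaps.
        Map<String, AttributeValue> inner = new HashMap<>();
        inner.put("innerkey", AttributeValue.builder().s(record.getInnerValue()).build());

        Map<String, AttributeValue> item = new HashMap<>();
        item.put("partitionKey", AttributeValue.builder().s(record.getId()).build());
        item.put("nested", AttributeValue.builder().m(inner).build());

        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(item)
                .build();
    }
}

You would then plug it in with something like 
DynamoDbSink.<MyRecord>builder().setElementConverter(new MyRecordElementConverter())..., 
and no Kryo registration is needed for the DDB model classes at all.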

Regards,
Hong


From: Matt Fysh <mattf...@gmail.com>
Date: Tuesday, 8 November 2022 at 14:04
To: User <user@flink.apache.org>
Subject: [EXTERNAL] How to write custom serializer for dynamodb connector


I'm attempting to use the dynamodb sink located at 
https://github.com/apache/flink-connector-aws

The 
example<https://github.com/apache/flink-connector-aws/blob/main/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java>
 in the repo works as expected; however, when I try to create a nested data 
structure, I receive a Kryo serialization error message:

Caused by: com.esotericsoftware.kryo.KryoException: 
java.lang.UnsupportedOperationException
Serialization trace:
m (software.amazon.awssdk.services.dynamodb.model.AttributeValue)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)

The value that cannot be serialized is produced by this code:
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

AttributeValue.builder().m(
  ImmutableMap.of(
    "innerkey", AttributeValue.builder().s("innervalue").build()
  )
).build();

There are tests in the connector 
repo<https://github.com/apache/flink-connector-aws/blob/3798aabfcc6f78645bf3d7255dfd6c336cd497f0/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/util/DynamoDbSerializationUtilTest.java#L70-L84>
 for nested map structures, but they do not test that the structure can be 
ser/de by Flink, which I believe occurs when the operator that produces the 
value is separate from the sink operator.

Given that this is a fairly simple data type, I should be able to register a 
custom serializer with Flink, but since I'm new to Java I'm having trouble 
making sense of the 
docs<https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/fault-tolerance/serialization/types_serialization/>
 and was hoping to find someone more knowledgeable in this area for some 
pointers on what else I could start reading.

Thanks
Matt
