[ https://issues.apache.org/jira/browse/FLINK-35022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17861118#comment-17861118 ]
Hong Liang Teoh commented on FLINK-35022:
-----------------------------------------

merged commit [{{36479a9}}|https://github.com/apache/flink-connector-aws/commit/36479a98e11c154bd1537e11e8cc51e64a0ebcb8] into apache:main

> Add TypeInformed Element Converter for DynamoDbSink
> ---------------------------------------------------
>
>                 Key: FLINK-35022
>                 URL: https://issues.apache.org/jira/browse/FLINK-35022
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / DynamoDB
>    Affects Versions: aws-connector-4.3.0
>            Reporter: Ahmed Hamdy
>            Assignee: Ahmed Hamdy
>            Priority: Major
>              Labels: pull-request-available
>
> h2. Context
> {{DynamoDbSink}}, as an extension of {{AsyncSinkBase}}, depends on
> {{org.apache.flink.connector.base.sink.writer.ElementConverter}} to convert
> Flink stream objects to DynamoDb write requests, where an item is represented
> as {{Map<String, AttributeValue>}} [1].
> {{AttributeValue}} wraps a value in the form DynamoDb understands, tagging it
> with a type identifier, as in
> {"M": {"Name": {"S": "Joe"}, "Age": {"N": "35"}}}.
> Since TypeInformation is already natively supported in Flink, many
> implementations of the DynamoDb ElementConverter are just boilerplate.
> For example
> {code:title="Simple POJO Element Conversion"}
> public class Order {
>     String id;
>     int quantity;
>     double total;
> }
> {code}
> The implementation of the converter must be
> {code:title="Simple POJO DDB Element Converter"}
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.flink.api.connector.sink2.Sink;
> import org.apache.flink.api.connector.sink2.SinkWriter;
> import org.apache.flink.connector.base.sink.writer.ElementConverter;
> import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
> import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
>
> public static class SimplePojoElementConverter implements
>         ElementConverter<Order, DynamoDbWriteRequest> {
>     @Override
>     public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
>         // Each field is mapped by hand to its DynamoDb type (S or N).
>         Map<String, AttributeValue> itemMap = new HashMap<>();
>         itemMap.put("id", AttributeValue.builder().s(order.id).build());
>         itemMap.put("quantity",
>                 AttributeValue.builder().n(String.valueOf(order.quantity)).build());
>         itemMap.put("total",
>                 AttributeValue.builder().n(String.valueOf(order.total)).build());
>         return DynamoDbWriteRequest.builder()
>                 .setType(DynamoDbWriteRequestType.PUT)
>                 .setItem(itemMap)
>                 .build();
>     }
>
>     @Override
>     public void open(Sink.InitContext context) {
>         // Nothing to initialize.
>     }
> }
> {code}
> While this is not much work, it is a fairly common case in Flink, and the
> implementation requires new users to have a fair knowledge of the DDB data
> model.
> h2. Proposal
> Introduce {{DynamoDbTypeInformedElementConverter}} as follows:
> {code:title="TypeInformedElementConverter"}
> // Pseudocode: pick the conversion based on the type information.
> public class DynamoDbTypeInformedElementConverter<inputT> implements
>         ElementConverter<inputT, DynamoDbWriteRequest> {
>     DynamoDbTypeInformedElementConverter(CompositeType<inputT> typeInfo);
>
>     public DynamoDbWriteRequest convertElement(input) {
>         switch (this.typeInfo) {
>             case BasicTypeInfo.STRING_TYPE_INFO:
>                 return input -> AttributeValue.fromS(input.toString());
>             case BasicTypeInfo.SHORT_TYPE_INFO:
>             case BasicTypeInfo.INTEGER_TYPE_INFO:
>                 return input -> AttributeValue.fromN(input.toString());
>             case TupleTypeInfo:
>                 return input -> AttributeValue.fromL(convertTuple(input));
>             .....
>         }
>     }
> }
>
> // User Code
> public static void main(String[] args) {
>     DynamoDbTypeInformedElementConverter elementConverter = new
>             DynamoDbTypeInformedElementConverter(TypeInformation.of(Order.class));
>     DdbSink.setElementConverter(elementConverter);
> }
> {code}
> We will start by supporting all POJO, basic, Tuple, and Array type
> information, which should be enough to cover all DDB-supported types
> (s, n, bool, b, ss, ns, bs, bools, m, l).
>
> [1] https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/dynamodb/model/AttributeValue.html

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
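
The idea behind the proposal quoted above, deriving the type-tagged item from the shape of the object instead of writing each mapping by hand, can be sketched without any Flink or AWS dependencies. This is only an illustration under stated assumptions, not the merged implementation: plain Java maps stand in for the SDK's AttributeValue, reflection stands in for Flink's TypeInformation, and the names TypeTaggedConverterSketch, tag, toAttribute, and toItem are hypothetical.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: maps such as {"S": "o-1"} stand in for AttributeValue,
// and reflection stands in for Flink's TypeInformation.
final class TypeTaggedConverterSketch {

    /** Hypothetical POJO mirroring the Order class from the issue. */
    static final class Order {
        String id;
        int quantity;
        double total;

        Order(String id, int quantity, double total) {
            this.id = id;
            this.quantity = quantity;
            this.total = total;
        }
    }

    /** Builds a single-entry map that pairs a DynamoDB type tag with a value. */
    static Map<String, Object> tag(String type, Object value) {
        return Collections.singletonMap(type, value);
    }

    /** Chooses the type tag from the runtime type of the value. */
    static Map<String, Object> toAttribute(Object value) {
        if (value == null) return tag("NULL", true);
        if (value instanceof String) return tag("S", value);
        // DynamoDB transports numbers as strings under the "N" tag.
        if (value instanceof Number) return tag("N", value.toString());
        if (value instanceof Boolean) return tag("BOOL", value);
        if (value instanceof List) {
            List<Object> items = new ArrayList<>();
            for (Object element : (List<?>) value) {
                items.add(toAttribute(element));
            }
            return tag("L", items);
        }
        // Anything else is treated as a nested POJO; other JDK types
        // (dates, sets, byte arrays) are not handled in this sketch.
        return tag("M", toItem(value));
    }

    /** Converts every non-static field of a POJO into a type-tagged attribute. */
    static Map<String, Map<String, Object>> toItem(Object pojo) {
        Map<String, Map<String, Object>> item = new LinkedHashMap<>();
        for (Field field : pojo.getClass().getDeclaredFields()) {
            if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                continue;
            }
            try {
                field.setAccessible(true);
                item.put(field.getName(), toAttribute(field.get(pojo)));
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + field.getName(), e);
            }
        }
        return item;
    }

    public static void main(String[] args) {
        // Prints the type-tagged item built from an Order instance.
        System.out.println(toItem(new Order("o-1", 2, 59.9)));
    }
}
```

With this sketch the user never names S or N explicitly. A real converter would presumably obtain the same information from TypeInformation and emit AttributeValue instances rather than maps.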