[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760250#comment-16760250 ]
Devin Thomson commented on FLINK-4582:
--------------------------------------

[~yxu-lyft] circling back here, looks like this got merged in, congrats! I've been preemptively cutting over from my implementation to this one and noticed one (blocking) bug:

{code:java}
j.l.IllegalArgumentException: Conflicting setter definitions for property "eventName": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params) vs org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params)
	at c.f.j.d.i.POJOPropertyBuilder.getSetter(POJOPropertyBuilder.java:300)
	at c.f.j.d.d.BeanDeserializerFactory.filterBeanProps(BeanDeserializerFactory.java:619)
	at c.f.j.d.d.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:515)
	at c.f.j.d.d.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:256)
	at c.f.j.d.d.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:169)
	at c.f.j.d.d.DeserializerCache._createDeserializer2(DeserializerCache.java:403)
	at c.f.j.d.d.DeserializerCache._createDeserializer(DeserializerCache.java:352)
	at c.f.j.d.d.DeserializerCache._createAndCache2(DeserializerCache.java:264)
	... 15 common frames omitted
Wrapped by: c.f.j.d.JsonMappingException: Conflicting setter definitions for property "eventName": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params...
{code}

It looks like the root cause is that DynamoDBStreamsSchema.java defines the object mapper as follows:

{code:java}
private static final ObjectMapper MAPPER = new ObjectMapper();
{code}

when it should use the appropriate mix-ins offered by the DynamoDB Streams adapter library:

{code:java}
private static final ObjectMapper MAPPER = new RecordObjectMapper();
{code}

This appears to resolve the issue; I tested it by using my own deserializer implementation (a rough sketch of such a deserializer follows the quoted issue below). Not sure whether it makes sense to track this as a separate issue, since this is still a 1.8-SNAPSHOT feature. Let me know if you have any questions!

- Devin

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> -------------------------------------------------------------
>
>                 Key: FLINK-4582
>                 URL: https://issues.apache.org/jira/browse/FLINK-4582
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Ying Xu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data capture)
> feature called DynamoDB Streams
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
> which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with only a
> slight difference in resharding behaviours, so it is possible to build on the
> internals of our Flink Kinesis Consumer for an exactly-once DynamoDB Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc =
>     FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and combining
> what Flink has for exactly-once semantics, out-of-core state backends, and queryable
> state with CDC can have very strong use cases.
> For this feature there should only be an extra dependency to the AWS Java SDK for
> DynamoDB, which has Apache License 2.0.
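
For reference, the fix described in the comment amounts to swapping the plain Jackson mapper for the adapter library's RecordObjectMapper inside a DynamoDB Streams deserialization schema. Below is a minimal sketch of such a custom KinesisDeserializationSchema, assuming the unshaded com.amazonaws.services.dynamodbv2 model classes and the dynamodb-streams-kinesis-adapter library are on the user classpath; the class name and exact packages are illustrative and not the connector's actual implementation:

{code:java}
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical user-side deserialization schema for DynamoDB Streams records.
public class DynamoDbRecordDeserializationSchema implements KinesisDeserializationSchema<Record> {

    // RecordObjectMapper registers the mix-ins needed to map the Record model,
    // avoiding the "conflicting setter definitions" failure seen with a plain ObjectMapper.
    private static final ObjectMapper MAPPER = new RecordObjectMapper();

    @Override
    public Record deserialize(
            byte[] recordValueBytes,
            String partitionKey,
            String seqNum,
            long approxArrivalTimestamp,
            String stream,
            String shardId) throws IOException {
        // Each Kinesis record payload carries one JSON-encoded DynamoDB stream Record.
        return MAPPER.readValue(recordValueBytes, Record.class);
    }

    @Override
    public TypeInformation<Record> getProducedType() {
        return TypeInformation.of(Record.class);
    }
}
{code}

A schema like this can be handed to the DynamoDB Streams consumer in place of DynamoDBStreamsSchema as a workaround until the mapper fix lands in the connector itself.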