[ https://issues.apache.org/jira/browse/FLINK-4582?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16760250#comment-16760250 ]

Devin Thomson commented on FLINK-4582:
--------------------------------------

[~yxu-lyft] circling back here, looks like this got merged in, congrats!

 

I've been preemptively cutting over from my implementation to this one and 
noticed one (blocking) bug:

 
{code:java}
j.l.IllegalArgumentException: Conflicting setter definitions for property "eventName": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params) vs org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params)
at c.f.j.d.i.POJOPropertyBuilder.getSetter(POJOPropertyBuilder.java:300)
at c.f.j.d.d.BeanDeserializerFactory.filterBeanProps(BeanDeserializerFactory.java:619)
at c.f.j.d.d.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:515)
at c.f.j.d.d.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:256)
at c.f.j.d.d.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:169)
at c.f.j.d.d.DeserializerCache._createDeserializer2(DeserializerCache.java:403)
at c.f.j.d.d.DeserializerCache._createDeserializer(DeserializerCache.java:352)
at c.f.j.d.d.DeserializerCache._createAndCache2(DeserializerCache.java:264)
... 15 common frames omitted
Wrapped by: c.f.j.d.JsonMappingException: Conflicting setter definitions for property "eventName": org.apache.flink.kinesis.shaded.com.amazonaws.services.dynamodbv2.model.Record#setEventName(1 params...
{code}
 
It looks like the root cause is that DynamoDBStreamsSchema.java defines the 
object mapper as follows:
{code:java}
private static final ObjectMapper MAPPER = new ObjectMapper();
{code}
It should instead use the RecordObjectMapper from the DynamoDB Streams Kinesis adapter library, which brings in the appropriate mix-ins:
{code:java}
private static final ObjectMapper MAPPER = new RecordObjectMapper();
{code}
This appears to resolve the issue; I tested it by using my own deserializer 
implementation (rough sketch below). Not sure if it makes sense to track this as 
a separate issue or not, since this is still a 1.8-SNAPSHOT feature.
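 

For reference, here's roughly what that workaround deserializer looks like on my 
end. Treat it as a minimal sketch rather than the exact connector code: the class 
name is just illustrative, it assumes the RecordObjectMapper from the DynamoDB 
Streams Kinesis adapter library is on the classpath, and depending on how the 
connector is shaded the Record class may need to come from the 
org.apache.flink.kinesis.shaded... package instead of the plain AWS SDK one.
{code:java}
import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisDeserializationSchema;

// Assumed (unshaded) packages: adjust to the shaded variants if your classpath requires it.
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordObjectMapper;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Workaround schema: uses RecordObjectMapper (which registers the DynamoDB
 * Streams mix-ins) instead of a plain ObjectMapper, avoiding the
 * conflicting-setter error on Record#setEventName.
 */
public class DynamoDbStreamsRecordSchema implements KinesisDeserializationSchema<Record> {

    private static final ObjectMapper MAPPER = new RecordObjectMapper();

    @Override
    public Record deserialize(byte[] recordValue, String partitionKey, String seqNum,
            long approxArrivalTimestamp, String stream, String shardId) throws IOException {
        // The mix-in-aware mapper deserializes the stream record JSON into the
        // DynamoDB Streams Record model without tripping over duplicate setters.
        return MAPPER.readValue(recordValue, Record.class);
    }

    @Override
    public TypeInformation<Record> getProducedType() {
        return TypeInformation.of(Record.class);
    }
}
{code}
Dropping this in place of DynamoDBStreamsSchema is how I verified that switching 
to RecordObjectMapper resolves the exception above.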

 

Let me know if you have any questions!

- Devin

 

> Allow FlinkKinesisConsumer to adapt for AWS DynamoDB Streams
> ------------------------------------------------------------
>
>                 Key: FLINK-4582
>                 URL: https://issues.apache.org/jira/browse/FLINK-4582
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Ying Xu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> AWS DynamoDB is a NoSQL database service that has a CDC-like (change data 
> capture) feature called DynamoDB Streams 
> (http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html),
>  which is a stream feed of item-level table activities.
> The DynamoDB Streams shard abstraction follows that of Kinesis Streams with 
> only a slight difference in resharding behaviours, so it is possible to build 
> on the internals of our Flink Kinesis Consumer for an exactly-once DynamoDB 
> Streams source.
> I propose an API something like this:
> {code}
> DataStream dynamoItemsCdc = 
>   FlinkKinesisConsumer.asDynamoDBStream(tableNames, schema, config)
> {code}
> The feature adds more connectivity to popular AWS services for Flink, and 
> combining Flink's exactly-once semantics, out-of-core state backends, and 
> queryable state with CDC can enable very strong use cases. The only extra 
> dependency this feature should require is the AWS Java SDK for DynamoDB, 
> which is under the Apache License 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
