Hi All,

We have a Flink (1.0.2) HA setup on AWS and are using IAM roles to interact with S3 (via s3a, as suggested in the Flink best practices) and DynamoDB. While trying to perform a key-value lookup against DynamoDB from one of the operators, we are running into the following issue.
    def putItem() = {
      val id = 1266399999L
      val item = new Item()
        .withPrimaryKey("Id", "sfsaf12344")
        .withLong("uniqueIdentifier", id)
      table.putItem(item)
    }

2016-07-04 17:15:18,379 PDT [INFO] ip-10-6-10-182 [flink-akka.actor.default-dispatcher-29] o.a.f.runtime.jobmanager.JobManager - Status of job 3ec7e145208453b5dbcf6224f373018f (Topology) changed to FAILING.
org.apache.flink.runtime.util.SerializedThrowable: com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
    at com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.doPutItem(PutItemImpl.java:82)
    at com.amazonaws.services.dynamodbv2.document.internal.PutItemImpl.putItem(PutItemImpl.java:41)
    at com.amazonaws.services.dynamodbv2.document.Table.putItem(Table.java:144)
    at com.mix.ingestion.url.dupedetection.DynamoDBIO$.putItem(DynamoDBHandler.scala:38)
    at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKey(ABC.scala:143)
    at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetectionBaseImpl.setKeyAndUpdateDupeFlag(ABC.scala:135)
    at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.updateDupeFlagAndTable(ABC.scala:96)
    at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$class.detectDupe(ABC.scala:111)
    at com.mix.ingestion.url.dupedetection.DynamoDBDupeDetection$.detectDupe(ABC.scala:158)
    at com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
    at com.mix.ingestion.topology.Operators$$anonfun$15$$anonfun$apply$3.apply(Operators.scala:70)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$4.map(DataStream.scala:485)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:168)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.Exception: Serialized representation of java.lang.NoSuchMethodError: com.amazonaws.services.dynamodbv2.model.PutItemRequest.withExpressionAttributeNames(Ljava/util/Map;)Lcom/amazonaws/services/dynamodbv2/model/PutItemRequest;
    ... 18 common frames omitted

It works if I run it standalone with "java -cp fatjar.jar:/opt/flink/lib/* a.b.c.d.DynamoDBHandler" on the same EC2 instance, but it fails as soon as it tries to interact with DynamoDB from inside an operator. It fails even when I call the same putItem from inside the operator. We have aws-java-sdk-1.7.4.jar and hadoop-aws-2.7.2.jar in the flink/lib folder. We deploy the topology as a fat jar, which contains aws-java-sdk-s3 and aws-java-sdk-dynamodb, both version 1.11.3. I also experimented with putting the full aws-java-sdk in the fat jar as well, but it did not work. I looked into aws-java-sdk-1.7.4.jar and see that com/amazonaws/services/dynamodbv2 exists. Please let me know what I am doing wrong. Any help will be appreciated.

--
Thanks,
Deepak Jha
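PS: To narrow this down, I plan to log which jar the SDK class is actually loaded from inside the operator, since flink/lib (1.7.4) and the fat jar (1.11.3) both contain com/amazonaws/services/dynamodbv2. A small sketch of the idea (WhichJar and the sample class name are mine, not from the job; inside the operator one would pass "com.amazonaws.services.dynamodbv2.model.PutItemRequest" instead):

```scala
object WhichJar {
  // Returns the code source (jar path) that a class was loaded from,
  // or a placeholder when the class comes from the bootstrap classloader.
  def whichJar(className: String): String = {
    val cls = Class.forName(className)
    Option(cls.getProtectionDomain.getCodeSource)
      .map(_.getLocation.toString)
      .getOrElse("<bootstrap classloader>")
  }

  def main(args: Array[String]): Unit =
    // Using scala.Option here only as a stand-in class that is always present;
    // in the Flink job the argument would be the DynamoDB PutItemRequest class.
    println(s"scala.Option loaded from: ${whichJar("scala.Option")}")
}
```

If the path printed from inside the operator points at aws-java-sdk-1.7.4.jar in flink/lib rather than at the fat jar, that would explain the NoSuchMethodError, since 1.7.4's PutItemRequest predates withExpressionAttributeNames.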