Hi Craig,

I’ve just run a simple test of this, and it worked without any problem.

Which Flink version were you using (that is, the archetype version you used
with the Flink Quickstart Maven Archetype)?
Also, on which branch / commit was the Kinesis connector built? Since you’ve
used the “AUTO” credentials provider option, I’m assuming it was built on the
master branch and not a release branch (the “AUTO” option hasn’t been included
in any of the release branches yet).

So I suspect the failure comes from a version conflict between the Flink
version your job uses and the branch the Kinesis connector was built from.
If that is the case, you should build the Kinesis connector from the release
branch that matches the Flink version you’re using.
Could you check and see whether the problem remains? Thanks!
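
If it helps with checking, here is a rough sketch (not something from the
Flink code base) that prints which JAR a class is loaded from and the version
recorded in that JAR's manifest. The class name ClasspathVersionCheck and the
helper describe() are made up for illustration, and I'm assuming you run it
with the same classpath as your job. The manifest version may print as null
if the JAR doesn't record one; in that case the JAR file name in the printed
location is usually enough to spot a mismatch.

```java
import java.security.CodeSource;

public class ClasspathVersionCheck {

    /** Describes where a class was loaded from, or reports that it is missing. */
    static String describe(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class<?> clazz = Class.forName(
                    className, false, ClasspathVersionCheck.class.getClassLoader());

            // The code source is null for classes loaded by the bootstrap loader
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            String location = (source == null || source.getLocation() == null)
                    ? "unknown location"
                    : source.getLocation().toString();

            // Implementation-Version from the JAR manifest, if the JAR provides one
            Package pkg = clazz.getPackage();
            String version = (pkg == null) ? null : pkg.getImplementationVersion();

            return className + " -> " + location + " (manifest version: " + version + ")";
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> not found on classpath";
        }
    }

    public static void main(String[] args) {
        // One class from Flink's streaming API and one from the Kinesis connector
        System.out.println(describe(
                "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment"));
        System.out.println(describe(
                "org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer"));
    }
}
```

If the two printed locations point to JARs with different Flink versions, that
would support the version conflict theory.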

Regards,
Gordon


On September 1, 2016 at 1:34:19 AM, Foster, Craig (foscr...@amazon.com) wrote:

Hi:

I am using the following WikiEdit example:

https://ci.apache.org/projects/flink/flink-docs-master/quickstart/run_example_quickstart.html

 

It works when printing the contents to a file or stdout.

 

But I wanted to modify it to use Kinesis instead of Kafka. So instead of the 
Kafka part, I put:

 

Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");

FlinkKinesisProducer<String> kinesis =
    new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("my-flink-stream");
kinesis.setDefaultPartition("0");

result
    .map(new MapFunction<Tuple2<String, Long>, String>() {
        @Override
        public String map(Tuple2<String, Long> tuple) {
            return tuple.toString();
        }
    })
    .addSink(kinesis);

see.execute();

 

 

But I get the following error:

2016-08-31 17:05:41,541 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source (1/1) (2f7d339588fec18e0f2617439ee9be6d) switched from RUNNING to CANCELING
2016-08-31 17:05:41,542 INFO  org.apache.flink.yarn.YarnJobManager                          - Status of job 43a13707d92da260827f37968597c187 () changed to FAILING.
java.lang.Exception: Serialized representation of org.apache.flink.streaming.runtime.tasks.TimerException: java.lang.RuntimeException: Could not forward element to next operator
        at org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:803)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Searching Google doesn't yield many things that seem to work. Is there 
somewhere I should look for a root cause? I looked in the full log file but 
it's not much more than this stacktrace.
