Hi:
I am using the following WikiEdit example:
https://ci.apache.org/projects/flink/flink-docs-master/quickstart/run_example_quickstart.html
It works when printing the contents to a file or stdout.
But I wanted to modify it to use Kinesis instead of Kafka. So instead of the
Kafka part, I put:
Properties producerConfig = new Properties();
producerConfig.put(ProducerConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(ProducerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new
SimpleStringSchema(), producerConfig);
kinesis.setFailOnError(true);
kinesis.setDefaultStream("my-flink-stream");
kinesis.setDefaultPartition("0");
result
.map(new MapFunction<Tuple2<String,Long>, String>() {
@Override
public String map(Tuple2<String, Long> tuple) {
return tuple.toString();
}
})
.addSink(kinesis);
see.execute();
But I get the following error:
2016-08-31 17:05:41,541 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom
Source (1/1) (2f7d339588fec18e0f2617439ee9be6d) switched from RUNNING to
CANCELING
2016-08-31 17:05:41,542 INFO org.apache.flink.yarn.YarnJobManager
- Status of job 43a13707d92da260827f37968597c187 () changed to
FAILING.
java.lang.Exception: Serialized representation of
org.apache.flink.streaming.runtime.tasks.TimerException:
java.lang.RuntimeException: Could not forward element to next operator
at
org.apache.flink.streaming.runtime.tasks.StreamTask$TriggerTask.run(StreamTask.java:803)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Searching Google doesn't yield many things that seem to work. Is there
somewhere I should look for a root cause? I looked in the full log file but
it's not much more than this stacktrace.