hmm, maybe i was to quick with linking to the JIRA.
As for an example: you can look at the streaming WindowJoin example. The
sample data uses an Iterator. (ThrottledIterator)
Note that the iterator implementation used is part of flink and also
implements serializable.
On 07.04.2016 22:18, Andrew Whitaker wrote:
Hi,
I'm trying to get a simple example of a source backed by an iterator
working. Here's the code I've got:
```
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
List<Integer> list = Arrays.asList(1, 2);
env.fromCollection(list.iterator(), Integer.class).print();
```
I get the following exception:
```
Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException: Object
org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91
not serializable
at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)
at braintree.demo.FromIterator.main(FromIterator.java:14)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
at
org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
... 11 more
```
This kind of makes sense. The root issue seems to be that the list's
iterator is not serializable. In fact, java.util.Iterator doesn't
implement Serializable.
I can't seem to find any examples of `FromIteratorFunction` being used
in the flink codebase. Am I using it wrong?
Thanks!
--
Andrew Whitaker | andrew.whita...@braintreepayments.com
<mailto:andrew.whita...@braintreepayments.com>
--
Note: this information is confidential. It is prohibited to share,
post online or otherwise publicize without Braintree's prior written
consent.