Thanks, that example is helpful. It seems like to use `fromCollection` with an iterator it must be an iterator that implements serializable, and Java's built in `Iterator`s don't, unfortunately.
On Thu, Apr 7, 2016 at 6:11 PM, Chesnay Schepler <ches...@apache.org> wrote: > hmm, maybe i was to quick with linking to the JIRA. > > As for an example: you can look at the streaming WindowJoin example. The > sample data uses an Iterator. (ThrottledIterator) > Note that the iterator implementation used is part of flink and also > implements serializable. > > On 07.04.2016 22:18, Andrew Whitaker wrote: > > Hi, > > I'm trying to get a simple example of a source backed by an iterator > working. Here's the code I've got: > > ``` > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > List<Integer> list = Arrays.asList(1, 2); > > env.fromCollection(list.iterator(), Integer.class).print(); > ``` > > I get the following exception: > > ``` > Exception in thread "main" > org.apache.flink.api.common.InvalidProgramException: Object > org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91 > not serializable > at > org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99) > at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766) > at braintree.demo.FromIterator.main(FromIterator.java:14) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144) > Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) > at > java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) > at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) > at > java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) > at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) > at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300) > at > org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97) > ... 11 more > ``` > > This kind of makes sense. The root issue seems to be that the list's > iterator is not serializable. In fact, java.util.Iterator doesn't implement > Serializable. > > I can't seem to find any examples of `FromIteratorFunction` being used in > the flink codebase. Am I using it wrong? > > Thanks! > > -- > Andrew Whitaker | andrew.whita...@braintreepayments.com > -- > Note: this information is confidential. It is prohibited to share, post > online or otherwise publicize without Braintree's prior written consent. > > > -- Andrew Whitaker | andrew.whita...@braintreepayments.com -- Note: this information is confidential. It is prohibited to share, post online or otherwise publicize without Braintree's prior written consent.