Thanks, that example is helpful. It seems that to use `fromCollection` with
an iterator, the iterator must implement `Serializable`, and unfortunately
Java's built-in `Iterator` implementations don't.
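
For anyone who finds this thread later, here is a minimal sketch of the kind of iterator I think would work. `SerializableListIterator` and `isSerializable` are names I made up for illustration; they are not part of Flink. The idea is to hold a serializable copy of the data plus a position, rather than wrapping the JDK's iterator. The `main` method only checks Java serialization and does not need Flink on the classpath.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helper, not part of Flink: an iterator that owns a
// serializable copy of the elements and tracks its own position.
public class SerializableListIterator<T extends Serializable>
        implements Iterator<T>, Serializable {

    private static final long serialVersionUID = 1L;

    private final ArrayList<T> items; // ArrayList itself is Serializable
    private int position = 0;

    public SerializableListIterator(List<T> source) {
        this.items = new ArrayList<>(source);
    }

    @Override
    public boolean hasNext() {
        return position < items.size();
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return items.get(position++);
    }

    // Returns true if plain Java serialization accepts the object.
    public static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out =
                new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException
            return false;
        }
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2);

        // The JDK iterator from the original example
        System.out.println(isSerializable(list.iterator()));

        // The custom iterator
        System.out.println(isSerializable(new SerializableListIterator<>(list)));

        // Assumed usage with Flink on the classpath (untested here):
        // env.fromCollection(new SerializableListIterator<>(list), Integer.class).print();
    }
}
```

If the data already lives in a collection, passing the list itself to `fromCollection` (the overload that takes a `Collection`) is probably simpler than writing an iterator at all.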

On Thu, Apr 7, 2016 at 6:11 PM, Chesnay Schepler <ches...@apache.org> wrote:

> Hmm, maybe I was too quick to link to the JIRA.
>
> As for an example: you can look at the streaming WindowJoin example. The
> sample data uses an Iterator (ThrottledIterator).
> Note that the iterator implementation used is part of Flink and also
> implements Serializable.
>
> On 07.04.2016 22:18, Andrew Whitaker wrote:
>
> Hi,
>
> I'm trying to get a simple example of a source backed by an iterator
> working. Here's the code I've got:
>
> ```
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> List<Integer> list = Arrays.asList(1, 2);
>
> env.fromCollection(list.iterator(), Integer.class).print();
> ```
>
> I get the following exception:
>
> ```
> Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91 not serializable
> at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)
> at braintree.demo.FromIterator.main(FromIterator.java:14)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
> Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300)
> at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
> ... 11 more
> ```
>
> This kind of makes sense. The root issue seems to be that the list's
> iterator is not serializable. In fact, java.util.Iterator doesn't implement
> Serializable.
>
> I can't seem to find any examples of `FromIteratorFunction` being used in
> the Flink codebase. Am I using it incorrectly?
>
> Thanks!
>
> --
> Andrew Whitaker | andrew.whita...@braintreepayments.com
> --
> Note: this information is confidential. It is prohibited to share, post
> online or otherwise publicize without Braintree's prior written consent.
>
>
>


-- 
Andrew Whitaker | andrew.whita...@braintreepayments.com
