hmm, maybe i was to quick with linking to the JIRA.

As for an example: you can look at the streaming WindowJoin example. The sample data uses an Iterator. (ThrottledIterator) Note that the iterator implementation used is part of flink and also implements serializable.

On 07.04.2016 22:18, Andrew Whitaker wrote:
Hi,

I'm trying to get a simple example of a source backed by an iterator working. Here's the code I've got:

```
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

List<Integer> list = Arrays.asList(1, 2);

env.fromCollection(list.iterator(), Integer.class).print();
```

I get the following exception:

```
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object org.apache.flink.streaming.api.functions.source.FromIteratorFunction@25618e91 not serializable at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:793) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.fromCollection(StreamExecutionEnvironment.java:766)
at braintree.demo.FromIterator.main(FromIterator.java:14)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: java.io.NotSerializableException: java.util.AbstractList$Itr
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:300) at org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:97)
... 11 more
```

This kind of makes sense. The root issue seems to be that the list's iterator is not serializable. In fact, java.util.Iterator doesn't implement Serializable.

I can't seem to find any examples of `FromIteratorFunction` being used in the flink codebase. Am I using it wrong?

Thanks!

--
Andrew Whitaker | andrew.whita...@braintreepayments.com <mailto:andrew.whita...@braintreepayments.com>
--
Note: this information is confidential. It is prohibited to share, post online or otherwise publicize without Braintree's prior written consent.

Reply via email to