Hi Roderick,

adding to Robert's response: the easiest way is to have all needed information injected only in the driver (the main() method that builds the job), and from there manually pass the config in a serializable form to your iterator. The config could be, for example, a Java Map with serializable elements such as Strings.
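
To make this concrete, here is a minimal plain-Java sketch (no Flink or ArangoDB dependency) of what happens to an object when it is shipped with standard Java serialization, which as far as I know is how Flink ships user functions. All names in it (ConfigPassingSketch, ConfiguredSource, FakeClient, the "arango.host" key) are made up for illustration and are not part of any Flink or ArangoDB API. FakeClient stands in for any non-serializable object such as a database connection.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class ConfigPassingSketch {

    /** Hypothetical stand-in for a non-serializable client (e.g. a DB connection). */
    static class FakeClient {
        final String host;

        FakeClient(String host) {
            this.host = host;
        }
    }

    /** Hypothetical stand-in for the source/iterator that gets shipped to the cluster. */
    static class ConfiguredSource implements Serializable {
        private static final long serialVersionUID = 1L;

        // Counts constructor invocations; static fields are not serialized.
        static int constructorCalls = 0;

        // Serializable config: travels with the object.
        private final HashMap<String, String> config;

        // Not serialized: null again after the object has been shipped.
        private transient FakeClient client;

        ConfiguredSource(Map<String, String> config) {
            constructorCalls++;
            this.config = new HashMap<>(config);
        }

        /** Mimics an open() lifecycle hook: rebuild the client from the config. */
        void open() {
            client = new FakeClient(config.get("arango.host"));
        }

        boolean hasClient() {
            return client != null;
        }

        String clientHost() {
            return client == null ? null : client.host;
        }
    }

    /** Serialize and deserialize, roughly what happens when user code is shipped. */
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> config = new HashMap<>();
        config.put("arango.host", "localhost");

        ConfiguredSource onClient = new ConfiguredSource(config);
        onClient.open(); // the client exists in the submitting JVM

        ConfiguredSource onWorker = roundTrip(onClient);

        System.out.println("constructor calls: " + ConfiguredSource.constructorCalls);
        System.out.println("client present after shipping: " + onWorker.hasClient());

        onWorker.open(); // rebuild the client from the serializable config
        System.out.println("client host after open(): " + onWorker.clientHost());
    }
}
```

The copy that comes out of roundTrip is a different object, its transient field is null, and the constructor counter does not increase, because Java deserialization does not call the constructor of a Serializable class. That would match what you observed with hasNext(). The Map, on the other hand, survives the trip, so anything you can rebuild from it is available again once it is re-created on the worker side.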

If you need non-serializable objects, it is common practice to initialize them in Rich(ParallelSource)Function#open. Pass all information needed to construct them through the constructor of the RichFunction and store that information in serializable fields.

On Fri, May 29, 2020 at 1:26 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hi Roderick,
>
> Luckily there are no silly questions, just silly answers (so I have the
> harder job here ;) )
>
> It seems that you are trying to read data from an Arango Database, right?
> What is important to understand is that the "flink job" that you are
> implementing in your main() method gets executed in the JVM submitting your
> job to the Flink cluster. There, we are just constructing a graph of
> dataflow operations, which will then be distributed to the cluster.
> As part of this process, we are serializing all the user code from the
> client, and sending it over to the cluster where it gets executed in a
> distributed fashion.
> Further reading:
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/flink-architecture.html
> I assume you are seeing "null" references because the objects you are
> trying to send to the cluster are not serializable (but stored in a
> transient field); or Spring is doing some Dependency Injection magic that
> does not work in the remote Flink environment.
>
> tl;dr: What I would recommend is implement a custom SourceFunction that
> reads from ArangoDB. The RichParallelSourceFunction will allow you to read
> with parallelism > 1, and it has some lifecycle methods for opening and
> closing the connection to Arango.
> For the configuration passing, I would pass it a _serializable_ object
> through the constructor of your custom source.
>
> Best,
> Robert
>
>
>
> On Thu, May 28, 2020 at 6:40 PM Roderick Vincent <vincent.r...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am brand new to Apache Flink so please excuse any silly questions. I
>> have an Iterator function defined as below and adding it as a source to a
>> Flink stream. But when I try to pass configuration information to it (via
>> a Spring env), what I notice is that one of the threads calls hasNext() and
>> it is not the same object and the passed information is null. Something is
>> constructing it, but what is strange is that if I add a default constructor
>> I do not see this being called by this thread with the null data so I am
>> wondering what is going on. Any ideas? How do we pass configuration
>> information to these functions? Any help would be appreciated.
>>
>> Thanks,
>> Rick
>>
>> @Public
>> public class NodeSource extends
>> FromIteratorFunction<LinkedList<BaseDocument>> {
>>
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     public NodeSource(ArangoDBSource iterator) {
>>         super(iterator);
>>     }
>>
>> }
>>
>

--
Arvid Heise | Senior Java Developer
<https://www.ververica.com/>