Thank you both for your answers, and yes, that explains what's going on. I will have to refactor this code.
Thanks again for your help!

Rick

On Fri, May 29, 2020 at 2:29 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Roderick,
>
> Adding to Robert's response: the easiest way is to have all needed
> information injected only in the driver, from which you manually pass the
> config to your iterator in a serializable form. The config could be, for
> example, a Java Map with serializable elements such as Strings.
>
> If you need non-serializable objects, it is common practice to initialize
> them in Rich(ParallelSource)Function#open, pass all information needed to
> construct them through the constructor of the RichFunction, and store that
> information in serializable fields.
>
> On Fri, May 29, 2020 at 1:26 PM Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hi Roderick,
>>
>> Luckily there are no silly questions, just silly answers (so I have the
>> harder job here ;) )
>>
>> It seems that you are trying to read data from an ArangoDB database,
>> right? What is important to understand is that the "Flink job" you are
>> implementing in your main() method gets executed in the JVM that submits
>> your job to the Flink cluster. There, we are just constructing a graph of
>> dataflow operations, which will then be distributed to the cluster.
>> As part of this process, we serialize all the user code from the client
>> and send it over to the cluster, where it gets executed in a distributed
>> fashion.
>> Further reading:
>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/flink-architecture.html
>>
>> I assume you are seeing null references because the objects you are
>> trying to send to the cluster are not serializable (but are stored in a
>> transient field), or because Spring is doing some dependency-injection
>> magic that does not work in the remote Flink environment.
>>
>> tl;dr: What I would recommend is to implement a custom SourceFunction
>> that reads from ArangoDB.
>> The RichParallelSourceFunction will allow you to read with parallelism > 1,
>> and it has lifecycle methods for opening and closing the connection to
>> Arango.
>> For passing the configuration, I would pass a _serializable_ object
>> through the constructor of your custom source.
>>
>> Best,
>> Robert
>>
>> On Thu, May 28, 2020 at 6:40 PM Roderick Vincent <vincent.r...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am brand new to Apache Flink, so please excuse any silly questions. I
>>> have an Iterator function defined as below and am adding it as a source
>>> to a Flink stream. But when I try to pass configuration information to
>>> it (via a Spring env), what I notice is that one of the threads calls
>>> hasNext() on a different object, and the passed information is null.
>>> Something is constructing it, but strangely, if I add a default
>>> constructor I do not see it being called by the thread with the null
>>> data, so I am wondering what is going on. Any ideas? How do we pass
>>> configuration information to these functions? Any help would be
>>> appreciated.
>>>
>>> Thanks,
>>> Rick
>>>
>>> @Public
>>> public class NodeSource
>>>         extends FromIteratorFunction<LinkedList<BaseDocument>> {
>>>
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     public NodeSource(ArangoDBSource iterator) {
>>>         super(iterator);
>>>     }
>>> }
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink Conference
>
> Stream Processing | Event Driven | Real Time
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) Cheng
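[Editor's note] Robert's explanation of the null references can be demonstrated with plain JDK serialization, which is what Flink uses to ship user code to the cluster. This is a sketch: `ShippedSource` and its fields are illustrative names, not Flink or ArangoDB API. A transient (or injected-but-never-serialized) field does not survive the round trip and comes back null on the worker, which matches the behaviour Rick observed.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Illustrative class, not a real Flink source.
class ShippedSource implements Serializable {
    private static final long serialVersionUID = 1L;

    // Serializable field: travels with the job graph to the cluster.
    final Map<String, String> config;

    // Transient field: present in the driver, null after deserialization.
    transient Object injectedClient = new Object();

    ShippedSource(Map<String, String> config) {
        this.config = config;
    }

    // Round-trip through Java serialization, as happens when the job
    // graph is shipped from the client JVM to the task managers.
    static ShippedSource roundTrip(ShippedSource s) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bos);
            out.writeObject(s);
            out.flush();
            ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bos.toByteArray()));
            return (ShippedSource) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```

After the round trip, `config` is intact but `injectedClient` is null, exactly like the configuration Rick saw disappear.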