Hi Roderick,

Luckily there are no silly questions, just silly answers (so I have the
harder job here ;) )

It seems that you are trying to read data from an Arango Database, right?
What is important to understand is that the "flink job" that you are
implementing in your main() method gets executed in the JVM submitting your
job to the Flink cluster. There, we are just constructing a graph of
dataflow operations, which will then be distributed to the cluster.
As part of this process, we are serializing all the user code from the
client, and sending it over to the cluster where it gets executed in a
distributed fashion.
Further reading:
https://ci.apache.org/projects/flink/flink-docs-master/concepts/flink-architecture.html
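I assume you are seeing "null" references because the objects you are
trying to send to the cluster are not serializable (and are kept in
transient fields, which serialization skips), or because Spring is doing
some Dependency Injection magic that does not work in the remote Flink
environment. This would also explain why the object calling hasNext() is a
different instance and why you never see your default constructor being
called: Java deserialization creates that copy on the cluster without
running the constructor of a Serializable class.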
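To make the "null" effect concrete, here is a small plain-Java sketch (no Flink needed) of what happens when an object is serialized on the client and deserialized elsewhere, roughly what Flink does with your function objects. The class names ConfiguredIterator and SerializationDemo and the "nodes" value are made up for illustration; springEnvironment stands in for whatever Spring injected.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for an object like the ArangoDBSource iterator.
class ConfiguredIterator implements Serializable {
    private static final long serialVersionUID = 1L;

    // Static, so it is not part of the serialized state.
    static int constructorCalls = 0;

    // Plain serializable state: travels with the object.
    final String databaseName;

    // Something that cannot be serialized (e.g. a Spring-injected object),
    // kept in a transient field so serialization does not fail; it is skipped.
    transient Object springEnvironment;

    ConfiguredIterator(String databaseName, Object springEnvironment) {
        constructorCalls++;
        this.databaseName = databaseName;
        this.springEnvironment = springEnvironment;
    }
}

class SerializationDemo {
    // Serialize and deserialize, roughly what happens when the client
    // ships a job's functions to the cluster.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If you create `new ConfiguredIterator("nodes", new Object())` and pass it through `SerializationDemo.roundTrip(...)`, you get a different object whose databaseName is still "nodes" but whose springEnvironment is null, and constructorCalls stays at 1 because the constructor never runs for the copy.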

tl;dr: What I would recommend is to implement a custom SourceFunction
that reads from ArangoDB. Extending RichParallelSourceFunction will allow
you to read with parallelism > 1, and it provides the lifecycle methods
open() and close() for opening and closing the connection to Arango.
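One detail behind "parallelism > 1": every parallel instance of a RichParallelSourceFunction runs its own run() method, so if all of them issued the same query, each document would be emitted once per instance. Inside the source, getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks() tell an instance which slice is its own. The sketch below shows only a possible splitting rule, in plain Java without Flink; the class name SubtaskSplit, the hash-modulo assignment and the document keys are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class SubtaskSplit {
    // True if the instance with index subtaskIndex (0 .. parallelism-1) owns
    // this key. String.hashCode() is specified by Java, so every instance
    // computes the same assignment even in different JVMs; floorMod keeps
    // the result non-negative for negative hash codes.
    static boolean isMine(String documentKey, int subtaskIndex, int parallelism) {
        return Math.floorMod(documentKey.hashCode(), parallelism) == subtaskIndex;
    }

    // Keys one instance would emit; in a real source, subtaskIndex and
    // parallelism would come from getRuntimeContext().
    static List<String> keysFor(List<String> allKeys, int subtaskIndex, int parallelism) {
        List<String> mine = new ArrayList<>();
        for (String key : allKeys) {
            if (isMine(key, subtaskIndex, parallelism)) {
                mine.add(key);
            }
        }
        return mine;
    }
}
```

Every key ends up with exactly one instance. Filtering like this on the client still makes every instance fetch all keys, though, so for large collections it would be preferable to let each instance request only its own slice from the database.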
To pass the configuration, I would hand a _serializable_ object to your
custom source through its constructor.
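Putting both recommendations together: the constructor receives only serializable configuration, and everything that cannot be serialized (the ArangoDB connection) is created in open() on the cluster and released in close(). To stay runnable without Flink or the ArangoDB driver, this sketch does not extend RichParallelSourceFunction; ArangoConfig, FakeConnection, NodeSourceSketch and LifecycleDemo are hypothetical classes that only mimic the order in which Flink drives a rich source (construct on the client, serialize, deserialize on a worker, open(), run(), close()). In a real job the class would extend RichParallelSourceFunction<BaseDocument> and override open(Configuration), run(SourceContext), cancel() and close(), emitting records with ctx.collect(...).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical configuration holder: plain serializable fields only, so it
// arrives intact on every parallel instance of the source.
class ArangoConfig implements Serializable {
    private static final long serialVersionUID = 1L;
    final String host;
    final String database;

    ArangoConfig(String host, String database) {
        this.host = host;
        this.database = database;
    }
}

// Hypothetical stand-in for a real ArangoDB client; deliberately NOT serializable.
class FakeConnection {
    private final ArangoConfig config;
    boolean closed = false;

    FakeConnection(ArangoConfig config) { this.config = config; }

    List<String> readDocuments() {
        List<String> docs = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            docs.add(config.database + "/doc" + i);
        }
        return docs;
    }

    void close() { closed = true; }
}

// Same shape as a RichParallelSourceFunction, but without any Flink dependency.
class NodeSourceSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final ArangoConfig config;           // serialized with the function
    private transient FakeConnection connection; // created on the worker in open()
    private volatile boolean running = true;

    NodeSourceSketch(ArangoConfig config) { this.config = config; }

    // Flink calls open(Configuration) on the worker, before run().
    void open() { connection = new FakeConnection(config); }

    // Flink's run(SourceContext) would call ctx.collect(...) for each record.
    void run(Consumer<String> collector) {
        for (String doc : connection.readDocuments()) {
            if (!running) {
                break;
            }
            collector.accept(doc);
        }
    }

    // Flink calls cancel() from another thread to make run() return.
    void cancel() { running = false; }

    // Flink calls close() when the task shuts down.
    void close() {
        if (connection != null) {
            connection.close();
        }
    }

    FakeConnection connection() { return connection; }
}

class LifecycleDemo {
    // Serialize on the "client" and deserialize a fresh copy on the "worker".
    static NodeSourceSketch shipToCluster(NodeSourceSketch source) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(source);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (NodeSourceSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The copy returned by shipToCluster has a null connection until open() is called, which is exactly the point where a real source would connect to ArangoDB using the fields of its configuration object.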

Best,
Robert



On Thu, May 28, 2020 at 6:40 PM Roderick Vincent <vincent.r...@gmail.com>
wrote:

> Hi,
>
> I am brand new to Apache Flink so please excuse any silly questions.  I
> have an Iterator function defined as below, and I am adding it as a source
> to a Flink stream.  But when I try to pass configuration information to it
> (via a Spring env), I notice that one of the threads calls hasNext() on an
> object that is not the same one, and the passed information is null.
> Something is constructing it, but what is strange is that if I add a
> default constructor, I do not see it being called by this thread with the
> null data, so I am wondering what is going on.  Any ideas?  How do we pass
> configuration information to these functions?  Any help would be
> appreciated.
>
> Thanks,
> Rick
>
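> import java.util.LinkedList;
>
> import com.arangodb.entity.BaseDocument;
> import org.apache.flink.annotation.Public;
> import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
>
> // ArangoDBSource is the poster's own class; it must implement
> // Iterator<LinkedList<BaseDocument>> and be Serializable.
> @Public
> public class NodeSource extends
>         FromIteratorFunction<LinkedList<BaseDocument>> {
>
>     private static final long serialVersionUID = 1L;
>
>     public NodeSource(ArangoDBSource iterator) {
>         super(iterator);
>     }
> }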
>
