Thank you both for your answers and yes, that does explain what's going
on.  I will have to refactor this code.

Thanks again for your help!
Rick

On Fri, May 29, 2020 at 2:29 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Roderick,
>
> adding to Robert's response: The easiest way is to have all needed
> information injected only in the driver, and from there pass the config
> manually to your iterator in a serializable form. The config could be, for
> example, a Java Map with serializable elements, such as Strings.
>
> If you need non-serializable objects, it is common practice to initialize
> them in Rich(ParallelSource)Function#open and pass all information needed
> to construct them in the constructor of the RichFunction and store them in
> serializable fields.
>
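To illustrate the pattern described above, here is a minimal sketch that uses only the JDK, so it runs without a Flink dependency. All names in it (ConfigPassingSketch, DemoSource, DemoClient, shipToWorker, and the host/port values) are hypothetical. The open() method here only stands in for Rich(ParallelSource)Function#open, and shipToWorker imitates the client-to-cluster hand-off with a plain Java serialization round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class ConfigPassingSketch {

    /** Hypothetical non-serializable client, standing in for a database driver. */
    static class DemoClient {
        final String url;

        DemoClient(String url) {
            this.url = url;
        }
    }

    /** Hypothetical source: config lives in a serializable field, the client is built in open(). */
    static class DemoSource implements Serializable {
        private static final long serialVersionUID = 1L;

        // Serializable config (Strings in a HashMap) travels with the object.
        private final HashMap<String, String> config;

        // Not serializable, so it is marked transient and created lazily in open().
        private transient DemoClient client;

        DemoSource(Map<String, String> config) {
            this.config = new HashMap<>(config);
        }

        /** Stand-in for the open() lifecycle hook that runs on the worker. */
        void open() {
            client = new DemoClient("http://" + config.get("host") + ":" + config.get("port"));
        }

        String clientUrl() {
            return client == null ? null : client.url;
        }
    }

    /** Imitates shipping the source to a worker via Java serialization. */
    static DemoSource shipToWorker(DemoSource source) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(source);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DemoSource) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> config = new HashMap<>();
        config.put("host", "localhost"); // illustrative values only
        config.put("port", "8529");

        DemoSource onWorker = shipToWorker(new DemoSource(config));
        System.out.println("before open(): " + onWorker.clientUrl());
        onWorker.open();
        System.out.println("after open():  " + onWorker.clientUrl());
    }
}
```

In a real job, DemoSource would extend the Flink rich function class and open() would create the actual database client; the point of the sketch is only that the config survives the hand-off while the client is rebuilt on the other side.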
>
> On Fri, May 29, 2020 at 1:26 PM Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Roderick,
>>
>> Luckily there are no silly questions, just silly answers (so I have the
>> harder job here ;) )
>>
>> It seems that you are trying to read data from an Arango Database, right?
>> What is important to understand is that the "flink job" that you are
>> implementing in your main() method gets executed in the JVM submitting your
>> job to the Flink cluster. There, we are just constructing a graph of
>> dataflow operations, which will then be distributed to the cluster.
>> As part of this process, we are serializing all the user code from the
>> client, and sending it over to the cluster where it gets executed in a
>> distributed fashion.
>> Further reading:
>> https://ci.apache.org/projects/flink/flink-docs-master/concepts/flink-architecture.html
>> I assume you are seeing "null" references because the objects you are
>> trying to send to the cluster are not serializable (but stored in a
>> transient field); or Spring is doing some Dependency Injection magic that
>> does not work in the remote Flink environment.
>>
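The "null" references and the default constructor that never seems to be called are both consistent with plain Java serialization, which, as far as I know, is what Flink uses to ship user functions. The following JDK-only sketch shows that behaviour in isolation; TransientDemo, DemoSource, and roundTrip are hypothetical names, and the round trip only approximates what happens between the client and the cluster.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDemo {

    /** Hypothetical source-like class holding one serializable and one transient field. */
    static class DemoSource implements Serializable {
        private static final long serialVersionUID = 1L;

        // Counts constructor invocations; static fields are not part of the serialized state.
        static int constructorCalls = 0;

        final String host;            // serializable, survives the round trip
        transient Object connection;  // transient, dropped during serialization

        DemoSource(String host) {
            constructorCalls++;
            this.host = host;
            this.connection = new Object();
        }
    }

    /** Serializes a value to bytes and reads it back, yielding a new object. */
    static <T extends Serializable> T roundTrip(T value)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        DemoSource original = new DemoSource("localhost");
        DemoSource copy = roundTrip(original);

        System.out.println("same object:       " + (copy == original));
        System.out.println("host preserved:    " + copy.host);
        System.out.println("connection:        " + copy.connection);
        System.out.println("constructor calls: " + DemoSource.constructorCalls);
    }
}
```

The copy is a different object whose transient field is null, and it is created without running any DemoSource constructor, which matches the symptoms described in the original question.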
>> tl;dr: I would recommend implementing a custom source that reads from
>> ArangoDB. Extending RichParallelSourceFunction allows you to read with
>> parallelism > 1, and it provides lifecycle methods (open() and close())
>> for opening and closing the connection to Arango.
>> For the configuration passing, I would pass it a _serializable_ object
>> through the constructor of your custom source.
>>
>> Best,
>> Robert
>>
>>
>>
>> On Thu, May 28, 2020 at 6:40 PM Roderick Vincent <vincent.r...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am brand new to Apache Flink, so please excuse any silly questions. I
>>> have an Iterator function, defined below, that I am adding as a source to a
>>> Flink stream. But when I try to pass configuration information to it (via
>>> a Spring env), I notice that one of the threads calls hasNext() on an
>>> object that is not the one I created, and the passed information is null.
>>> Something is constructing it, but strangely, if I add a default constructor,
>>> I do not see it being called by the thread with the null data, so I am
>>> wondering what is going on. Any ideas? How do we pass configuration
>>> information to these functions? Any help would be appreciated.
>>>
>>> Thanks,
>>> Rick
>>>
>>> @Public
>>> public class NodeSource extends FromIteratorFunction<LinkedList<BaseDocument>> {
>>>
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     public NodeSource(ArangoDBSource iterator) {
>>>         super(iterator);
>>>     }
>>> }
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
