Hi Roderick,

Adding to Robert's response: the easiest way is to have all the needed
information injected only in the driver (the program that runs your main()
method) and to pass the config from there to your iterator manually, in a
serializable form. The config could be, for example, a Java Map with
serializable elements such as Strings.
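
To make this concrete, here is a minimal sketch in plain Java (no Flink on the classpath). ConfiguredIterator, ConfigPassingDemo, and the "arango.host" / "arango.port" keys are hypothetical names for illustration only, and the serialization round trip merely imitates what happens when the job is shipped to the cluster:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Demo entry point: builds the config on the "driver" side and ships the iterator.
class ConfigPassingDemo {

    // Serializes and deserializes a value, imitating the shipping of user code
    // from the client to the cluster.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Hypothetical keys and values; in your setup they would be read from the Spring env.
        Map<String, String> config = new HashMap<>();
        config.put("arango.host", "localhost");
        config.put("arango.port", "8529");

        ConfiguredIterator shipped = roundTrip(new ConfiguredIterator(config, 3));
        while (shipped.hasNext()) {
            System.out.println(shipped.next());
        }
    }
}

// Hypothetical stand-in for the iterator that is handed to FromIteratorFunction.
class ConfiguredIterator implements Iterator<String>, Serializable {
    private static final long serialVersionUID = 1L;

    private final HashMap<String, String> config; // serializable, so it survives shipping
    private final int limit;
    private int emitted = 0;

    ConfiguredIterator(Map<String, String> config, int limit) {
        this.config = new HashMap<>(config); // copy into a map type known to be serializable
        this.limit = limit;
    }

    String configValue(String key) {
        return config.get(key);
    }

    @Override
    public boolean hasNext() {
        return emitted < limit;
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        emitted++;
        return config.get("arango.host") + ":" + config.get("arango.port") + "#" + emitted;
    }
}
```

Note that Java deserialization does not invoke the constructors of a Serializable class, which would explain why a default constructor is never seen being called on the copy that runs hasNext().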

If you need non-serializable objects, it is common practice to pass all the
information needed to construct them through the constructor of the
RichFunction, store that information in serializable fields, and initialize
the objects themselves in Rich(ParallelSource)Function#open.
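
A rough sketch of that pattern, again in plain Java so that it runs without Flink: FakeClient, LazyClientSource, and OpenLifecycleDemo are hypothetical names, and the open()/close() methods here only imitate the lifecycle methods you would override in a RichParallelSourceFunction.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class OpenLifecycleDemo {

    // Java serialization round trip, standing in for shipping the function to a worker.
    static LazyClientSource ship(LazyClientSource source) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(source);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (LazyClientSource) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        LazyClientSource onWorker = ship(new LazyClientSource("localhost", 8529));
        System.out.println("client present before open(): " + onWorker.isOpen());
        onWorker.open();
        System.out.println(onWorker.read(1));
        onWorker.close();
    }
}

// Hypothetical non-serializable object, standing in for a database client.
class FakeClient {
    private final String url;

    FakeClient(String url) {
        this.url = url;
    }

    String fetch(int id) {
        return url + "/doc/" + id;
    }
}

// Imitates the shape of a rich source: settings in serializable fields, client built in open().
class LazyClientSource implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String host; // serializable settings, passed through the constructor
    private final int port;
    private transient FakeClient client; // skipped by serialization; null until open() runs

    LazyClientSource(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Assumption: in Flink this logic would live in the open(...) override.
    void open() {
        client = new FakeClient("http://" + host + ":" + port);
    }

    boolean isOpen() {
        return client != null;
    }

    String read(int id) {
        if (client == null) {
            throw new IllegalStateException("open() has not been called");
        }
        return client.fetch(id);
    }

    // Assumption: in Flink this logic would live in the close() override.
    void close() {
        client = null;
    }
}
```

The point of the transient field is that only the host and port travel with the serialized function; the client itself is created on the worker once open() is called.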


On Fri, May 29, 2020 at 1:26 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hi Roderick,
>
> Luckily there are no silly questions, just silly answers (so I have the
> harder job here ;) )
>
> It seems that you are trying to read data from an Arango Database, right?
> What is important to understand is that the "flink job" that you are
> implementing in your main() method gets executed in the JVM submitting your
> job to the Flink cluster. There, we are just constructing a graph of
> dataflow operations, which will then be distributed to the cluster.
> As part of this process, we are serializing all the user code from the
> client, and sending it over to the cluster where it gets executed in a
> distributed fashion.
> Further reading:
> https://ci.apache.org/projects/flink/flink-docs-master/concepts/flink-architecture.html
> I assume you are seeing "null" references because the objects you are
> trying to send to the cluster are not serializable (but stored in a
> transient field); or Spring is doing some Dependency Injection magic that
> does not work in the remote Flink environment.
>
> tl;dr: What I would recommend is to implement a custom SourceFunction that
> reads from ArangoDB. The RichParallelSourceFunction will allow you to read
> with parallelism > 1, and it has some lifecycle methods for opening and
> closing the connection to Arango.
> For the configuration passing, I would pass it a _serializable_ object
> through the constructor of your custom source.
>
> Best,
> Robert
>
>
>
> On Thu, May 28, 2020 at 6:40 PM Roderick Vincent <vincent.r...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am brand new to Apache Flink so please excuse any silly questions.  I
>> have an Iterator function defined as below and adding it as a source to a
>> Flink stream.  But when I try to pass configuration information to it (via
>> a Spring env), what I notice is that one of the threads calls hasNext() and
>> it is not the same object and the passed information is null.  Something is
>> constructing it, but what is strange is that if I add a default constructor
>> I do not see this being called by this thread with the null data so I am
>> wondering what is going on.  Any ideas?  How do we pass configuration
>> information to these functions?  Any help would be appreciated.
>>
>> Thanks,
>> Rick
>>
>> @Public
>> public class NodeSource extends FromIteratorFunction<LinkedList<BaseDocument>> {
>>
>>     private static final long serialVersionUID = 1L;
>>
>>     public NodeSource(ArangoDBSource iterator) {
>>         super(iterator);
>>     }
>> }
>>
>

-- 

Arvid Heise | Senior Java Developer
