Hi Pedro,

in order to add new sources you have to first stop the job (maybe taking a
savepoint if you want to resume later on) and then restart the job with the
changed topology.

Cheers,
Till

On Thu, Feb 16, 2017 at 4:06 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Good to know!
>
>
> On February 16, 2017 at 10:13:28 PM, Pedro Monteiro (
> pedro.mlmonte...@gmail.com) wrote:
>
> Dear Gordon,
>
> Thanks for your help, I think I am on the right track as of now.
>
> On the other hand, I have another question: is it possible to add sources
> to environments that are already executing? In what I am currently
> developing, I need to add new sources as they arrive to my system.
>
> I will wait to hear from you!
>
> Cumprimentos,
>
> *Pedro Lima Monteiro*
>
> On 16 February 2017 at 11:29, Pedro Monteiro <pedro.mlmonte...@gmail.com>
> wrote:
>
>> Thank you again for your prompt response.
>>
>> I will give it a try and will come back to you.
>>
>> *Pedro Lima Monteiro*
>>
>> On 16 February 2017 at 10:20, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>>> I would recommend checking out the Flink RabbitMQ Source for examples:
>>> https://github.com/apache/flink/blob/master/flink-connectors
>>> /flink-connector-rabbitmq/src/main/java/org/apache/flink/str
>>> eaming/connectors/rabbitmq/RMQSource.java
>>>
>>> For your case, you should extend the `RichSourceFunction` which provides
>>> additional access to override the `open()` life cycle method.
>>> In that method, you instantiate your MongoDB client connection and
>>>  fetch the cursor. In the `run()` method, you should essentially have a
>>> while loop that polls the MongoDB cursor and emits the fetched documents
>>> using the `SourceContext`.
>>>
>>> If your also looking to implement a MongoDB source that works with
>>> Flink’s checkpointing for exactly-once, be sure to check out:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/
>>> dev/stream/state.html#stateful-source-functions
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (
>>> pedro.mlmonte...@gmail.com) wrote:
>>>
>>> Dear Tzu-Li,
>>>
>>> Thank you so much for your prompt response.
>>>
>>> Lets assume I have a variable, in Java, env which is my
>>> StreamExecutionEnvironment. When I go ahead and attempt to do:
>>>
>>>> ​env.addSource();
>>>>
>>>
>>> ​It requests an implementation of a Source Function interface:
>>> ​
>>>
>>>> env.addSource(new SourceFunction<Document>() {
>>>
>>>
>>>> ​​
>>>> @Override
>>>
>>>             public void run(SourceFunction.SourceContext<Document> ctx)
>>>> throws Exception {
>>>
>>>
>>>> ​// TO DO​
>>>>
>>>             }
>>>
>>>
>>>>             @Override
>>>
>>>             public void cancel() {
>>>
>>>
>>>> ​// TO DO​
>>>>
>>>             }
>>>
>>>         });
>>>
>>> ​And this is where I'm somehow stuck. I do not understand how should I
>>> access my MongoDB's cursor in any of this methods (I suppose the most
>>> adequate would be the "run" method) in a way it would allow me to return a
>>> new MongoDB document as it arrived to the database from another source.
>>>
>>> Once again, thank you so much for your help.
>>>
>>> I will wait to hear from you!​
>>>
>>> Cumprimentos,
>>>
>>> *Pedro Lima Monteiro*
>>>
>>> On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>>> wrote:
>>>
>>>> Hi Pedro!
>>>>
>>>> This is definitely possible, by simply writing a Flink `SourceFunction`
>>>> that uses MongoDB clients to fetch the data.
>>>> It should be straightforward and works well with MongoDB’s cursor APIs.
>>>>
>>>> Could you explain a bit which part in particular you were stuck with?
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>>
>>>> On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (
>>>> pedro.mlmonte...@gmail.com) wrote:
>>>>
>>>> Good morning,
>>>>
>>>> I am trying to get data from MongoDB to be analysed in Flink.
>>>> I would like to know if it is possible to stream data from MongoDB into
>>>> Flink. I have looked into Flink's source function to add in the
>>>> addSource
>>>> method of the StreamExecutionEnvironment but I had no luck.
>>>> Can anyone help me out?
>>>> Thanks.
>>>>
>>>> *Pedro Lima Monteiro*
>>>>
>>>>
>>>
>>
>

Reply via email to