Dear Gordon,

Thanks for your help; I think I am on the right track now.

I do have another question, though: is it possible to add sources to an
environment that is already executing? In what I am currently
developing, I need to add new sources as they arrive in my system.

I will wait to hear from you!

Regards,

*Pedro Lima Monteiro*

On 16 February 2017 at 11:29, Pedro Monteiro <pedro.mlmonte...@gmail.com>
wrote:

> Thank you again for your prompt response.
>
> I will give it a try and will come back to you.
>
> *Pedro Lima Monteiro*
>
> On 16 February 2017 at 10:20, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> I would recommend checking out the Flink RabbitMQ Source for examples:
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
>>
>> For your case, you should extend `RichSourceFunction`, which additionally
>> lets you override the `open()` life cycle method.
>> In that method, you instantiate your MongoDB client connection and fetch
>> the cursor. In the `run()` method, you should essentially have a while loop
>> that polls the MongoDB cursor and emits the fetched documents using the
>> `SourceContext`.
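>>
>> A minimal sketch of what such a source could look like (a sketch only: the
>> host, database, and collection names are placeholders, and it assumes the
>> MongoDB Java driver 3.x plus a capped collection so a tailable cursor can be
>> used; for a regular collection you would poll with find() in a loop instead):
>>
>> import com.mongodb.CursorType;
>> import com.mongodb.MongoClient;
>> import com.mongodb.client.MongoCollection;
>> import com.mongodb.client.MongoCursor;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>> import org.bson.Document;
>>
>> public class MongoDBSource extends RichSourceFunction<Document> {
>>
>>     private transient MongoClient client;
>>     private transient MongoCollection<Document> collection;
>>     private volatile boolean isRunning = true;
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>         // Set up the client connection once, when the source instance starts.
>>         client = new MongoClient("localhost", 27017);
>>         collection = client.getDatabase("mydb").getCollection("events");
>>     }
>>
>>     @Override
>>     public void run(SourceContext<Document> ctx) throws Exception {
>>         // Tailable cursor: hasNext() blocks until a new document is appended.
>>         MongoCursor<Document> cursor =
>>                 collection.find().cursorType(CursorType.TailableAwait).iterator();
>>         while (isRunning && cursor.hasNext()) {
>>             ctx.collect(cursor.next());
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         isRunning = false;
>>     }
>>
>>     @Override
>>     public void close() throws Exception {
>>         if (client != null) {
>>             client.close();
>>         }
>>     }
>> }
>>
>> Attaching it is then just `env.addSource(new MongoDBSource())`.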
>>
>> If you're also looking to implement a MongoDB source that works with
>> Flink’s checkpointing for exactly-once, be sure to check out:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions
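>>
>> For reference, a rough sketch of how a checkpointed variant could look, using
>> the `CheckpointedFunction` interface (the actual MongoDB query is elided, and
>> tracking the resume position via the document's `_id` is only an assumption
>> about your data):
>>
>> import org.apache.flink.api.common.state.ListState;
>> import org.apache.flink.api.common.state.ListStateDescriptor;
>> import org.apache.flink.runtime.state.FunctionInitializationContext;
>> import org.apache.flink.runtime.state.FunctionSnapshotContext;
>> import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
>> import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
>> import org.bson.Document;
>>
>> public class CheckpointedMongoSource extends RichSourceFunction<Document>
>>         implements CheckpointedFunction {
>>
>>     private transient ListState<String> checkpointedId;
>>     private volatile String lastSeenId; // resume position, restored from state
>>     private volatile boolean isRunning = true;
>>
>>     @Override
>>     public void run(SourceContext<Document> ctx) throws Exception {
>>         while (isRunning) {
>>             Document doc = fetchNextAfter(lastSeenId);
>>             if (doc == null) {
>>                 Thread.sleep(50); // back off briefly when nothing new arrived
>>                 continue;
>>             }
>>             // Emit the document and advance the position under the checkpoint
>>             // lock so that both always end up in the same checkpoint.
>>             synchronized (ctx.getCheckpointLock()) {
>>                 ctx.collect(doc);
>>                 lastSeenId = doc.getObjectId("_id").toHexString();
>>             }
>>         }
>>     }
>>
>>     @Override
>>     public void cancel() {
>>         isRunning = false;
>>     }
>>
>>     @Override
>>     public void snapshotState(FunctionSnapshotContext context) throws Exception {
>>         checkpointedId.clear();
>>         if (lastSeenId != null) {
>>             checkpointedId.add(lastSeenId);
>>         }
>>     }
>>
>>     @Override
>>     public void initializeState(FunctionInitializationContext context) throws Exception {
>>         checkpointedId = context.getOperatorStateStore()
>>                 .getListState(new ListStateDescriptor<>("lastSeenId", String.class));
>>         for (String id : checkpointedId.get()) {
>>             lastSeenId = id;
>>         }
>>     }
>>
>>     private Document fetchNextAfter(String id) {
>>         // Elided: query the collection for the next document with _id greater
>>         // than the given one, or null if there is none yet.
>>         return null;
>>     }
>> }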
>>
>> Cheers,
>> Gordon
>>
>> On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (
>> pedro.mlmonte...@gmail.com) wrote:
>>
>> Dear Tzu-Li,
>>
>> Thank you so much for your prompt response.
>>
>> Let's assume I have a variable `env` in Java, which is my
>> StreamExecutionEnvironment. When I go ahead and attempt to do:
>>
>>> env.addSource();
>>>
>>
>> It requests an implementation of the `SourceFunction` interface:
>>
>>> env.addSource(new SourceFunction<Document>() {
>>>
>>>     @Override
>>>     public void run(SourceFunction.SourceContext<Document> ctx)
>>>             throws Exception {
>>>         // TO DO
>>>     }
>>>
>>>     @Override
>>>     public void cancel() {
>>>         // TO DO
>>>     }
>>> });
>>
>> And this is where I'm somewhat stuck. I do not understand how I should
>> access my MongoDB cursor in any of these methods (I suppose the most
>> appropriate would be the `run` method) in a way that would allow me to emit a
>> new MongoDB document as it arrives in the database from another source.
>>
>> Once again, thank you so much for your help.
>>
>> I will wait to hear from you!
>>
>> Regards,
>>
>> *Pedro Lima Monteiro*
>>
>> On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
>> wrote:
>>
>>> Hi Pedro!
>>>
>>> This is definitely possible, by simply writing a Flink `SourceFunction`
>>> that uses MongoDB clients to fetch the data.
>>> It should be straightforward and work well with MongoDB’s cursor APIs.
>>>
>>> Could you explain a bit which part in particular you were stuck with?
>>>
>>> Cheers,
>>> Gordon
>>>
>>>
>>> On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (
>>> pedro.mlmonte...@gmail.com) wrote:
>>>
>>> Good morning,
>>>
>>> I am trying to get data from MongoDB to be analysed in Flink.
>>> I would like to know if it is possible to stream data from MongoDB into
>>> Flink. I have looked into Flink's SourceFunction, which is passed to the addSource
>>> method of the StreamExecutionEnvironment, but I had no luck.
>>> Can anyone help me out?
>>> Thanks.
>>>
>>> *Pedro Lima Monteiro*
>>>
>>>
>>
>
