Dear Gordon,

Thanks for your help. I think I am on the right track now.

On the other hand, I have another question: is it possible to add sources to an environment that is already executing? In what I am currently developing, I need to add new sources as they arrive in my system.

I will wait to hear from you!

Regards,

*Pedro Lima Monteiro*

On 16 February 2017 at 11:29, Pedro Monteiro <pedro.mlmonte...@gmail.com> wrote:

> Thank you again for your prompt response.
>
> I will give it a try and will come back to you.
>
> *Pedro Lima Monteiro*
>
> On 16 February 2017 at 10:20, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> I would recommend checking out the Flink RabbitMQ Source for examples:
>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
>>
>> For your case, you should extend `RichSourceFunction`, which additionally lets you override the `open()` lifecycle method.
>> In that method, you instantiate your MongoDB client connection and fetch the cursor. In the `run()` method, you should essentially have a while loop that polls the MongoDB cursor and emits the fetched documents using the `SourceContext`.
>>
>> If you're also looking to implement a MongoDB source that works with Flink’s checkpointing for exactly-once, be sure to check out:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions
>>
>> Cheers,
>> Gordon
>>
>> On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) wrote:
>>
>> Dear Tzu-Li,
>>
>> Thank you so much for your prompt response.
>>
>> Let's assume I have a variable in Java, env, which is my StreamExecutionEnvironment.
>> When I go ahead and attempt to do:
>>
>>     env.addSource();
>>
>> it requests an implementation of the SourceFunction interface:
>>
>>     env.addSource(new SourceFunction<Document>() {
>>
>>         @Override
>>         public void run(SourceFunction.SourceContext<Document> ctx) throws Exception {
>>             // TODO
>>         }
>>
>>         @Override
>>         public void cancel() {
>>             // TODO
>>         }
>>     });
>>
>> And this is where I'm somewhat stuck. I do not understand how I should access my MongoDB cursor in either of these methods (I suppose the most adequate would be the `run` method) in a way that would let me return each new MongoDB document as it arrives in the database from another source.
>>
>> Once again, thank you so much for your help.
>>
>> I will wait to hear from you!
>>
>> Regards,
>>
>> *Pedro Lima Monteiro*
>>
>> On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>
>>> Hi Pedro!
>>>
>>> This is definitely possible, by simply writing a Flink `SourceFunction` that uses MongoDB clients to fetch the data.
>>> It should be straightforward and work well with MongoDB’s cursor APIs.
>>>
>>> Could you explain a bit which part in particular you were stuck with?
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) wrote:
>>>
>>> Good morning,
>>>
>>> I am trying to get data from MongoDB to be analysed in Flink.
>>> I would like to know if it is possible to stream data from MongoDB into Flink. I have looked into Flink's source functions for use with the addSource method of the StreamExecutionEnvironment, but I had no luck.
>>> Can anyone help me out?
>>> Thanks.
>>>
>>> *Pedro Lima Monteiro*
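
The approach Gordon describes in the thread is to acquire the cursor once in `open()`, poll it in a `while` loop inside `run()`, emit each document through the `SourceContext`, and stop when `cancel()` flips a flag. Below is a minimal, dependency-free sketch of that shape, not a definitive implementation. To keep it compilable without Flink or the MongoDB driver on the classpath, `SourceCtx` and `DocumentCursor` are hypothetical stand-ins, not real APIs. In an actual job you would extend Flink's `RichSourceFunction<Document>`, open the MongoDB client in `open(Configuration)`, and call `collect()` on Flink's `SourceContext`. The stand-in `tryNext()` returns `null` when nothing new is available, loosely modelled on the Java driver's `MongoCursor.tryNext()`. That behaviour is an assumption about how you would poll, for example, a tailable cursor, which MongoDB only supports on capped collections. Emitting inside `synchronized (ctx.getCheckpointLock())` follows the stateful-source-functions page Gordon links: emitting a record and updating the source's state then happen atomically with respect to checkpoints.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, dependency-free sketch of a polling source loop. SourceCtx stands in for
// Flink's SourceFunction.SourceContext and DocumentCursor for a MongoDB cursor; neither is
// the real API, they only mirror the shape of the calls discussed in the thread.
class PollingSourceSketch {

    /** Stand-in for Flink's SourceContext: emit a record, and expose the checkpoint lock. */
    interface SourceCtx<T> {
        void collect(T element);
        Object getCheckpointLock();
    }

    /** Stand-in for a MongoDB cursor: returns null when no new document is available yet. */
    interface DocumentCursor {
        String tryNext();
    }

    // volatile: cancel() is called from a different thread than run().
    private volatile boolean running = true;
    // In a real job this counter would be checkpointed state (see the stateful source docs).
    private long emittedCount = 0;
    private DocumentCursor cursor;

    /** Mirrors RichSourceFunction.open(): acquire the client/cursor once, before run(). */
    void open(DocumentCursor cursor) {
        this.cursor = cursor;
    }

    /** Mirrors SourceFunction.run(): poll until cancelled, emitting each new document. */
    void run(SourceCtx<String> ctx) throws InterruptedException {
        while (running) {
            String doc = cursor.tryNext();
            if (doc == null) {
                Thread.sleep(10); // nothing new yet; back off briefly before polling again
                continue;
            }
            // Emit and update state under the checkpoint lock so both happen atomically
            // with respect to checkpoints.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(doc);
                emittedCount++;
            }
        }
    }

    /** Mirrors SourceFunction.cancel(): makes the run() loop exit. */
    void cancel() {
        running = false;
    }

    long emittedCount() {
        return emittedCount;
    }

    /** Runs the source on a worker thread over an in-memory cursor and returns what it emitted. */
    static List<String> demo(List<String> docs) {
        Deque<String> pending = new ArrayDeque<>(docs);
        Object lock = new Object();
        List<String> out = new ArrayList<>();
        PollingSourceSketch source = new PollingSourceSketch();
        // Only the worker thread reads the deque; poll() returns null once it is drained.
        source.open(pending::poll);
        SourceCtx<String> ctx = new SourceCtx<String>() {
            public void collect(String element) { out.add(element); }
            public Object getCheckpointLock() { return lock; }
        };
        Thread worker = new Thread(() -> {
            try {
                source.run(ctx);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            // Wait until every document has been emitted, then cancel, as when a job is cancelled.
            while (true) {
                synchronized (lock) {
                    if (out.size() >= docs.size()) break;
                }
                Thread.sleep(5);
            }
            source.cancel();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        synchronized (lock) {
            return new ArrayList<>(out);
        }
    }
}
```

For example, `PollingSourceSketch.demo(Arrays.asList("doc1", "doc2"))` runs the loop on a worker thread, cancels it once both stand-in documents have been emitted, and returns them in cursor order. The `volatile` flag is the important design choice: Flink invokes `cancel()` from a different thread than the one blocked in `run()`, so the loop must observe the change promptly.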