Thank you again for your prompt response. I will give it a try and will come back to you.
*Pedro Lima Monteiro* On 16 February 2017 at 10:20, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote: > I would recommend checking out the Flink RabbitMQ Source for examples: > https://github.com/apache/flink/blob/master/flink- > connectors/flink-connector-rabbitmq/src/main/java/org/ > apache/flink/streaming/connectors/rabbitmq/RMQSource.java > > For your case, you should extend the `RichSourceFunction` which provides > additional access to override the `open()` life cycle method. > In that method, you instantiate your MongoDB client connection and fetch > the cursor. In the `run()` method, you should essentially have a while loop > that polls the MongoDB cursor and emits the fetched documents using the > `SourceContext`. > > If your also looking to implement a MongoDB source that works with Flink’s > checkpointing for exactly-once, be sure to check out: > https://ci.apache.org/projects/flink/flink-docs- > release-1.2/dev/stream/state.html#stateful-source-functions > > Cheers, > Gordon > > On February 16, 2017 at 5:53:03 PM, Pedro Monteiro ( > pedro.mlmonte...@gmail.com) wrote: > > Dear Tzu-Li, > > Thank you so much for your prompt response. > > Lets assume I have a variable, in Java, env which is my > StreamExecutionEnvironment. When I go ahead and attempt to do: > >> env.addSource(); >> > > It requests an implementation of a Source Function interface: > > >> env.addSource(new SourceFunction<Document>() { > > >> >> @Override > > public void run(SourceFunction.SourceContext<Document> ctx) >> throws Exception { > > >> // TO DO >> > } > > >> @Override > > public void cancel() { > > >> // TO DO >> > } > > }); > > And this is where I'm somehow stuck. I do not understand how should I > access my MongoDB's cursor in any of this methods (I suppose the most > adequate would be the "run" method) in a way it would allow me to return a > new MongoDB document as it arrived to the database from another source. > > Once again, thank you so much for your help. > > I will wait to hear from you! > > Cumprimentos, > > *Pedro Lima Monteiro* > > On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai <tzuli...@apache.org> > wrote: > >> Hi Pedro! >> >> This is definitely possible, by simply writing a Flink `SourceFunction` >> that uses MongoDB clients to fetch the data. >> It should be straightforward and works well with MongoDB’s cursor APIs. >> >> Could you explain a bit which part in particular you were stuck with? >> >> Cheers, >> Gordon >> >> >> On February 16, 2017 at 5:31:39 PM, Pedro Monteiro ( >> pedro.mlmonte...@gmail.com) wrote: >> >> Good morning, >> >> I am trying to get data from MongoDB to be analysed in Flink. >> I would like to know if it is possible to stream data from MongoDB into >> Flink. I have looked into Flink's source function to add in the addSource >> method of the StreamExecutionEnvironment but I had no luck. >> Can anyone help me out? >> Thanks. >> >> *Pedro Lima Monteiro* >> >> >