I would recommend checking out the Flink RabbitMQ Source for examples: https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java

For your case, you should extend `RichSourceFunction`, which additionally lets you override the `open()` lifecycle method. In that method, instantiate your MongoDB client connection and fetch the cursor. In the `run()` method, you should essentially have a while loop that polls the MongoDB cursor and emits the fetched documents using the `SourceContext`. The `cancel()` method should set a volatile flag that ends that loop, and `close()` is the place to release the cursor and the client.

If you're also looking to implement a MongoDB source that works with Flink’s checkpointing for exactly-once guarantees, be sure to check out: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions

Cheers,
Gordon

On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) wrote:

Dear Tzu-Li,

Thank you so much for your prompt response.

Let's assume I have a variable in Java, `env`, which is my `StreamExecutionEnvironment`. When I go ahead and attempt to call:

    env.addSource();

it requests an implementation of the `SourceFunction` interface:

    env.addSource(new SourceFunction<Document>() {
        @Override
        public void run(SourceFunction.SourceContext<Document> ctx) throws Exception {
            // TO DO
        }

        @Override
        public void cancel() {
            // TO DO
        }
    });

And this is where I'm stuck. I do not understand how I should access my MongoDB cursor in either of these methods (I suppose the most adequate would be the `run` method) in a way that lets me return each new MongoDB document as it arrives in the database from another source.

Once again, thank you so much for your help. I look forward to hearing from you!

Regards,
Pedro Lima Monteiro

On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

Hi Pedro!

This is definitely possible: simply write a Flink `SourceFunction` that uses the MongoDB client to fetch the data. It should be straightforward and work well with MongoDB’s cursor APIs. Could you explain which part in particular you were stuck with?

Cheers,
Gordon

On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) wrote:

Good morning,

I am trying to get data from MongoDB to be analysed in Flink. I would like to know whether it is possible to stream data from MongoDB into Flink. I have looked into Flink's source functions to pass to the `addSource` method of the `StreamExecutionEnvironment`, but I had no luck. Can anyone help me out?

Thanks,
Pedro Lima Monteiro
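The lifecycle suggested in the first reply of this thread (acquire the cursor in `open()`, loop in `run()` emitting through the `SourceContext`, stop the loop from `cancel()`, clean up in `close()`) can be modeled without any dependencies. This is a minimal sketch under stated assumptions, not Flink's or MongoDB's actual API: `SourceContext` and `Cursor` are simplified, hypothetical stand-ins, the "collection" is an in-memory list, and the sleep-and-retry polling when no document is available is an assumed design choice. In a real job the class would extend `RichSourceFunction<Document>`, `open(Configuration)` would create the `MongoClient` and cursor, and seeing newly inserted documents would require something like a tailable cursor on a capped collection. Emission happens inside `synchronized (ctx.getCheckpointLock())`, following the pattern from the stateful source functions page linked above, so that any resume position updated next to `collect` stays consistent with checkpoints.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class MongoSourceSketch {
    // Hypothetical stand-in for Flink's SourceFunction.SourceContext<T> (only the calls used here).
    interface SourceContext<T> {
        void collect(T element);
        Object getCheckpointLock();
    }

    // Hypothetical stand-in for a tailable cursor: next document, or null if none yet.
    interface Cursor<T> {
        T tryNext();
    }

    // Models a RichSourceFunction<Document>: open() acquires the cursor,
    // run() polls it until cancel() is called, close() releases resources.
    static class PollingSource {
        private final List<String> collection; // stands in for the MongoDB collection
        private final long pollMillis;
        private volatile boolean running = true;
        private Cursor<String> cursor;
        private int position; // read offset of the stand-in cursor
        private String lastEmitted; // e.g. a resume position you might checkpoint

        PollingSource(List<String> collection, long pollMillis) {
            this.collection = collection;
            this.pollMillis = pollMillis;
        }

        // In Flink this is open(Configuration): create client and cursor here, not in
        // the constructor, because the function object is serialized and shipped to workers.
        void open() {
            cursor = () -> position < collection.size() ? collection.get(position++) : null;
        }

        void run(SourceContext<String> ctx) throws InterruptedException {
            while (running) {
                String doc = cursor.tryNext();
                if (doc == null) {
                    Thread.sleep(pollMillis); // nothing new yet: back off, then poll again
                    continue;
                }
                // Emit and update the position atomically with respect to checkpoints.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(doc);
                    lastEmitted = doc;
                }
            }
        }

        void cancel() { running = false; } // makes run() leave its loop
        void close() { /* a real source closes the cursor and MongoClient here */ }
    }

    static List<String> demo() throws InterruptedException {
        List<String> collection = new CopyOnWriteArrayList<>();
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        Object lock = new Object();
        SourceContext<String> ctx = new SourceContext<String>() {
            @Override public void collect(String element) { received.add(element); }
            @Override public Object getCheckpointLock() { return lock; }
        };

        PollingSource source = new PollingSource(collection, 10);
        source.open();
        Thread task = new Thread(() -> {
            try {
                source.run(ctx);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();

        // Documents "arrive from another source" while the source is running.
        for (int i = 1; i <= 3; i++) {
            collection.add("doc-" + i);
            Thread.sleep(20);
        }
        long deadline = System.currentTimeMillis() + 2000;
        while (received.size() < 3 && System.currentTimeMillis() < deadline) {
            Thread.sleep(5);
        }
        source.cancel();
        task.join();
        source.close();
        return new ArrayList<>(received);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo()); // prints [doc-1, doc-2, doc-3]
    }
}
```

In a real job, an instance of the `RichSourceFunction` subclass would be passed to `env.addSource(...)`. A plain `RichSourceFunction` is non-parallel, so it runs as a single source instance, which keeps the cursor handling simple.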