Hi Pedro, in order to add new sources you have to first stop the job (maybe taking a savepoint if you want to resume later on) and then restart the job with the changed topology.
Cheers,
Till

On Thu, Feb 16, 2017 at 4:06 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:

> Good to know!
>
> On February 16, 2017 at 10:13:28 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) wrote:
>
> Dear Gordon,
>
> Thanks for your help, I think I am on the right track as of now.
>
> On the other hand, I have another question: is it possible to add sources
> to environments that are already executing? In what I am currently
> developing, I need to add new sources as they arrive in my system.
>
> I will wait to hear from you!
>
> Regards,
>
> *Pedro Lima Monteiro*
>
> On 16 February 2017 at 11:29, Pedro Monteiro <pedro.mlmonte...@gmail.com> wrote:
>
>> Thank you again for your prompt response.
>>
>> I will give it a try and will come back to you.
>>
>> *Pedro Lima Monteiro*
>>
>> On 16 February 2017 at 10:20, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>
>>> I would recommend checking out the Flink RabbitMQ Source for examples:
>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
>>>
>>> For your case, you should extend the `RichSourceFunction`, which
>>> additionally lets you override the `open()` life cycle method.
>>> In that method, you instantiate your MongoDB client connection and
>>> fetch the cursor. In the `run()` method, you should essentially have a
>>> while loop that polls the MongoDB cursor and emits the fetched documents
>>> using the `SourceContext`.
>>>
>>> If you're also looking to implement a MongoDB source that works with
>>> Flink’s checkpointing for exactly-once, be sure to check out:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#stateful-source-functions
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On February 16, 2017 at 5:53:03 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) wrote:
>>>
>>> Dear Tzu-Li,
>>>
>>> Thank you so much for your prompt response.
>>>
>>> Let's assume I have a variable, in Java, env, which is my
>>> StreamExecutionEnvironment. When I go ahead and attempt to do:
>>>
>>>     env.addSource();
>>>
>>> it requests an implementation of the SourceFunction interface:
>>>
>>>     env.addSource(new SourceFunction<Document>() {
>>>
>>>         @Override
>>>         public void run(SourceFunction.SourceContext<Document> ctx) throws Exception {
>>>             // TO DO
>>>         }
>>>
>>>         @Override
>>>         public void cancel() {
>>>             // TO DO
>>>         }
>>>     });
>>>
>>> And this is where I'm somewhat stuck. I do not understand how I should
>>> access my MongoDB cursor in any of these methods (I suppose the most
>>> adequate would be the "run" method) in a way that would allow me to return
>>> each new MongoDB document as it arrives in the database from another source.
>>>
>>> Once again, thank you so much for your help.
>>>
>>> I will wait to hear from you!
>>>
>>> Regards,
>>>
>>> *Pedro Lima Monteiro*
>>>
>>> On 16 February 2017 at 09:43, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>>>
>>>> Hi Pedro!
>>>>
>>>> This is definitely possible, by simply writing a Flink `SourceFunction`
>>>> that uses MongoDB clients to fetch the data.
>>>> It should be straightforward and works well with MongoDB’s cursor APIs.
>>>>
>>>> Could you explain a bit which part in particular you were stuck with?
>>>>
>>>> Cheers,
>>>> Gordon
>>>>
>>>> On February 16, 2017 at 5:31:39 PM, Pedro Monteiro (pedro.mlmonte...@gmail.com) wrote:
>>>>
>>>> Good morning,
>>>>
>>>> I am trying to get data from MongoDB to be analysed in Flink.
>>>> I would like to know if it is possible to stream data from MongoDB into
>>>> Flink. I have looked into Flink's source functions to pass to the
>>>> addSource method of the StreamExecutionEnvironment, but I had no luck.
>>>> Can anyone help me out?
>>>> Thanks.
>>>>
>>>> *Pedro Lima Monteiro*
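
Editor's note: the source pattern Gordon describes in the thread above (set up the connection in `open()`, poll in a loop inside `run()`, stop the loop from `cancel()`) can be sketched roughly as follows. To keep the sketch runnable without Flink or a MongoDB driver on the classpath, everything here is a hypothetical stand-in: `Emitter` plays the role of Flink's `SourceContext`, a `BlockingQueue<String>` stands in for the MongoDB cursor, and `PollingSource` only mirrors the shape of a `RichSourceFunction` rather than extending it. In an actual job you would presumably extend `RichSourceFunction<Document>` and let `run()` declare `throws Exception`, as in Pedro's snippet.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollingSourceSketch {

    /** Hypothetical stand-in for Flink's SourceContext: receives emitted records. */
    interface Emitter<T> {
        void collect(T record);
    }

    /** Hypothetical class mirroring the open()/run()/cancel() shape of a RichSourceFunction. */
    static class PollingSource {
        private final BlockingQueue<String> feed;  // stand-in for the MongoDB cursor
        private volatile boolean running = true;   // volatile: cancel() is called from another thread

        PollingSource(BlockingQueue<String> feed) {
            this.feed = feed;
        }

        void open() {
            // In a real source, create the MongoDB client and fetch the cursor here.
        }

        void run(Emitter<String> ctx) {
            try {
                while (running) {
                    // Poll with a timeout so the loop re-checks the running flag regularly.
                    String doc = feed.poll(50, TimeUnit.MILLISECONDS);
                    if (doc != null) {
                        ctx.collect(doc);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status and stop polling
            }
        }

        void cancel() {
            running = false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> feed = new LinkedBlockingQueue<>();
        List<String> out = Collections.synchronizedList(new ArrayList<>());

        PollingSource source = new PollingSource(feed);
        source.open();
        Thread worker = new Thread(() -> source.run(out::add));
        worker.start();

        // Documents "arriving" while the source is already running.
        feed.put("{\"_id\": 1}");
        feed.put("{\"_id\": 2}");

        while (out.size() < 2) {
            Thread.sleep(10); // wait until both documents have been emitted
        }
        source.cancel();
        worker.join();
        System.out.println(out); // both documents, in arrival order
    }
}
```

The timed `poll` is a design choice of this sketch, not something stated in the thread: a blocking read with no timeout could keep `run()` from ever noticing that `cancel()` was called.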