I tried getting this in logs..
2018-03-15 20:59:38,154 INFO org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default state backend (Memory / JobManager) 2018-03-15 20:59:38,296 INFO org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase - No state to restore for the AMQSource. 2018-03-15 20:59:39,488 WARN org.apache.flink.streaming.connectors.activemq.AMQSource - Active MQ source received non bytes message: null On Thu, Mar 15, 2018 at 9:00 PM, Puneet Kinra < puneet.ki...@customercentria.com> wrote: > I tried in cluster as well . > > On Wed, Mar 14, 2018 at 10:01 PM, Timo Walther <twal...@apache.org> wrote: > >> Hi Puneet, >> >> are you running this job on the cluster or locally in your IDE? >> >> Regards, >> Timo >> >> >> Am 14.03.18 um 13:49 schrieb Puneet Kinra: >> >> Hi >> >> I used apache bahir connector below is the code.the job is getting >> finished >> and not generated the output as well ,ideal it should keep on running >> below the code. >> >> >> import org.apache.activemq.ActiveMQConnectionFactory; >> import org.apache.flink.streaming.api.datastream.DataStream; >> import org.apache.flink.streaming.api.environment.StreamExecutionEn >> vironment; >> import org.apache.flink.streaming.connectors.activemq.AMQSource; >> import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig; >> import org.apache.flink.streaming.connectors.activemq.DestinationType; >> import org.apache.flink.streaming.util.serialization.SimpleStringSchema; >> >> /** >> * @author puneet >> * >> */ >> public class TestAMQ { >> >> >> public static void main(String[] args) throws Exception { >> StreamExecutionEnvironment env = StreamExecutionEnvironment.get >> ExecutionEnvironment(); >> AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfi >> gBuilder<String>() >> .setConnectionFactory(new ActiveMQConnectionFactory("tcp >> ://localhost:61616")) >> .setDestinationName("test") >> .setDeserializationSchema(new SimpleStringSchema()) >> .setDestinationType(DestinationType.QUEUE) >> .build(); >> DataStream < String > messageStream = env.addSource(new >> AMQSource<String>(sourceConfig)); >> messageStream.print(); >> env.execute(); >> } >> >> } >> >> >> -- >> *Cheers * >> >> *Puneet Kinra* >> >> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com >> <puneet.ki...@customercentria.com>* >> >> *e-mail :puneet.ki...@customercentria.com >> <puneet.ki...@customercentria.com>* >> >> >> >> > > > -- > *Cheers * > > *Puneet Kinra* > > *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com > <puneet.ki...@customercentria.com>* > > *e-mail :puneet.ki...@customercentria.com > <puneet.ki...@customercentria.com>* > > > -- *Cheers * *Puneet Kinra* *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>* *e-mail :puneet.ki...@customercentria.com <puneet.ki...@customercentria.com>*