I tried it, and this is what I am getting in the logs:

2018-03-15 20:59:38,154 INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - No state backend has been configured, using default state backend (Memory / JobManager)

2018-03-15 20:59:38,296 INFO  org.apache.flink.streaming.api.functions.source.MessageAcknowledgingSourceBase - No state to restore for the AMQSource.

2018-03-15 20:59:39,488 WARN  org.apache.flink.streaming.connectors.activemq.AMQSource - Active MQ source received non bytes message: null
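
One possible reading of the WARN line: `null` is what a JMS `receive(timeout)` call returns when no message arrives before the timeout, and the warning text suggests the source expects bytes messages. The sketch below is a simplified stand-in, not the Bahir implementation. `PollLoopSketch`, `poll`, and the `stopOnNonBytes` flag are hypothetical names, and a plain `java.util.Queue` takes the place of the JMS consumer. It only illustrates how a poll loop that returns on the first non-bytes result would end the source (and with it the job), while a loop that skips such results keeps going. Whether AMQSource really returns at that point is an assumption that would need to be checked against the connector source.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a source poll loop; not the Bahir AMQSource code.
public class PollLoopSketch {

    /**
     * Polls the queue up to maxPolls times and collects every byte[] payload as a String.
     * queue.poll() returns null on an empty queue, standing in for a JMS receive
     * call whose timeout expired.
     */
    public static List<String> poll(Queue<Object> queue, int maxPolls, boolean stopOnNonBytes) {
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < maxPolls; i++) {
            Object message = queue.poll();
            if (!(message instanceof byte[])) {
                // null (nothing arrived) and non-bytes payloads both end up here
                System.out.println("received non bytes message: " + message);
                if (stopOnNonBytes) {
                    return emitted; // the loop ends, so the source would finish
                }
                continue; // skip this result and keep polling
            }
            emitted.add(new String((byte[]) message, StandardCharsets.UTF_8));
        }
        return emitted;
    }

    public static void main(String[] args) {
        Queue<Object> queue = new ArrayDeque<>();

        // Empty queue, stop on the first non-bytes result: the loop ends immediately.
        System.out.println(poll(queue, 3, true));

        // Same loop, but skipping non-bytes results: the bytes payload is emitted.
        queue.add("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(poll(queue, 3, false));
    }
}
```

If the real loop behaves like the `stopOnNonBytes = true` case, an empty queue, or a producer that sends text messages instead of bytes messages, would be enough to make the job finish without output.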


On Thu, Mar 15, 2018 at 9:00 PM, Puneet Kinra <
puneet.ki...@customercentria.com> wrote:

> I tried it on the cluster as well.
>
> On Wed, Mar 14, 2018 at 10:01 PM, Timo Walther <twal...@apache.org> wrote:
>
>> Hi Puneet,
>>
>> are you running this job on the cluster or locally in your IDE?
>>
>> Regards,
>> Timo
>>
>>
>> On 14.03.18 at 13:49, Puneet Kinra wrote:
>>
>> Hi
>>
>> I used the Apache Bahir connector; the code is below. The job finishes
>> without producing any output, whereas ideally it should keep running.
>>
>>
>> import org.apache.activemq.ActiveMQConnectionFactory;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.streaming.connectors.activemq.AMQSource;
>> import org.apache.flink.streaming.connectors.activemq.AMQSourceConfig;
>> import org.apache.flink.streaming.connectors.activemq.DestinationType;
>> import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
>>
>> /**
>>  * @author puneet
>>  *
>>  */
>> public class TestAMQ {
>>
>>     public static void main(String[] args) throws Exception {
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         AMQSourceConfig<String> sourceConfig = new AMQSourceConfig.AMQSourceConfigBuilder<String>()
>>                 .setConnectionFactory(new ActiveMQConnectionFactory("tcp://localhost:61616"))
>>                 .setDestinationName("test")
>>                 .setDeserializationSchema(new SimpleStringSchema())
>>                 .setDestinationType(DestinationType.QUEUE)
>>                 .build();
>>         DataStream<String> messageStream = env.addSource(new AMQSource<String>(sourceConfig));
>>         messageStream.print();
>>         env.execute();
>>     }
>>
>> }
>>
>>
>> --
>> *Cheers *
>>
>> *Puneet Kinra*
>>
>> *Mobile:+918800167808 | Skype : puneet.ki...@customercentria.com
>> <puneet.ki...@customercentria.com>*
>>
>> *e-mail :puneet.ki...@customercentria.com
>> <puneet.ki...@customercentria.com>*
>>
>>
>>
>>
>
>


