Re: Flink RMQSource Consumer: How I get the RabbitMQ UserId

2018-09-11 Thread vino yang
Hi Marke, Should not use the code like this : *delivery.getProperties().getUserId();* to get the userId from Delivery object? And for second code example, Since you got the object of TimeSeriesType type, should not define *DataStream* instead of *DataStream*. Regarding userId, I just said that

Re: Flink RMQSource Consumer: How I get the RabbitMQ UserId

2018-09-09 Thread vino yang
Hi Marke, As soon as I didn't really implement this code, but I think you can replace this line of code: *OUT result = schema.deserialize(delivery.getBody()); //RMQSource#run* instead of defining an abstract method in RMQSource, such as: normalize/deserialize, the input parameter is Delivery, an

Re: Flink RMQSource Consumer: How I get the RabbitMQ UserId

2018-09-08 Thread vino yang
Hi Marke, As you said, you need to extend RMQSource because Flink's rabbitmq connector only extracts the body of Delivery. Therefore, in order to achieve your purpose, you need to add a property to the specific data type of your DataStream to represent the userId, then override the RMQSource#run m

Flink RMQSource Consumer: How I get the RabbitMQ UserId

2018-09-08 Thread Marke Builder
Hi, how I can get the UserId from the Properties in my DataStream? I can read the userId if I extend the RMQSource Class: QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String userId = delivery.getProperties().getUserId(); But how can I provide this to my DataStream ? Best regard