Hi Marke, Should not use the code like this :
*delivery.getProperties().getUserId();* to get the userId from Delivery object? And for second code example, Since you got the object of TimeSeriesType type, should not define *DataStream<TimeSeriesType>* instead of *DataStream<String>*. Regarding userId, I just said that this is a way of extracting. But if it doesn't have a value in itself, then there is no way to get it. Can you confirm that the message itself has value in RabbitMQ? Thanks, vino. Marke Builder <marke.buil...@gmail.com> 于2018年9月11日周二 下午4:34写道: > Hi Vino, > > this is what I done, but no user Id available. And the first question was > about the running parameter in RMQSource#boolean running. > > Code example: > @Override > run(SourceContext cts) { > .... > TimeSeriesType result = (TimeSeriesType) > schema.deserialize(delivery.getBody()); > ..... > final String userId = delivery.getProperties(). > result.setDeviceId(userId); > ...... > ctx.collect(result); > > > And the DataStream looks like this: > final DataStream<String> stream = env > .addSource(new RabbitmqStreamProcessorV2( > connectionConfig, > fastDataQueue, > new > AbstractDeserializationSchema<TimeSeriesType>() { > @Override > public TimeSeriesType deserialize(byte[] > bytes) throws IOException { > TimeSeriesType message = null; > try { > message = xmlParser.parse(new > String(bytes, "UTF8")); > logger.info("Data/Message size: " > +String.valueOf(message.getData().size())); > } catch (JAXBException e) { > e.printStackTrace(); > logger.log(Level.INFO, e.toString()); > } > return message; > } > })) > .flatMap(..... > > > > > > > Am Mo., 10. Sep. 2018 um 03:52 Uhr schrieb vino yang < > yanghua1...@gmail.com>: > >> Hi Marke, >> >> As soon as I didn't really implement this code, but I think you can >> replace this line of code: >> >> *OUT result = schema.deserialize(delivery.getBody()); >> //RMQSource#run* >> >> instead of defining an abstract method in RMQSource, such as: >> normalize/deserialize, the input parameter is Delivery, >> and the output parameter is generic type <Out> and implement your custom >> logic in this method. >> >> Thanks, vino. >> >> Marke Builder <marke.buil...@gmail.com> 于2018年9月10日周一 上午12:32写道: >> >>> Hi Vino, >>> >>> many thanks for your response, the solution works! But I have one >>> additional question, >>> What is the best way to override the RMQSource#run without access to the >>> RMQSource variable "running" ? >>> >>> Thanks, Martin. >>> >>> Am Sa., 8. Sep. 2018 um 10:15 Uhr schrieb vino yang < >>> yanghua1...@gmail.com>: >>> >>>> Hi Marke, >>>> >>>> As you said, you need to extend RMQSource because Flink's rabbitmq >>>> connector only extracts the body of Delivery. >>>> Therefore, in order to achieve your purpose, you need to add a property >>>> to the specific data type of your DataStream >>>> to represent the userId, then override the RMQSource#run method and >>>> extract the userId from the properties of Delivery. >>>> Of course, in addition, maybe you Need to pay attention to the >>>> implementation of DeserializationSchema. >>>> >>>> Thanks, vino. >>>> >>>> Marke Builder <marke.buil...@gmail.com> 于2018年9月8日周六 下午3:44写道: >>>> >>>>> Hi, >>>>> >>>>> how I can get the UserId from the Properties in my DataStream? >>>>> >>>>> I can read the userId if I extend the RMQSource Class: >>>>> QueueingConsumer.Delivery delivery = consumer.nextDelivery(); >>>>> String userId = delivery.getProperties().getUserId(); >>>>> >>>>> But how can I provide this to my DataStream ? >>>>> >>>>> Best regards, >>>>> Martin >>>>> >>>>