Hi Marke, As soon as I didn't really implement this code, but I think you can replace this line of code:
*OUT result = schema.deserialize(delivery.getBody()); //RMQSource#run* instead of defining an abstract method in RMQSource, such as: normalize/deserialize, the input parameter is Delivery, and the output parameter is generic type <Out> and implement your custom logic in this method. Thanks, vino. Marke Builder <marke.buil...@gmail.com> 于2018年9月10日周一 上午12:32写道: > Hi Vino, > > many thanks for your response, the solution works! But I have one > additional question, > What is the best way to override the RMQSource#run without access to the > RMQSource variable "running" ? > > Thanks, Martin. > > Am Sa., 8. Sep. 2018 um 10:15 Uhr schrieb vino yang <yanghua1...@gmail.com > >: > >> Hi Marke, >> >> As you said, you need to extend RMQSource because Flink's rabbitmq >> connector only extracts the body of Delivery. >> Therefore, in order to achieve your purpose, you need to add a property >> to the specific data type of your DataStream >> to represent the userId, then override the RMQSource#run method and >> extract the userId from the properties of Delivery. >> Of course, in addition, maybe you Need to pay attention to the >> implementation of DeserializationSchema. >> >> Thanks, vino. >> >> Marke Builder <marke.buil...@gmail.com> 于2018年9月8日周六 下午3:44写道: >> >>> Hi, >>> >>> how I can get the UserId from the Properties in my DataStream? >>> >>> I can read the userId if I extend the RMQSource Class: >>> QueueingConsumer.Delivery delivery = consumer.nextDelivery(); >>> String userId = delivery.getProperties().getUserId(); >>> >>> But how can I provide this to my DataStream ? >>> >>> Best regards, >>> Martin >>> >>