Hi Marke,

I haven't actually implemented this myself, but I think you can replace
this line of code:

OUT result = schema.deserialize(delivery.getBody()); // in RMQSource#run

with a call to an abstract method defined in RMQSource, named something
like normalize/deserialize, whose input parameter is the Delivery and whose
return type is the generic type <OUT>, and then implement your custom
logic in that method.
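
A minimal sketch of that idea, untested against Flink itself. To keep it
self-contained, Delivery and SourceSketch below are hypothetical stand-ins
for RabbitMQ's Delivery and for RMQSource; UserMessage and
UserMessageSource are also made-up names. Only the shape of the hook
(Delivery in, OUT out) comes from the suggestion above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DeliveryHookSketch {

    /** Stand-in for RabbitMQ's Delivery: only the body and the userId property. */
    static final class Delivery {
        final byte[] body;
        final String userId;

        Delivery(byte[] body, String userId) {
            this.body = body;
            this.userId = userId;
        }
    }

    /** Stand-in for RMQSource<OUT>; run() calls the hook instead of schema.deserialize(body). */
    abstract static class SourceSketch<OUT> {

        /** The hook: receives the whole Delivery, not just its body. */
        protected abstract OUT deserialize(Delivery delivery);

        /** Simplified run(): the real method loops over a consumer and emits to a context. */
        List<OUT> run(List<Delivery> deliveries) {
            List<OUT> emitted = new ArrayList<>();
            for (Delivery delivery : deliveries) {
                emitted.add(deserialize(delivery));
            }
            return emitted;
        }
    }

    /** Hypothetical element type for the DataStream, carrying the userId next to the payload. */
    static final class UserMessage {
        final String userId;
        final String payload;

        UserMessage(String userId, String payload) {
            this.userId = userId;
            this.payload = payload;
        }
    }

    /** Custom logic: copy the userId out of the delivery and decode the body as UTF-8. */
    static final class UserMessageSource extends SourceSketch<UserMessage> {
        @Override
        protected UserMessage deserialize(Delivery delivery) {
            String payload = new String(delivery.body, StandardCharsets.UTF_8);
            return new UserMessage(delivery.userId, payload);
        }
    }

    public static void main(String[] args) {
        Delivery delivery = new Delivery("hello".getBytes(StandardCharsets.UTF_8), "martin");
        List<UserMessage> out = new UserMessageSource().run(Arrays.asList(delivery));
        for (UserMessage message : out) {
            System.out.println(message.userId + ": " + message.payload);
        }
    }
}
```

In a real subclass the hook would read the userId via
delivery.getProperties().getUserId(), as in your original snippet.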

Thanks, vino.

Marke Builder <marke.buil...@gmail.com> wrote on Mon, Sep 10, 2018 at 12:32 AM:

> Hi Vino,
>
> Many thanks for your response, the solution works! I have one
> additional question:
> What is the best way to override RMQSource#run without access to the
> RMQSource variable "running"?
>
> Thanks, Martin.
>
> On Sat, Sep 8, 2018 at 10:15 AM, vino yang <yanghua1...@gmail.com>
> wrote:
>
>> Hi Marke,
>>
>> As you said, you need to extend RMQSource, because Flink's RabbitMQ
>> connector only extracts the body of the Delivery.
>> To achieve what you want, add a field representing the userId to the
>> element type of your DataStream,
>> then override the RMQSource#run method and
>> extract the userId from the properties of the Delivery.
>> You may also need to pay attention to the
>> implementation of your DeserializationSchema.
>>
>> Thanks, vino.
>>
>> Marke Builder <marke.buil...@gmail.com> 于2018年9月8日周六 下午3:44写道:
>>
>>> Hi,
>>>
>>> How can I get the userId from the message properties into my DataStream?
>>>
>>> I can read the userId if I extend the RMQSource class:
>>> QueueingConsumer.Delivery delivery = consumer.nextDelivery();
>>> String userId = delivery.getProperties().getUserId();
>>>
>>> But how can I pass it on to my DataStream?
>>>
>>> Best regards,
>>> Martin
>>>
>>
