Hi Marke,

Should not use the code like this :

*delivery.getProperties().getUserId();*

to get the userId from Delivery object?

And for second code example, Since you got the object of TimeSeriesType
type, should not define *DataStream<TimeSeriesType>* instead of
*DataStream<String>*.

Regarding userId, I just said that this is a way of extracting. But if it
doesn't have a value in itself, then there is no way to get it. Can you
confirm that the message itself has value in RabbitMQ?

Thanks, vino.

Marke Builder <marke.buil...@gmail.com> 于2018年9月11日周二 下午4:34写道:

> Hi Vino,
>
> this is what I done, but no user Id available. And the first question was
> about the running parameter in RMQSource#boolean running.
>
> Code example:
> @Override
> run(SourceContext cts) {
> ....
> TimeSeriesType result = (TimeSeriesType)
> schema.deserialize(delivery.getBody());
> .....
> final String userId = delivery.getProperties().
> result.setDeviceId(userId);
> ......
> ctx.collect(result);
>
>
> And the DataStream looks like this:
> final DataStream<String> stream = env
>                 .addSource(new RabbitmqStreamProcessorV2(
>                         connectionConfig,
>                         fastDataQueue,
>                         new
> AbstractDeserializationSchema<TimeSeriesType>() {
>                             @Override
>                             public TimeSeriesType deserialize(byte[]
> bytes) throws IOException {
>                                 TimeSeriesType message = null;
>                                 try {
>                                      message = xmlParser.parse(new
> String(bytes, "UTF8"));
>                                      logger.info("Data/Message size: "
> +String.valueOf(message.getData().size()));
>                                 } catch (JAXBException e) {
>                                     e.printStackTrace();
>                                     logger.log(Level.INFO, e.toString());
>                                 }
>                                 return message;
>                             }
>                         }))
>                 .flatMap(.....
>
>
>
>
>
>
> Am Mo., 10. Sep. 2018 um 03:52 Uhr schrieb vino yang <
> yanghua1...@gmail.com>:
>
>> Hi Marke,
>>
>> As soon as I didn't really implement this code, but I think you can
>> replace this line of code:
>>
>> *OUT result = schema.deserialize(delivery.getBody());
>> //RMQSource#run*
>>
>> instead of defining an abstract method in RMQSource, such as:
>> normalize/deserialize, the input parameter is Delivery,
>> and the output parameter is generic type <Out> and implement your custom
>> logic in this method.
>>
>> Thanks, vino.
>>
>> Marke Builder <marke.buil...@gmail.com> 于2018年9月10日周一 上午12:32写道:
>>
>>> Hi Vino,
>>>
>>> many thanks for your response, the solution works! But I have one
>>> additional question,
>>> What is the best way to override the RMQSource#run without access to the
>>> RMQSource variable "running" ?
>>>
>>> Thanks, Martin.
>>>
>>> Am Sa., 8. Sep. 2018 um 10:15 Uhr schrieb vino yang <
>>> yanghua1...@gmail.com>:
>>>
>>>> Hi Marke,
>>>>
>>>> As you said, you need to extend RMQSource because Flink's rabbitmq
>>>> connector only extracts the body of Delivery.
>>>> Therefore, in order to achieve your purpose, you need to add a property
>>>> to the specific data type of your DataStream
>>>> to represent the userId, then override the RMQSource#run method and
>>>> extract the userId from the properties of Delivery.
>>>> Of course, in addition, maybe you Need to pay attention to the
>>>> implementation of DeserializationSchema.
>>>>
>>>> Thanks, vino.
>>>>
>>>> Marke Builder <marke.buil...@gmail.com> 于2018年9月8日周六 下午3:44写道:
>>>>
>>>>> Hi,
>>>>>
>>>>> how I can get the UserId from the Properties in my DataStream?
>>>>>
>>>>> I can read the userId if I extend the RMQSource Class:
>>>>> QueueingConsumer.Delivery delivery = consumer.nextDelivery();
>>>>> String userId = delivery.getProperties().getUserId();
>>>>>
>>>>> But how can I provide this to my DataStream ?
>>>>>
>>>>> Best regards,
>>>>> Martin
>>>>>
>>>>

Reply via email to