Hi Marke,
Shouldn't you use code like this:
*delivery.getProperties().getUserId();*
to get the userId from the Delivery object?
As for the second code example: since you have an object of type
TimeSeriesType, shouldn't you define *DataStream<TimeSeriesType>* instead of
*DataStream*?
Regarding userId, I just said that
Hi Marke,
I haven't actually implemented this code myself, but I think you can replace
this line of code:
*OUT result = schema.deserialize(delivery.getBody());
//RMQSource#run*
with a call to an abstract method defined in RMQSource, such as
normalize/deserialize, whose input parameter is the Delivery,
an
Hi Marke,
As you said, you need to extend RMQSource because Flink's RabbitMQ
connector only extracts the body of the Delivery.
Therefore, in order to achieve your purpose, you need to add a property to
the specific data type of your DataStream
to represent the userId, then override the RMQSource#run m
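The idea above (carry the userId in the stream's element type and fill it in where the source reads each Delivery) could look roughly like the following. This is only a sketch with made-up names: DeliverySketch is a stand-in for the RabbitMQ Delivery so the example compiles on its own, and UserMessage and DeliveryMapper are hypothetical types, not part of Flink or of the RabbitMQ client.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the two pieces of a RabbitMQ Delivery used here.
// In a real RMQSource subclass these values would come from
// delivery.getProperties().getUserId() and delivery.getBody().
final class DeliverySketch {
    final String userId;
    final byte[] body;

    DeliverySketch(String userId, byte[] body) {
        this.userId = userId;
        this.body = body;
    }
}

// Hypothetical element type for the stream: the payload plus the userId.
final class UserMessage {
    final String userId;
    final String payload;

    UserMessage(String userId, String payload) {
        this.userId = userId;
        this.payload = payload;
    }

    @Override
    public String toString() {
        return "UserMessage{userId=" + userId + ", payload=" + payload + "}";
    }
}

// Hypothetical hook that an overridden run() could call for each delivery,
// in place of deserializing only the body.
final class DeliveryMapper {
    static UserMessage toUserMessage(DeliverySketch delivery) {
        // Assumes the body is UTF-8 text; adjust to the actual payload format.
        String payload = new String(delivery.body, StandardCharsets.UTF_8);
        return new UserMessage(delivery.userId, payload);
    }
}
```

With something like this, the source would emit UserMessage elements, so downstream operators can read the userId from each element.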
Hi,
How can I get the userId from the properties in my DataStream?
I can read the userId if I extend the RMQSource class:
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String userId = delivery.getProperties().getUserId();
But how can I provide this to my DataStream?
Best regard