@Austin in my initial implementation you get the envelope as well. I basically pass to the interface everything i get from the RMQ client
https://github.com/senegalo/flink/blob/e67f344884b4186126c38eaa8e112d6e5cf1152e/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeliveryParser.java#L26 Regards, Karim Mansour On Tue, May 5, 2020 at 12:38 AM Austin Cawley-Edwards < austin.caw...@gmail.com> wrote: > Thanks Aljoscha, > > I'm happy to take FLINK-17204 > <https://issues.apache.org/jira/browse/FLINK-17204> for now, if you're > able > to assign it to me, and we'll go from there? > > Excited to use what you come up with Karim! It also looks like FLINK-8510 > <https://issues.apache.org/jira/browse/FLINK-8510> might also have some > ideas on getting access to more RMQ-specific data in the source. > > Best, > Austin > > On Mon, May 4, 2020 at 6:58 AM seneg...@gmail.com <seneg...@gmail.com> > wrote: > > > Hi, > > > > Okay i created a ticket: > https://issues.apache.org/jira/browse/FLINK-17502 > > > > i will work on the modifications "keeping the old constructor" and brush > up > > on the contribution guides and move from there :) > > > > Regards, > > Karim Mansour > > > > On Mon, May 4, 2020 at 10:00 AM Aljoscha Krettek <aljos...@apache.org> > > wrote: > > > > > Yes, that's what I was proposing! > > > > > > @Karim If there's not already a Jira issue, please create one. You can > > > ping me, so that I can assign you. > > > > > > @Austin There's a Jira component for the RMQ source, maybe you can take > > > a stab at some of the issues there: > > > > > > > > > https://issues.apache.org/jira/browse/FLINK-17204?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Connectors%2F%20RabbitMQ%22%20AND%20statusCategory%20!%3D%20Done > > > . > > > > > > Best, > > > Aljoscha > > > > > > On 03.05.20 16:38, seneg...@gmail.com wrote: > > > > Hi, > > > > > > > > Okay so keep the current constructors as is, create new ones with > more > > > > granular parsing of the results. Sounds like a good plan. > > > > > > > > How do we proceed from here ? > > > > > > > > Regards, > > > > Karim Mansour > > > > > > > > On Fri, May 1, 2020 at 5:03 PM Austin Cawley-Edwards < > > > > austin.caw...@gmail.com> wrote: > > > > > > > >> Hey, > > > >> > > > >> (Switching to my personal email) > > > >> > > > >> Correct me if I'm wrong, but I think Aljoscha is proposing keeping > the > > > >> public API as is, and adding some new constructors/ custom > > > deserialization > > > >> schemas as was done with Kafka. Here's what I was able to find on > that > > > >> feature: > > > >> > > > >> * https://issues.apache.org/jira/browse/FLINK-8354 > > > >> * > > > >> > > > >> > > > > > > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java > > > >> * > > > >> > > > >> > > > > > > https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java#L100-L114 > > > >> > > > >> Best, > > > >> Austin > > > >> > > > >> On Fri, May 1, 2020 at 6:19 AM seneg...@gmail.com < > seneg...@gmail.com > > > > > > >> wrote: > > > >> > > > >>> Hello, > > > >>> > > > >>> So the proposal is to keep the current RMQSource constructors / > > public > > > >> api > > > >>> as is and create new ones that gives more granular parsing ? > > > >>> > > > >>> Regards, > > > >>> Karim Mansour > > > >>> > > > >>> On Thu, Apr 30, 2020 at 5:23 PM Austin Cawley-Edwards < > > > >>> aus...@fintechstudios.com> wrote: > > > >>> > > > >>>> Hey all + thanks Konstantin, > > > >>>> > > > >>>> Like mentioned, we also run into issues with the RMQ Source > > > >>> inflexibility. > > > >>>> I think Aljoscha's idea of supporting both would be a nice way to > > > >>>> incorporate new changes without breaking the current API. > > > >>>> > > > >>>> We'd definitely benefit from the changes proposed here but have > > > another > > > >>>> issue with the Correlation ID. When a message gets in the queue > > > >> without a > > > >>>> correlation ID, the source errors and the job cannot recover, > > > requiring > > > >>>> (painful) manual intervention. It would be nice to be able to > > > >> dead-letter > > > >>>> these inputs from the source, but I don't think that's possible > with > > > >> the > > > >>>> current source interface (don't know too much about the source > > > >>> specifics). > > > >>>> We might be able to work around this with a custom Correlation ID > > > >>>> extractor, as proposed by Karim. > > > >>>> > > > >>>> Also, if there are other tickets in the RMQ integrations that have > > > gone > > > >>>> unmaintained, I'm also happy to chip it at maintaining them! > > > >>>> > > > >>>> Best, > > > >>>> Austin > > > >>>> ________________________________ > > > >>>> From: Konstantin Knauf <kna...@apache.org> > > > >>>> Sent: Thursday, April 30, 2020 6:14 AM > > > >>>> To: dev <dev@flink.apache.org> > > > >>>> Cc: Austin Cawley-Edwards <aus...@fintechstudios.com> > > > >>>> Subject: Re: [DISCUSS] flink-connector-rabbitmq api changes > > > >>>> > > > >>>> Hi everyone, > > > >>>> > > > >>>> just looping in Austin as he mentioned that they also ran into > > issues > > > >> due > > > >>>> to the inflexibility of the RabiitMQSourcce to me yesterday. > > > >>>> > > > >>>> Cheers, > > > >>>> > > > >>>> Konstantin > > > >>>> > > > >>>> On Thu, Apr 30, 2020 at 11:23 AM seneg...@gmail.com<mailto: > > > >>>> seneg...@gmail.com> <seneg...@gmail.com<mailto:seneg...@gmail.com > >> > > > >>> wrote: > > > >>>> Hello Guys, > > > >>>> > > > >>>> Thanks for all the responses, i want to stress out that i didn't > > feel > > > >>>> ignored i just thought that i forgot an important step or > something. > > > >>>> > > > >>>> Since i am a newbie i would follow whatever route you guys would > > > >> suggest > > > >>> :) > > > >>>> and i agree that the RMQ connector needs a lot of love still > "which > > i > > > >>> would > > > >>>> be happy to submit gradually" > > > >>>> > > > >>>> as for the code i have it here in the PR: > > > >>>> https://github.com/senegalo/flink/pull/1 it's not that much of a > > > >> change > > > >>> in > > > >>>> terms of logic but more of what is exposed. > > > >>>> > > > >>>> Let me know how you want me to proceed. > > > >>>> > > > >>>> Thanks again, > > > >>>> Karim Mansour > > > >>>> > > > >>>> On Thu, Apr 30, 2020 at 10:40 AM Aljoscha Krettek < > > > aljos...@apache.org > > > >>>> <mailto:aljos...@apache.org>> > > > >>>> wrote: > > > >>>> > > > >>>>> Hi, > > > >>>>> > > > >>>>> I think it's good to contribute the changes to Flink directly > since > > > >> we > > > >>>>> already have the RMQ connector in the respository. > > > >>>>> > > > >>>>> I would propose something similar to the Kafka connector, which > > takes > > > >>>>> both the generic DeserializationSchema and a > > > >> KafkaDeserializationSchema > > > >>>>> that is specific to Kafka and allows access to the ConsumerRecord > > and > > > >>>>> therefore all the Kafka features. What do you think about that? > > > >>>>> > > > >>>>> Best, > > > >>>>> Aljoscha > > > >>>>> > > > >>>>> On 30.04.20 10:26, Robert Metzger wrote: > > > >>>>>> Hey Karim, > > > >>>>>> > > > >>>>>> I'm sorry that you had such a bad experience contributing to > > Flink, > > > >>>> even > > > >>>>>> though you are nicely following the rules. > > > >>>>>> > > > >>>>>> You mentioned that you've implemented the proposed change > already. > > > >>>> Could > > > >>>>>> you share a link to a branch here so that we can take a look? I > > can > > > >>>>> assess > > > >>>>>> the API changes easier if I see them :) > > > >>>>>> > > > >>>>>> Thanks a lot! > > > >>>>>> > > > >>>>>> > > > >>>>>> Best, > > > >>>>>> Robert > > > >>>>>> > > > >>>>>> On Thu, Apr 30, 2020 at 8:09 AM Dawid Wysakowicz < > > > >>>> dwysakow...@apache.org<mailto:dwysakow...@apache.org> > > > >>>>>> > > > >>>>>> wrote: > > > >>>>>> > > > >>>>>>> Hi Karim, > > > >>>>>>> > > > >>>>>>> Sorry you did not have the best first time experience. You > > > >> certainly > > > >>>> did > > > >>>>>>> everything right which I definitely appreciate. > > > >>>>>>> > > > >>>>>>> The problem in that particular case, as I see it, is that > > RabbitMQ > > > >>> is > > > >>>>>>> not very actively maintained and therefore it is not easy too > > > >> find a > > > >>>>>>> committer willing to take on this topic. The point of > connectors > > > >> not > > > >>>>>>> being properly maintained was raised a few times in the past on > > > >> the > > > >>>> ML. > > > >>>>>>> One of the ideas how to improve the situation there was to > start > > a > > > >>>>>>> https://flink-packages.org/ page. The idea is to ask active > > users > > > >>> of > > > >>>>>>> certain connectors to maintain those connectors outside of the > > > >> core > > > >>>>>>> project, while giving them a platform within the community > where > > > >>> they > > > >>>>>>> can make their modules visible. That way it is possible to > > > >> overcome > > > >>>> the > > > >>>>>>> lack of capabilities within the core committers without loosing > > > >> much > > > >>>> on > > > >>>>>>> the visibility. > > > >>>>>>> > > > >>>>>>> I would kindly ask you to consider that path, if you are > > > >> interested. > > > >>>> You > > > >>>>>>> can of course also wait/reach out to more committers if you > feel > > > >>>> strong > > > >>>>>>> about contributing those changes back to the Flink repository > > > >>> itself. > > > >>>>>>> > > > >>>>>>> Best, > > > >>>>>>> > > > >>>>>>> Dawid > > > >>>>>>> > > > >>>>>>> On 30/04/2020 07:29, seneg...@gmail.com<mailto: > > seneg...@gmail.com > > > >>> > > > >>>> wrote: > > > >>>>>>>> Hello, > > > >>>>>>>> > > > >>>>>>>> I am new to the mailing list and to contributing in Big > > > >> opensource > > > >>>>>>> projects > > > >>>>>>>> in general and i don't know if i did something wrong or should > > be > > > >>>> more > > > >>>>>>>> patient :) > > > >>>>>>>> > > > >>>>>>>> I put a topic for discussion as per the contribution guide " > > > >>>>>>>> https://flink.apache.org/contributing/how-to-contribute.html" > > > >>>> almost a > > > >>>>>>> week > > > >>>>>>>> ago and since what i propose is not backward compatible it > needs > > > >> to > > > >>>> be > > > >>>>>>>> discussed here before opening a ticket and moving forward. > > > >>>>>>>> > > > >>>>>>>> So my question is. Will someone pick the discussion up ? or at > > > >>> least > > > >>>>>>>> someone would say that this is not the way to go ? or should i > > > >>> assume > > > >>>>>>> from > > > >>>>>>>> the silence that it's not important / relevant to the project > ? > > > >>>> Should > > > >>>>> i > > > >>>>>>>> track the author of the connector and send him directly ? > > > >>>>>>>> > > > >>>>>>>> Thank you for your time. > > > >>>>>>>> > > > >>>>>>>> Regards, > > > >>>>>>>> Karim Mansour > > > >>>>>>>> > > > >>>>>>>> On Fri, Apr 24, 2020 at 11:17 AM seneg...@gmail.com<mailto: > > > >>>> seneg...@gmail.com> < > > > >>>>> seneg...@gmail.com<mailto:seneg...@gmail.com>> > > > >>>>>>>> wrote: > > > >>>>>>>> > > > >>>>>>>>> Dear All, > > > >>>>>>>>> > > > >>>>>>>>> I want to propose a change to the current RabbitMQ connector. > > > >>>>>>>>> > > > >>>>>>>>> Currently the RMQSource is extracting the body of the message > > > >>> which > > > >>>>> is a > > > >>>>>>>>> byte array and pass it to a an instance of a user > > implementation > > > >>> of > > > >>>>> the > > > >>>>>>>>> DeserializationSchema class to deserialize the body of the > > > >>> message. > > > >>>> It > > > >>>>>>>>> also uses the correlation id from the message properties to > > > >>>>> deduplicate > > > >>>>>>> the > > > >>>>>>>>> message. > > > >>>>>>>>> > > > >>>>>>>>> What i want to propose is instead of taking a implementation > of > > > >> a > > > >>>>>>>>> DeserializationSchema in the RMQSource constructor, actually > > > >> have > > > >>>> the > > > >>>>>>>>> user implement an interface that would have methods both the > > > >>> output > > > >>>>> for > > > >>>>>>> the > > > >>>>>>>>> RMQSource and the correlation id used not only from the body > of > > > >>> the > > > >>>>>>> message > > > >>>>>>>>> but also to it's metadata and properties thus giving the > > > >> connector > > > >>>>> much > > > >>>>>>>>> more power and flexibility. > > > >>>>>>>>> > > > >>>>>>>>> This of course would mean a breaking API change for the > > > >> RMQSource > > > >>>>> since > > > >>>>>>> it > > > >>>>>>>>> will no longer take a DeserializationSchema but an > > > >> implementation > > > >>>> of a > > > >>>>>>>>> predefined interface that has the methods to extract both the > > > >>> output > > > >>>>> of > > > >>>>>>> the > > > >>>>>>>>> RMQSource and the to extract the unique message id as well. > > > >>>>>>>>> > > > >>>>>>>>> The reason behind that is that in my company we were relaying > > on > > > >>>>> another > > > >>>>>>>>> property the message id for deduplication of the messages > and i > > > >>> also > > > >>>>>>> needed > > > >>>>>>>>> that information further down the pipeline and there was > > > >>> absolutely > > > >>>> no > > > >>>>>>> way > > > >>>>>>>>> of getting it other than modifying the RMQSource. > > > >>>>>>>>> > > > >>>>>>>>> I already have code written but as the rules dictates i have > to > > > >>> run > > > >>>> it > > > >>>>>>> by > > > >>>>>>>>> you guys first before i attempt to create a Jira ticket :) > > > >>>>>>>>> > > > >>>>>>>>> Let me know what you think. > > > >>>>>>>>> > > > >>>>>>>>> Regards, > > > >>>>>>>>> Karim Mansour > > > >>>>>>>>> > > > >>>>>>> > > > >>>>>>> > > > >>>>>> > > > >>>>> > > > >>>>> > > > >>>> > > > >>>> > > > >>>> -- > > > >>>> > > > >>>> Konstantin Knauf > > > >>>> > > > >>>> https://twitter.com/snntrable > > > >>>> > > > >>>> https://github.com/knaufk > > > >>>> > > > >>> > > > >> > > > > > > > > > > > > >