@Austin in my initial implementation you get the envelope as well. I
basically pass everything I get from the RMQ client to the interface:

https://github.com/senegalo/flink/blob/e67f344884b4186126c38eaa8e112d6e5cf1152e/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeliveryParser.java#L26
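
A minimal sketch of the shape such an interface could take, assuming a single
implementation supplies both the emitted record and the ID used for
deduplication. Every name below (DeliveryParser, MessageIdParser, the
simplified Envelope and BasicProperties classes) is a hypothetical stand-in
chosen for illustration; the linked RMQDeliveryParser may differ in naming
and signature.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for the RabbitMQ client's Envelope and
// AMQP.BasicProperties, reduced to the fields used in this sketch so it
// compiles without the client library on the classpath.
final class Envelope {
    final long deliveryTag;
    final String routingKey;

    Envelope(long deliveryTag, String routingKey) {
        this.deliveryTag = deliveryTag;
        this.routingKey = routingKey;
    }
}

final class BasicProperties {
    final String messageId;
    final String correlationId;

    BasicProperties(String messageId, String correlationId) {
        this.messageId = messageId;
        this.correlationId = correlationId;
    }
}

// Hypothetical interface: the implementation sees the whole delivery
// (envelope, properties, body) and decides both what the source emits
// and which value identifies the message for deduplication.
interface DeliveryParser<T> {
    T parse(Envelope envelope, BasicProperties properties, byte[] body);

    String getCorrelationId(Envelope envelope, BasicProperties properties, byte[] body);
}

// Example implementation: deduplicate on the message ID rather than the
// correlation ID, and carry the routing key into the emitted record.
final class MessageIdParser implements DeliveryParser<String> {
    @Override
    public String parse(Envelope envelope, BasicProperties properties, byte[] body) {
        return envelope.routingKey + ":" + new String(body, StandardCharsets.UTF_8);
    }

    @Override
    public String getCorrelationId(Envelope envelope, BasicProperties properties, byte[] body) {
        return properties.messageId;
    }
}

final class RmqParserSketch {
    public static void main(String[] args) {
        DeliveryParser<String> parser = new MessageIdParser();
        Envelope envelope = new Envelope(1L, "orders");
        // No correlation ID is set; the parser does not need one.
        BasicProperties properties = new BasicProperties("msg-1", null);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);

        System.out.println(parser.parse(envelope, properties, body));
        System.out.println(parser.getCorrelationId(envelope, properties, body));
    }
}
```

With a shape like this, a message that arrives without a correlation ID
would not have to fail the source, since the implementation is free to pick
another property as the unique ID.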

Regards,
Karim Mansour

On Tue, May 5, 2020 at 12:38 AM Austin Cawley-Edwards <
austin.caw...@gmail.com> wrote:

> Thanks Aljoscha,
>
> I'm happy to take FLINK-17204
> <https://issues.apache.org/jira/browse/FLINK-17204> for now, if you're able
> to assign it to me, and we'll go from there?
>
> Excited to use what you come up with, Karim! It looks like FLINK-8510
> <https://issues.apache.org/jira/browse/FLINK-8510> might also have some
> ideas on getting access to more RMQ-specific data in the source.
>
> Best,
> Austin
>
> On Mon, May 4, 2020 at 6:58 AM seneg...@gmail.com <seneg...@gmail.com>
> wrote:
>
> > Hi,
> >
> > Okay, I created a ticket:
> > https://issues.apache.org/jira/browse/FLINK-17502
> >
> > I will work on the modifications (keeping the old constructor), brush up
> > on the contribution guides, and move on from there :)
> >
> > Regards,
> > Karim Mansour
> >
> > On Mon, May 4, 2020 at 10:00 AM Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Yes, that's what I was proposing!
> > >
> > > @Karim If there's not already a Jira issue, please create one. You can
> > > ping me, so that I can assign you.
> > >
> > > @Austin There's a Jira component for the RMQ source, maybe you can take
> > > a stab at some of the issues there:
> > >
> > >
> >
> https://issues.apache.org/jira/browse/FLINK-17204?jql=project%20%3D%20FLINK%20AND%20component%20%3D%20%22Connectors%2F%20RabbitMQ%22%20AND%20statusCategory%20!%3D%20Done
> > > .
> > >
> > > Best,
> > > Aljoscha
> > >
> > > On 03.05.20 16:38, seneg...@gmail.com wrote:
> > > > Hi,
> > > >
> > > > Okay, so keep the current constructors as is and create new ones with more
> > > > granular parsing of the results. Sounds like a good plan.
> > > >
> > > > How do we proceed from here?
> > > >
> > > > Regards,
> > > > Karim Mansour
> > > >
> > > > On Fri, May 1, 2020 at 5:03 PM Austin Cawley-Edwards <
> > > > austin.caw...@gmail.com> wrote:
> > > >
> > > >> Hey,
> > > >>
> > > >> (Switching to my personal email)
> > > >>
> > > > >> Correct me if I'm wrong, but I think Aljoscha is proposing keeping the
> > > > >> public API as is, and adding some new constructors/custom deserialization
> > > > >> schemas as was done with Kafka. Here's what I was able to find on that
> > > > >> feature:
> > > >>
> > > > >> * https://issues.apache.org/jira/browse/FLINK-8354
> > > > >> * https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaDeserializationSchema.java
> > > > >> * https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java#L100-L114
> > > >>
> > > >> Best,
> > > >> Austin
> > > >>
> > > > >> On Fri, May 1, 2020 at 6:19 AM seneg...@gmail.com <seneg...@gmail.com>
> > > > >> wrote:
> > > >>
> > > >>> Hello,
> > > >>>
> > > > >>> So the proposal is to keep the current RMQSource constructors / public
> > > > >>> API as is and create new ones that give more granular parsing?
> > > >>>
> > > >>> Regards,
> > > >>> Karim Mansour
> > > >>>
> > > >>> On Thu, Apr 30, 2020 at 5:23 PM Austin Cawley-Edwards <
> > > >>> aus...@fintechstudios.com> wrote:
> > > >>>
> > > >>>> Hey all + thanks Konstantin,
> > > >>>>
> > > > >>>> As mentioned, we also run into issues with the RMQ source's inflexibility.
> > > > >>>> I think Aljoscha's idea of supporting both would be a nice way to
> > > > >>>> incorporate new changes without breaking the current API.
> > > >>>>
> > > > >>>> We'd definitely benefit from the changes proposed here but have another
> > > > >>>> issue with the Correlation ID. When a message gets in the queue without a
> > > > >>>> correlation ID, the source errors and the job cannot recover, requiring
> > > > >>>> (painful) manual intervention. It would be nice to be able to dead-letter
> > > > >>>> these inputs from the source, but I don't think that's possible with the
> > > > >>>> current source interface (don't know too much about the source specifics).
> > > > >>>> We might be able to work around this with a custom Correlation ID
> > > > >>>> extractor, as proposed by Karim.
> > > >>>>
> > > > >>>> Also, if there are other tickets in the RMQ integrations that have gone
> > > > >>>> unmaintained, I'm also happy to chip in on maintaining them!
> > > >>>>
> > > >>>> Best,
> > > >>>> Austin
> > > >>>> ________________________________
> > > >>>> From: Konstantin Knauf <kna...@apache.org>
> > > >>>> Sent: Thursday, April 30, 2020 6:14 AM
> > > >>>> To: dev <dev@flink.apache.org>
> > > >>>> Cc: Austin Cawley-Edwards <aus...@fintechstudios.com>
> > > >>>> Subject: Re: [DISCUSS] flink-connector-rabbitmq api changes
> > > >>>>
> > > >>>> Hi everyone,
> > > >>>>
> > > > >>>> just looping in Austin, as he mentioned to me yesterday that they also ran
> > > > >>>> into issues due to the inflexibility of the RabbitMQ source.
> > > >>>>
> > > >>>> Cheers,
> > > >>>>
> > > >>>> Konstantin
> > > >>>>
> > > > >>>> On Thu, Apr 30, 2020 at 11:23 AM seneg...@gmail.com <seneg...@gmail.com>
> > > > >>>> wrote:
> > > >>>> Hello Guys,
> > > >>>>
> > > > >>>> Thanks for all the responses. I want to stress that I didn't feel
> > > > >>>> ignored; I just thought that I forgot an important step or something.
> > > >>>>
> > > > >>>> Since I am a newbie, I will follow whatever route you guys suggest :)
> > > > >>>> I agree that the RMQ connector still needs a lot of love, which I would
> > > > >>>> be happy to contribute gradually.
> > > >>>>
> > > > >>>> As for the code, I have it here in the PR:
> > > > >>>> https://github.com/senegalo/flink/pull/1
> > > > >>>> It's not much of a change in terms of logic; it mostly changes what is
> > > > >>>> exposed.
> > > >>>>
> > > >>>> Let me know how you want me to proceed.
> > > >>>>
> > > >>>> Thanks again,
> > > >>>> Karim Mansour
> > > >>>>
> > > > >>>> On Thu, Apr 30, 2020 at 10:40 AM Aljoscha Krettek <aljos...@apache.org>
> > > > >>>> wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>>
> > > > >>>>> I think it's good to contribute the changes to Flink directly since we
> > > > >>>>> already have the RMQ connector in the repository.
> > > >>>>>
> > > > >>>>> I would propose something similar to the Kafka connector, which takes
> > > > >>>>> both the generic DeserializationSchema and a KafkaDeserializationSchema
> > > > >>>>> that is specific to Kafka and allows access to the ConsumerRecord and
> > > > >>>>> therefore all the Kafka features. What do you think about that?
> > > >>>>>
> > > >>>>> Best,
> > > >>>>> Aljoscha
> > > >>>>>
> > > >>>>> On 30.04.20 10:26, Robert Metzger wrote:
> > > >>>>>> Hey Karim,
> > > >>>>>>
> > > > >>>>>> I'm sorry that you had such a bad experience contributing to Flink, even
> > > > >>>>>> though you are nicely following the rules.
> > > >>>>>>
> > > > >>>>>> You mentioned that you've implemented the proposed change already. Could
> > > > >>>>>> you share a link to a branch here so that we can take a look? I can
> > > > >>>>>> assess the API changes more easily if I see them :)
> > > >>>>>>
> > > >>>>>> Thanks a lot!
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Robert
> > > >>>>>>
> > > > >>>>>> On Thu, Apr 30, 2020 at 8:09 AM Dawid Wysakowicz <dwysakow...@apache.org>
> > > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Karim,
> > > >>>>>>>
> > > > >>>>>>> Sorry you did not have the best first-time experience. You certainly did
> > > > >>>>>>> everything right, which I definitely appreciate.
> > > >>>>>>>
> > > > >>>>>>> The problem in that particular case, as I see it, is that the RabbitMQ
> > > > >>>>>>> connector is not very actively maintained and therefore it is not easy to
> > > > >>>>>>> find a committer willing to take on this topic. The point of connectors
> > > > >>>>>>> not being properly maintained was raised a few times in the past on the
> > > > >>>>>>> ML. One of the ideas for improving the situation was to start the
> > > > >>>>>>> https://flink-packages.org/ page. The idea is to ask active users of
> > > > >>>>>>> certain connectors to maintain those connectors outside of the core
> > > > >>>>>>> project, while giving them a platform within the community where they
> > > > >>>>>>> can make their modules visible. That way it is possible to overcome the
> > > > >>>>>>> lack of capacity among the core committers without losing much
> > > > >>>>>>> visibility.
> > > >>>>>>>
> > > > >>>>>>> I would kindly ask you to consider that path, if you are interested. You
> > > > >>>>>>> can of course also wait or reach out to more committers if you feel
> > > > >>>>>>> strongly about contributing those changes back to the Flink repository
> > > > >>>>>>> itself.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>>
> > > >>>>>>> Dawid
> > > >>>>>>>
> > > > >>>>>>> On 30/04/2020 07:29, seneg...@gmail.com wrote:
> > > >>>>>>>> Hello,
> > > >>>>>>>>
> > > > >>>>>>>> I am new to the mailing list and to contributing to big open-source
> > > > >>>>>>>> projects in general, and I don't know if I did something wrong or should
> > > > >>>>>>>> be more patient :)
> > > >>>>>>>>
> > > > >>>>>>>> I put up a topic for discussion, as per the contribution guide
> > > > >>>>>>>> (https://flink.apache.org/contributing/how-to-contribute.html), almost a
> > > > >>>>>>>> week ago. Since what I propose is not backward compatible, it needs to be
> > > > >>>>>>>> discussed here before opening a ticket and moving forward.
> > > >>>>>>>>
> > > > >>>>>>>> So my question is: will someone pick the discussion up? Or would someone
> > > > >>>>>>>> at least say that this is not the way to go? Or should I assume from the
> > > > >>>>>>>> silence that it's not important / relevant to the project? Should I
> > > > >>>>>>>> track down the author of the connector and contact him directly?
> > > >>>>>>>>
> > > >>>>>>>> Thank you for your time.
> > > >>>>>>>>
> > > >>>>>>>> Regards,
> > > >>>>>>>> Karim Mansour
> > > >>>>>>>>
> > > > >>>>>>>> On Fri, Apr 24, 2020 at 11:17 AM seneg...@gmail.com <seneg...@gmail.com>
> > > > >>>>>>>> wrote:
> > > >>>>>>>>
> > > >>>>>>>>> Dear All,
> > > >>>>>>>>>
> > > >>>>>>>>> I want to propose a change to the current RabbitMQ connector.
> > > >>>>>>>>>
> > > > >>>>>>>>> Currently the RMQSource extracts the body of the message, which is a
> > > > >>>>>>>>> byte array, and passes it to an instance of a user implementation of the
> > > > >>>>>>>>> DeserializationSchema class to deserialize the body of the message. It
> > > > >>>>>>>>> also uses the correlation ID from the message properties to deduplicate
> > > > >>>>>>>>> the message.
> > > >>>>>>>>>
> > > > >>>>>>>>> What I want to propose is that, instead of taking an implementation of a
> > > > >>>>>>>>> DeserializationSchema in the RMQSource constructor, the user implements
> > > > >>>>>>>>> an interface with methods that produce both the output of the RMQSource
> > > > >>>>>>>>> and the correlation ID used for deduplication, with access not only to
> > > > >>>>>>>>> the body of the message but also to its metadata and properties, thus
> > > > >>>>>>>>> giving the connector much more power and flexibility.
> > > >>>>>>>>>
> > > > >>>>>>>>> This of course would mean a breaking API change for the RMQSource, since
> > > > >>>>>>>>> it will no longer take a DeserializationSchema but an implementation of a
> > > > >>>>>>>>> predefined interface that has methods to extract both the output of the
> > > > >>>>>>>>> RMQSource and the unique message ID.
> > > >>>>>>>>>
> > > > >>>>>>>>> The reason behind this is that in my company we were relying on another
> > > > >>>>>>>>> property, the message ID, for deduplication of the messages, and I also
> > > > >>>>>>>>> needed that information further down the pipeline. There was absolutely
> > > > >>>>>>>>> no way of getting it other than modifying the RMQSource.
> > > >>>>>>>>>
> > > > >>>>>>>>> I already have code written, but as the rules dictate I have to run it
> > > > >>>>>>>>> by you guys first before I attempt to create a Jira ticket :)
> > > >>>>>>>>>
> > > >>>>>>>>> Let me know what you think.
> > > >>>>>>>>>
> > > >>>>>>>>> Regards,
> > > >>>>>>>>> Karim Mansour
> > > >>>>>>>>>
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>> --
> > > >>>>
> > > >>>> Konstantin Knauf
> > > >>>>
> > > >>>> https://twitter.com/snntrable
> > > >>>>
> > > >>>> https://github.com/knaufk
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> > >
> >
>
