Hi Flink Devs,

Thanks for all the feedback.
Following the procedure outlined on the Flink Improvement Proposal Confluence page [1], we kindly ask the PMC/Committers to transfer the content from the Amazon SQS Source Connector Google Doc [2] and assign a FLIP Number for us, which we will use for voting.

[1] https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
[2] https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing

Regards
Saurabh & Abhi

On Thu, Aug 8, 2024 at 5:12 PM Saurabh Singh <saurabhsingh9...@gmail.com> wrote:

> Hi Ahmed,
>
> Yes, you're correct. Currently, we're utilizing the "record emitter" to
> send messages into the queue for deletion. However, for the actual deletion
> process, which is dependent on the checkpoints, we've been using the source
> reader class because it allows us to override the notifyCheckpointComplete
> method.
>
> Regards
> Saurabh & Abhi
>
> On Wed, Aug 7, 2024 at 2:18 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>
>> Hi Saurabh
>> Thanks for addressing, I see the FLIP is in much better state.
>> Could we specify where we queue messages for deletion, In my opinion the
>> record emitter is a good place for that where we delete messages that are
>> forwarded to the next operator.
>> Other than that I don't have further comments.
>> Thanks again for the effort.
>>
>> Best Regards
>> Ahmed Hamdy
>>
>>
>> On Wed, 31 Jul 2024 at 10:34, Saurabh Singh <saurabhsingh9...@gmail.com>
>> wrote:
>>
>>> Hi Ahmed,
>>>
>>> Thank you very much for the detailed, valuable review. Please find our
>>> responses below:
>>>
>>>
>>> - In the FLIP you mention the split is going to be 1 sqs Queue, does
>>> this mean we would support reading from multiple queues? This is also not
>>> clear in the implementation of `addSplitsBack` whether we are planning to
>>> support multiple sqs topics or not.
>>>
>>> *Our current implementation assumes that each source reads from a single
>>> SQS queue.
>>> If you need to read from multiple SQS queues, you can define
>>> multiple sources accordingly. We believe this approach is clearer and more
>>> organized compared to having a single source switch between multiple
>>> queues. This design choice is based on weighing the benefits, but we can
>>> support multiple queues per source if the need arises.*
>>>
>>> - Regarding Client creation, there has been some effort in the
>>> common `aws-util` module like createAwsSyncClient, we should reuse that
>>> for `SqsClient` creation.
>>>
>>> *Thank you for bringing this to our attention. Yes, we will utilize
>>> the existing createClient methods available in the libraries. Our goal is
>>> to avoid any code duplication on our end.*
>>>
>>>
>>> - On the same point for clients, Is there a reason the FLIP suggests
>>> async clients? sync clients have proven more stable and the source
>>> threading model already guarantees no blocking by sync clients.
>>>
>>> *We were not aware of this, and we have been using async clients for our
>>> in-house use cases. However, since we already have sync clients in the
>>> aws-util that ensure no blocking, we are in a good position. We will use
>>> these sync clients during our development and testing efforts, and we will
>>> share the results and keep the community updated.*
>>>
>>> - On mentioning threading, the FLIP doesn’t mention the fetcher
>>> manager. Is it going to be `SingleThreadFetcherManager`? Would it be
>>> better to make the source reader extend the
>>> SingleThreadedMultiplexReaderBase or are we going to implement a more
>>> simple version?
>>>
>>> *Yes, we are considering implementing
>>> SingleThreadMultiplexSourceReaderBase for the Reader. We have included the
>>> implementation snippet in the FLIP for reference.*
>>>
>>> - The FLIP doesn’t mention schema deserialization or the
>>> recordEmitter implementation, Are we going to use `deserializationSchema`
>>> or some sort of string to element converter?
>>> It is also not clear from the builder example provided?
>>>
>>> *Yes, we plan to use the deserializationSchema and recordEmitter
>>> implementations. We have included sample code for these in the FLIP for
>>> reference.*
>>>
>>> - Are the values mentioned in getSqsClientProperties recommended
>>> defaults? If so we should highlight that.
>>>
>>> *The defaults are not decided. These are just sample snapshots for
>>> example.*
>>>
>>> - Most importantly I am a bit skeptical regarding enforcing
>>> exactly-once semantics with side effects especially with dependency on
>>> checkpointing configuration, could we add a flag to disable it, and
>>> disable it by default if checkpointing is not enabled?
>>>
>>> *During our initial design phase, we intended to enforce exactly-once
>>> semantics via checkpoints. However, you raise a valid point, and we will
>>> make this a configurable feature for users. They can choose to disable
>>> exactly-once semantics, accepting some duplicate processing (at-least-once)
>>> as a trade-off. We have updated the FLIP to include support for this
>>> feature.*
>>>
>>> - I am not 100% convinced we should block FLIP itself on the
>>> FLIP-438 implementation but I echo the fact that there might be some
>>> reusable code between the 2 submodules we should make use of.
>>>
>>> *Yes we echo the same. We fully understand the concern and are
>>> closely examining the SQS Sink implementation. We will ensure there is no
>>> duplication of work or submodules. If any issues arise, we will address
>>> them promptly.*
>>>
>>>
>>> Thank you for your valuable feedback on the FLIP. Your input is helping
>>> us refine and improve it significantly.
>>>
>>> [Main Proposal doc] -
>>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
>>>
>>> Regards
>>> Saurabh & Abhi
>>>
>>>
>>> On Sun, Jul 28, 2024 at 3:04 AM Ahmed Hamdy <hamdy10...@gmail.com>
>>> wrote:
>>>
>>>> Hi Saurabh
>>>> I think this is going to be a valuable addition which is needed.
>>>> I have a couple of comments
>>>> - In the FLIP you mention the split is going to be 1 sqs Queue, does
>>>> this mean we would support reading from multiple queues? The builder
>>>> example seems to support a single queue
>>>> SqsSource.<T>builder.setSqsUrl("https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
>>>> - This is also not clear in the implementation of `addSplitsBack`
>>>> whether we are planning to support multiple sqs topics or not.
>>>> - Regarding Client creation, there has been some effort in the common
>>>> `aws-util` module like createAwsSyncClient , we should reuse that for
>>>> `SqsClient` creation.
>>>> - On the same point for clients, Is there a reason the FLIP suggests
>>>> async clients? sync clients have proven more stable and the source
>>>> threading model already guarantees no blocking by sync clients.
>>>> - On mentioning threading, the FLIP doesn't mention the fetcher
>>>> manager. Is it going to be `SingleThreadFetcherManager`? Would it be
>>>> better to make the source reader extend the
>>>> SingleThreadedMultiplexReaderBase or are we going to implement a more
>>>> simple version?
>>>> - The FLIP doesn't mention schema deserialization or the recordEmitter
>>>> implementation, Are we going to use `deserializationSchema` or some
>>>> sort of string to element converter? It is also not clear from the
>>>> builder example provided?
>>>> - Are the values mentioned in getSqsClientProperties recommended
>>>> defaults?
>>>> If so we should highlight that
>>>> - Most importantly I am a bit skeptical regarding enforcing exactly-once
>>>> semantics with side effects especially with dependency on checkpointing
>>>> configuration, could we add a flag to disable it, and disable it by
>>>> default if checkpointing is not enabled?
>>>>
>>>> I am not 100% convinced we should block FLIP itself on the FLIP-438
>>>> implementation but I echo the fact that there might be some reusable
>>>> code between the 2 submodules we should make use of.
>>>>
>>>> Best Regards
>>>> Ahmed Hamdy
>>>>
>>>>
>>>> On Fri, 26 Jul 2024 at 17:55, Saurabh Singh <saurabhsingh9...@gmail.com>
>>>> wrote:
>>>>
>>>> > Hi Li Wang,
>>>> >
>>>> > Thanks for the review and appreciate your feedback.
>>>> > I completely understand your concern and agree with it. Our goal is to
>>>> > provide users of connectors with a consistent and coherent ecosystem,
>>>> > free of any issues.
>>>> > To ensure that, we are closely monitoring/reviewing the SQS Sink
>>>> > implementation work by Dhingra. Our development work will commence
>>>> > once the AWS Sink is near completion or completed. This approach
>>>> > ensures that we take in the new learnings, do not duplicate any core
>>>> > modules, and save valuable time.
>>>> >
>>>> > In the meantime, we would like to keep the discussion and process
>>>> > active on this FLIP. Gaining valuable community feedback (which is
>>>> > helping us) will help us address any potential gaps in the source
>>>> > connector design and finalize it. Behind the scenes, we are already
>>>> > designing and pre-planning our development work to adhere to
>>>> > feedback/best practices/faster delivery when we implement this FLIP.
>>>> > Please share your thoughts on this.
>>>> >
>>>> > Regards
>>>> > Saurabh
>>>> >
>>>> > On Fri, Jul 26, 2024 at 5:52 PM Li Wang <liwang505...@gmail.com>
>>>> > wrote:
>>>> >
>>>> > > Hi Saurabh,
>>>> > >
>>>> > > Thanks for the FLIP. Given the ongoing effort to implement the SQS
>>>> > > sink connector
>>>> > > (https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector),
>>>> > > it is important to consider how the SQS source connector supports
>>>> > > this development, ensuring a unified framework for AWS integration
>>>> > > that avoids capacity overlap and maximizes synchronization. Since the
>>>> > > implementation by Dhingra is still WIP, I would suggest taking that
>>>> > > into account and keeping this FLIP as an extension of that FLIP which
>>>> > > could be taken up once the SQS Sink FLIP is fully implemented. If
>>>> > > not, this may lead to duplicate work in multiple identity-related
>>>> > > modules and structure of the overall SQS connector codebase. What do
>>>> > > you think?
>>>> > >
>>>> > > Thanks
>>>> > >
>>>> > >
>>>> > > On Thursday, July 25, 2024, Saurabh Singh <saurabhsingh9...@gmail.com>
>>>> > > wrote:
>>>> > >
>>>> > > > Hi Samrat,
>>>> > > >
>>>> > > > Thanks for the review and feedback.
>>>> > > > We have evaluated all the three points. Please find the answers
>>>> > > > below:
>>>> > > >
>>>> > > > 1. AWS has announced JSON protocol support in SQS [1]. Can you
>>>> > > > shed some light on how different protocols will be supported?
>>>> > > > - We will utilize the AWS client library to connect with the AWS
>>>> > > > SQS Service. Versions beyond 2.21.19 now support JSON, so simply
>>>> > > > upgrading the client library will suffice for the protocol switch.
>>>> > > > However, from the connector's perspective, we do not anticipate any
>>>> > > > changes in our communication process, as it is handled by the
>>>> > > > client library.
>>>> > > > [4]
>>>> > > >
>>>> > > > 2. AWS SQS has two types of queues [2]. What are the
>>>> > > > implementation detail differences for the source connector?
>>>> > > > - SQS Connector is indifferent to the customer's choice of Queue
>>>> > > > type. If out-of-order messages are a concern, it will be the
>>>> > > > responsibility of the application code or main job logic to manage
>>>> > > > this.
>>>> > > >
>>>> > > > 3. Will the SQS source connector implement any kind of callbacks
>>>> > > > [3] on success to offer any kind of guarantee?
>>>> > > > - We have proposed deleting SQS messages using the notification
>>>> > > > provided by the checkpoint framework on checkpoint completion, thus
>>>> > > > providing an exactly-once guarantee [5]. Additionally, when deleting
>>>> > > > messages, we will monitor the API call responses and log any
>>>> > > > failures, along with providing observability through appropriate
>>>> > > > metrics.
>>>> > > >
>>>> > > > [1] https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
>>>> > > > [2] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
>>>> > > > [3] https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
>>>> > > > [4] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-json-faqs.html#json-protocol-getting-started
>>>> > > > [5] https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.jdcikzojx5d9
>>>> > > >
>>>> > > >
>>>> > > > [Main Proposal doc] -
>>>> > > > https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
>>>> > > >
>>>> > > > Please feel free to reach out if you have more feedback.
>>>> > > >
>>>> > > > Regards
>>>> > > > Saurabh & Abhi
>>>> > > >
>>>> > > >
>>>> > > > On Wed, Jul 24, 2024 at 8:52 AM Samrat Deb <decordea...@gmail.com>
>>>> > > > wrote:
>>>> > > >
>>>> > > > > Hi Saurabh,
>>>> > > > >
>>>> > > > > Thank you for sharing the FLIP for the SQS source connector. An
>>>> > > > > SQS source connector will be a great addition to the Flink
>>>> > > > > ecosystem, as there is a growing demand for SQS source/sink
>>>> > > > > integration.
>>>> > > > >
>>>> > > > > I have a few queries:
>>>> > > > >
>>>> > > > > 1. AWS has announced JSON protocol support in SQS [1]. Can you
>>>> > > > > shed some light on how different protocols will be supported?
>>>> > > > > 2. AWS SQS has two types of queues [2]. What are the
>>>> > > > > implementation detail differences for the source connector?
>>>> > > > > 3. Will the SQS source connector implement any kind of
>>>> > > > > callbacks [3] on success to offer any kind of guarantee?
>>>> > > > >
>>>> > > > >
>>>> > > > > [1] https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
>>>> > > > > [2] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
>>>> > > > > [3] https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
>>>> > > > >
>>>> > > > > Bests,
>>>> > > > > Samrat
>>>> > > > >
>>>> > > > >
>>>> > > > > On Fri, 19 Jul 2024 at 9:53 PM, Saurabh Singh <saurabhsingh9...@gmail.com>
>>>> > > > > wrote:
>>>> > > > >
>>>> > > > > > Hi Flink Devs,
>>>> > > > > >
>>>> > > > > > Our team has been working on migrating various data pipelines
>>>> > > > > > to Flink to leverage the benefits of exactly-once processing,
>>>> > > > > > checkpointing, and stateful computing. We have several use
>>>> > > > > > cases built around the AWS SQS Service. For this migration, we
>>>> > > > > > have developed an SQS Source Connector, which enables us to run
>>>> > > > > > both stateless and stateful Flink-based jobs.
>>>> > > > > >
>>>> > > > > > We believe that this SQS Source Connector would be a valuable
>>>> > > > > > addition to the existing connector set. Therefore, we propose a
>>>> > > > > > FLIP to include it.
>>>> > > > > >
>>>> > > > > > For more information, please refer to the FLIP document.
>>>> > > > > >
>>>> > > > > > https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>>>> > > > > >
>>>> > > > > > Thanks
>>>> > > > > > Saurabh & Abhi