Hi Ahmed,

Yes, you're correct. Currently, we're utilizing the "record emitter" to
send messages into the queue for deletion. However, for the actual deletion
process, which is dependent on the checkpoints, we've been using the source
reader class because it allows us to override the notifyCheckpointComplete
method.
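
To illustrate the flow described above, here is a minimal sketch in plain
Java. It is not code from the FLIP: the class name and onRecordEmitted are
hypothetical, SQS is replaced by an in-memory list, and no Flink or AWS
classes are used. Only the names snapshotState and notifyCheckpointComplete
mirror the Flink reader callbacks. The assumption is that receipt handles
queued by the record emitter are tied to the next checkpoint and deleted
only once that checkpoint is confirmed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the reader side; not the FLIP's implementation.
class SqsDeleteOnCheckpointSketch {
    // Receipt handles queued by the record emitter since the last snapshot.
    private final List<String> emittedSinceLastSnapshot = new ArrayList<>();
    // Handles waiting for their checkpoint to complete, keyed by checkpoint id.
    private final TreeMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();
    // Stand-in for the SQS delete call: records what would have been deleted.
    private final List<String> deleted = new ArrayList<>();

    // Called from the record emitter after a message is forwarded downstream.
    void onRecordEmitted(String receiptHandle) {
        emittedSinceLastSnapshot.add(receiptHandle);
    }

    // Called when a checkpoint is taken: tie the queued handles to that checkpoint.
    void snapshotState(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(emittedSinceLastSnapshot));
        emittedSinceLastSnapshot.clear();
    }

    // Called when a checkpoint is confirmed: delete everything up to and including it.
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> completed = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<String> handles : completed.values()) {
            deleted.addAll(handles);
        }
        completed.clear(); // also removes the entries from pendingByCheckpoint
    }

    List<String> deleted() {
        return deleted;
    }

    public static void main(String[] args) {
        SqsDeleteOnCheckpointSketch reader = new SqsDeleteOnCheckpointSketch();
        reader.onRecordEmitted("a");
        reader.onRecordEmitted("b");
        reader.snapshotState(1L);
        reader.onRecordEmitted("c"); // belongs to the next checkpoint
        reader.notifyCheckpointComplete(1L);
        System.out.println(reader.deleted()); // prints [a, b]
    }
}
```

In this sketch a message emitted after the snapshot ("c") stays in the queue
until a later checkpoint completes, so a failure before that point leads to
redelivery rather than loss.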

Regards
Saurabh & Abhi

On Wed, Aug 7, 2024 at 2:18 PM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

> Hi Saurabh
> Thanks for addressing the comments; I see the FLIP is in a much better state.
> Could we specify where we queue messages for deletion? In my opinion the
> record emitter is a good place for that, where we delete messages that are
> forwarded to the next operator.
> Other than that I don't have further comments.
> Thanks again for the effort.
>
> Best Regards
> Ahmed Hamdy
>
>
> On Wed, 31 Jul 2024 at 10:34, Saurabh Singh <saurabhsingh9...@gmail.com>
> wrote:
>
>> Hi Ahmed,
>>
>> Thank you very much for the detailed, valuable review. Please find our
>> responses below:
>>
>>
>>    - In the FLIP you mention the split is going to be 1 sqs Queue, does
>>    this mean we would support reading from multiple queues? This is also not
>>    clear in the implementation of `addSplitsBack` whether we are planning to
>>    support multiple sqs topics or not.
>>
>> *Our current implementation assumes that each source reads from a single
>> SQS queue. If you need to read from multiple SQS queues, you can define
>> multiple sources accordingly. We believe this approach is clearer and more
>> organized compared to having a single source switch between multiple
>> queues. This design choice is based on weighing the benefits, but we can
>> support multiple queues per source if the need arises.*
>>
>>    - Regarding Client creation, there has been some effort in the common
>>    `aws-util` module like createAwsSyncClient, we should reuse that for
>>    `SqsClient` creation.
>>
>>    *Thank you for bringing this to our attention. Yes, we will utilize
>>    the existing createClient methods available in the libraries. Our goal is
>>    to avoid any code duplication on our end.*
>>
>>
>>    - On the same point for clients, Is there a reason the FLIP suggests
>>    async clients? sync clients have proven more stable and the source
>>    threading model already guarantees no blocking by sync clients.
>>
>> *We were not aware of this, and we have been using async clients for our
>> in-house use cases. However, since we already have sync clients in the
>> aws-util that ensure no blocking, we are in a good position. We will use
>> these sync clients during our development and testing efforts, and we will
>> share the results and keep the community updated.*
>>
>>    - On mentioning threading, the FLIP doesn’t mention the fetcher
>>    manager. Is it going to be `SingleThreadFetcherManager`? Would it be
>>    better to make the source reader extend the
>>    SingleThreadedMultiplexReaderBase or are we going to implement a more
>>    simple version?
>>
>> *Yes, we are considering extending
>> SingleThreadMultiplexSourceReaderBase for the Reader. We have included the
>> implementation snippet in the FLIP for reference.*
>>
>>    - The FLIP doesn’t mention schema deserialization or the
>>    recordEmitter implementation. Are we going to use `deserializationSchema`
>>    or some sort of string to element converter? It is also not clear from the
>>    builder example provided.
>>
>> *Yes, we plan to use the deserializationSchema and recordEmitter
>> implementations. We have included sample code for these in the FLIP for
>> reference.*
>>
>>    - Are the values mentioned in getSqsClientProperties recommended
>>    defaults? If so we should highlight that.
>>
>> *The defaults have not been decided yet. The values shown are only samples
>> for illustration.*
>>
>>    - Most importantly I am a bit skeptical regarding enforcing
>>    exactly-once semantics with side effects especially with dependency on
>>    checkpointing configuration, could we add flags to disable and disable by
>>    default if the checkpointing is not enabled?
>>
>> *During our initial design phase, we intended to enforce exactly-once
>> semantics via checkpoints. However, you raise a valid point, and we will
>> make this a configurable feature for users. They can choose to disable
>> exactly-once semantics, accepting some duplicate processing (at-least-once)
>> as a trade-off. We have updated the FLIP to include support for this
>> feature.*
>>
>>    - I am not 100% convinced we should block FLIP itself on the FLIP-438
>>    implementation but I echo the fact that there might be some reusable code
>>    between the 2 submodules we should make use of.
>>
>>    *Yes we echo the same. We fully understand the concern and are
>>    closely examining the SQS Sink implementation. We will ensure there is no
>>    duplication of work or submodules. If any issues arise, we will address
>>    them promptly.*
>>
>>
>> Thank you for your valuable feedback on the FLIP. Your input is helping
>> us refine and improve it significantly.
>>
>> [Main Proposal doc] -
>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
>>
>> Regards
>> Saurabh & Abhi
>>
>>
>> On Sun, Jul 28, 2024 at 3:04 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:
>>
>>> Hi Saurabh
>>> I think this is going to be a valuable addition which is needed.
>>> I have a couple of comments
>>> - In the FLIP you mention the split is going to be 1 sqs Queue, does this
>>> mean we would support reading from multiple queues? The builder example
>>> seems to support a single queue
>>> SqsSource.<T>builder.setSqsUrl("
>>> https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
>>> - This is also not clear in the implementation of `addSplitsBack` whether
>>> we are planning to support multiple sqs topics or not.
>>> - Regarding Client creation, there has been some effort in the common
>>> `aws-util` module like createAwsSyncClient  , we should reuse that for
>>> `SqsClient` creation.
>>> - On the same point for clients, Is there a reason the FLIP suggests
>>> async
>>> clients? sync clients have proven more stable and the source threading
>>> model already guarantees no blocking by sync clients.
>>> - On mentioning threading, the FLIP doesn't mention the fetcher manager.
>>> Is
>>> it going to be `SingleThreadFetcherManager`? Would it be better to make
>>> the
>>> source reader extend the SingleThreadedMultiplexReaderBase or are we
>>> going
>>> to implement a more simple version?
>>> - The FLIP doesn't mention schema deserialization or the recordEmitter
>>> implementation, Are we going to use `deserializationSchema` or some sort
>>> of
>>> string to element converter? It is also not clear from the builder
>>> example
>>> provided?
>>> - Are the values mentioned in getSqsClientProperties recommended
>>> defaults?
>>> If so we should highlight that
>>> - Most importantly I am a bit skeptical regarding enforcing exactly-once
>>> semantics with side effects especially with dependency on checkpointing
>>> configuration, could we add flags to disable and disable by default if
>>> the
>>> checkpointing is not enabled?
>>>
>>> I am not 100% convinced we should block FLIP itself on the FLIP-438
>>> implementation but I echo the fact that there might be some reusable code
>>> between the 2 submodules we should make use of.
>>>
>>>  Best Regards
>>> Ahmed Hamdy
>>>
>>>
>>> On Fri, 26 Jul 2024 at 17:55, Saurabh Singh <saurabhsingh9...@gmail.com>
>>> wrote:
>>>
>>> > Hi Li Wang,
>>> >
>>> > Thanks for the review and appreciate your feedback.
>>> > I completely understand your concern and agree with it. Our goal is to
>>> > provide users of connectors with a consistent and coherent ecosystem,
>>> free
>>> > of any issues.
>>> > To ensure that, we are closely monitoring/reviewing the SQS Sink
>>> > implementation work by Dhingra. Our development work will commence
>>> once the
>>> > AWS Sink is near completion or completed. This approach ensures that we
>>> > take in the new learnings, do not duplicate any core modules and allow
>>> us
>>> > to save valuable time.
>>> >
>>> > In the meantime, we would like to keep the discussion and process
>>> active on
>>> > this FLIP. Gaining valuable community feedback (which is helping us)
>>> will
>>> > help us address any potential gaps in the source connector design and
>>> > finalize it. Behind the scenes, we are already designing and
>>> pre-planning
>>> > our development work to adhere to feedback/best practices/ faster
>>> delivery
>>> > when we implement this FLIP.
>>> > Please share your thoughts on this.
>>> >
>>> > Regards
>>> > Saurabh
>>> >
>>> > On Fri, Jul 26, 2024 at 5:52 PM Li Wang <liwang505...@gmail.com>
>>> wrote:
>>> >
>>> > > Hi Saurabh,
>>> > >
>>> > > Thanks for the FLIP. Given the ongoing effort to implement the SQS
>>> sink
>>> > > connector (
>>> > >
>>> > >
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector
>>> > > ),
>>> > > it is important to consider how the SQS source connector supports
>>> this
>>> > > development, ensuring a unified framework for AWS integration that
>>> avoids
>>> > > capacity overlap and maximizes synchronization. Since the
>>> implementation
>>> > by
>>> > > Dhingra is still WIP, I would suggest taking that into account and
>>> keep
>>> > > this FLIP as an extension of that FLIP which could be taken up once
>>> the
>>> > SQS
>>> > > Sink FLIP is fully implemented. If not, this may lead to duplicated work
>>> in
>>> > > multiple identity-related modules and structure of the overall SQS
>>> > > connector codebase. What do you think?
>>> > >
>>> > > Thanks
>>> > >
>>> > >
>>> > > On Thursday, July 25, 2024, Saurabh Singh <
>>> saurabhsingh9...@gmail.com>
>>> > > wrote:
>>> > >
>>> > > > Hi Samrat,
>>> > > >
>>> > > > Thanks for the review and feedback.
>>> > > > We have evaluated all the three points. Please find the answers
>>> below:
>>> > > >
>>> > > > 1. AWS has announced JSON protocol support in SQS [1]. Can you shed
>>> > some
>>> > > > light on how different protocols will be supported?
>>> > > >  - We will utilize the AWS client library to connect with the AWS
>>> SQS
>>> > > > Service. Versions beyond 2.21.19 now support JSON, so simply
>>> upgrading
>>> > > the
>>> > > > client library will suffice for the protocol switch. However, from
>>> the
>>> > > > connector's perspective, we do not anticipate any changes in our
>>> > > > communication process, as it is handled by the client library. [4]
>>> > > >
>>> > > > 2. AWS SQS has two types of queues [2]. What are the implementation
>>> > > detail
>>> > > > differences for the source connector?
>>> > > > - SQS Connector is indifferent to the customer's choice of Queue
>>> type.
>>> > If
>>> > > > out-of-order messages are a concern, it will be the responsibility
>>> of
>>> > the
>>> > > > application code or main job logic to manage this.
>>> > > >
>>> > > > 3. Will the SQS source connector implement any kind of callbacks
>>> [3] on
>>> > > > success to offer any kind of guarantee?
>>> > > > - We have proposed deleting SQS messages using the notification
>>> > > > provided by the checkpoint framework on checkpoint completion, thus
>>> > > > providing an exactly-once guarantee. [5] Additionally, when deleting
>>> > > > messages, we will monitor the API call responses and log any
>>> > > > failures, along with providing observability through appropriate
>>> > > > metrics.
>>> > > >
>>> > > > [1]
>>> > > >
>>> > > >
>>> > >
>>> >
>>> https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
>>> > > > [2]
>>> > > >
>>> > > >
>>> > >
>>> >
>>> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
>>> > > > [3]
>>> > > >
>>> > > >
>>> > >
>>> >
>>> https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
>>> > > > [4]
>>> > > >
>>> > > >
>>> > >
>>> >
>>> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-json-faqs.html#json-protocol-getting-started
>>> > > > [5]
>>> > > >
>>> > > >
>>> > >
>>> >
>>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.jdcikzojx5d9
>>> > > >
>>> > > >
>>> > > > [Main Proposal doc] -
>>> > > >
>>> > > >
>>> > >
>>> >
>>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
>>> > > >
>>> > > > Please feel free to reach out if you have more feedback.
>>> > > >
>>> > > > Regards
>>> > > > Saurabh & Abhi
>>> > > >
>>> > > >
>>> > > > On Wed, Jul 24, 2024 at 8:52 AM Samrat Deb <decordea...@gmail.com>
>>> > > wrote:
>>> > > >
>>> > > > > Hi Saurabh,
>>> > > > >
>>> > > > > Thank you for sharing the FLIP for the SQS source connector. An
>>> SQS
>>> > > > source
>>> > > > > connector will be a great addition to the Flink ecosystem, as
>>> there
>>> > is
>>> > > a
>>> > > > > growing demand for SQS source/sink integration.
>>> > > > >
>>> > > > > I have a few queries:
>>> > > > >
>>> > > > > 1. AWS has announced JSON protocol support in SQS [1]. Can you
>>> shed
>>> > > some
>>> > > > > light on how different protocols will be supported?
>>> > > > > 2. AWS SQS has two types of queues [2]. What are the
>>> implementation
>>> > > > detail
>>> > > > > differences for the source connector?
>>> > > > > 3. Will the SQS source connector implement any kind of callbacks
>>> [3]
>>> > on
>>> > > > > success to offer any kind of guarantee?
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > [1]
>>> > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
>>> > > > > [2]
>>> > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
>>> > > > > [3]
>>> > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
>>> > > > >
>>> > > > > Bests,
>>> > > > > Samrat
>>> > > > >
>>> > > > >
>>> > > > > On Fri, 19 Jul 2024 at 9:53 PM, Saurabh Singh <
>>> > > > saurabhsingh9...@gmail.com>
>>> > > > > wrote:
>>> > > > >
>>> > > > > > Hi Flink Devs,
>>> > > > > >
>>> > > > > > Our team has been working on migrating various data pipelines
>>> to
>>> > > Flink
>>> > > > to
>>> > > > > > leverage the benefits of exactly-once processing,
>>> checkpointing,
>>> > and
>>> > > > > > stateful computing. We have several use cases built around the
>>> AWS
>>> > > SQS
>>> > > > > > Service. For this migration, we have developed an SQS Source
>>> > > Connector,
>>> > > > > > which enables us to run both stateless and stateful Flink-based
>>> > jobs.
>>> > > > > >
>>> > > > > > We believe that this SQS Source Connector would be a valuable
>>> > > addition
>>> > > > to
>>> > > > > > the existing connector set. Therefore, we propose a FLIP to
>>> include
>>> > > it.
>>> > > > > >
>>> > > > > > For more information, please refer to the FLIP document.
>>> > > > > >
>>> > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>>> > > > > >
>>> > > > > > Thanks
>>> > > > > > Saurabh & Abhi
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
