I have updated the FLIP [1] (please double-check) and will now add my +1 to
the vote.

Best,

Arvid

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-477+Amazon+SQS+Source+Connector

On Wed, Sep 25, 2024 at 2:21 PM Saurabh Singh <saurabhsingh9...@gmail.com>
wrote:

> Hi Flink Devs,
>
> Thanks a lot for the valuable feedback from Arvid, Danny and David on the
> design of this SQS Connector.
>
> We had an in-depth discussion on Slack regarding the semantics and stages
> of implementation. Below is the *conclusion* of our conversation.
>
>    - For now, the FLIP will concentrate on capturing the connector details,
>    design, and implementation specifically for AWS SQS Standard Queues with
>    at-least-once semantics. Support for FIFO queues will be introduced later
>    as an extension to this FLIP.
>    - *Evaluation of Exactly Once Semantics for Standard Queues*
>       - Unfortunately, achieving exactly once semantics is not feasible
>       for Standard Queues due to the following reasons:
>          - There can be duplicate message deliveries with SQS standard
>          queues.
>          - A successful delete call does not guarantee the absolute
>          deletion of the messages.
>
> Therefore, *we can only provide at-least-once delivery for Standard
> Queues.*
>
>    - *Evaluation of Exactly Once Semantics for FIFO Queues*
>       - Exactly once semantics is achievable for FIFO Queues. To
>       accomplish this, we need to keep track of seqNum (which is strictly
>       increasing for messages) and groupId. By using the seqNum, we can
>       achieve exactly-once semantics.
>
>
>    - *Manipulation of Invisible Messages*
>       - *For Standard Queues*:
>          - Since we are providing at-least-once delivery, there is no
>          need to manipulate invisible messages. After their visibility
>          timeout expires, the messages are consumed again, and user code
>          needs to handle these corner scenarios.
>       - *For FIFO Queues:*
>          - The challenge is that processing is halted if the previous
>          invisible messages are not deleted. Therefore, we need to
>          additionally keep track of ReceiptHandle. In case messages remain
>          in an invisible state for an extended period, we need to mark them
>          visible again. This can be quite tricky, especially with multiple
>          groupIds. The details and design of this mechanism will be an
>          extension to this FLIP.
>
>
> Thanks a lot for the feedback. We have updated the FLIP.
> Updated Google Doc Link -
> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>
> Thanks
> Saurabh & Abhi
>
>
> -----------------------------------------------------------------------------------------
> *Slack conversation details snapshot (TL;DR: feel free to skip):*
>
>
> Saurabh Singh  6:06 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726576584259939>
> Hi @Arvid Heise <https://apache-flink.slack.com/team/U07BQT3AA1W>, thank
> you for your response. We believe that it might be more efficient to
> continue this conversation via Slack DMs, and we can update the FLIP with
> our conclusions afterwards. I hope this works for you, but please feel free
> to let me know if you have any concerns.
> Regarding the discussion points:
>
>    1. *SQS Limitation*: SQS has a limitation where you cannot perform
>    operations on invisible messages if their receipt handles (the reference to
>    the messages) are not available.
>    2. *SQS Message IDs*: Message IDs are unique random strings.
>
> Now, we have two scenarios:
>
>    - *Messages become invisible but are not captured as part of the state*
>     (e.g., while reading): In this case, we can't do anything except wait
>    for them to become visible again, which is not ideal for FIFO queues.
>    - *Messages become invisible but are captured as part of the state* (e.g.,
>    failure to act on the NotifyCheckpointComplete call): Here, we would
>    merge all the message IDs read until the next completed checkpoint and then
>    delete them together.
>
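
The second scenario can be sketched roughly as follows. This is only an illustration under assumptions, not the connector's code: `PendingDeletes` and `delete_batch` are hypothetical names, and `delete_batch` stands in for whatever SQS batch-delete call the source would issue.

```python
from collections import OrderedDict
from typing import Callable, Dict, List


class PendingDeletes:
    """Tracks receipt handles per checkpoint until they are deleted (hypothetical helper)."""

    def __init__(self, delete_batch: Callable[[List[str]], None]) -> None:
        self._delete_batch = delete_batch  # stand-in for an SQS batch delete
        self._current: List[str] = []      # handles read since the last snapshot
        self._by_checkpoint: "OrderedDict[int, List[str]]" = OrderedDict()

    def on_message(self, receipt_handle: str) -> None:
        self._current.append(receipt_handle)

    def snapshot_state(self, checkpoint_id: int) -> Dict[int, List[str]]:
        # Everything read since the previous snapshot belongs to this checkpoint.
        self._by_checkpoint[checkpoint_id] = self._current
        self._current = []
        return {cid: list(handles) for cid, handles in self._by_checkpoint.items()}

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        # Merge this checkpoint with earlier ones whose notification was lost.
        done = [cid for cid in self._by_checkpoint if cid <= checkpoint_id]
        handles = [h for cid in done for h in self._by_checkpoint[cid]]
        if handles:
            self._delete_batch(handles)
        # Only forget the handles once the delete call has returned.
        for cid in done:
            del self._by_checkpoint[cid]


deleted: List[str] = []
pending = PendingDeletes(deleted.extend)
pending.on_message("rh-1")
pending.snapshot_state(1)   # the notification for checkpoint 1 is lost
pending.on_message("rh-2")
pending.snapshot_state(2)
pending.notify_checkpoint_complete(2)  # deletes rh-1 and rh-2 together
```

Note that the state grows with the number of in-flight messages, which is the state-size concern discussed in this thread.
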
> Are you planning to store the message ids of all to-be-deleted messages in
> the state? That certainly works for low-volume sources but ideally, you
> would just store the highest message id and enumerate the other message ids
> from it.
>
> Yes, we need to store all message IDs individually because, in Standard
> SQS queues, messages are delivered in an unordered manner.
>
> Initially, we considered addressing both Standard and FIFO queues. However, after
> discussions and further thought, it seems that the same approach may not be
> ideal for FIFO queues. We’d like to seek your advice on limiting the scope
> of this FLIP to focus exclusively on Standard queues for better
> implementation and clarity. We believe this could help streamline the
> process, and we could later have a follow-up discussion or extension to
> explore adjustments and optimizations for FIFO queues.
> What are your thoughts on this approach?
> Arvid Heise  7:07 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580266021769>
> It's completely fine to focus on one type of queue and add the other later
> (even without FLIP). Of course, you should either fail on FIFO or document
> that this works in a very limited way.
> 7:09
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580384937009>
> Arvid Heise
> Storing all message IDs in the Flink state should be seen as a last
> resort. It's okay, but depending on the number of messages it may blow up
> the state size significantly. It's much better than storing the payloads
> themselves (as we need to do for the deduplication operator).
> Danny Cranmer  7:10 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580422350019>
> was added to the conversation by Arvid Heise.
> Arvid Heise  7:11 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580488752129>
> It usually pays off to toss a few ideas back and forth though: maybe it's
> enough to store the ID of the last committed message. If you have a way to
> distinguish replayed from non-replayed messages (timestamp?), then you could
> discard all replayed messages until the ID pops up.
> 7:13
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580596696739>
> Arvid Heise
> What else can you do with receipt handles? Does it make sense to store
> them instead of the ID?
> 7:14
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580691227369>
> Arvid Heise
> Also probably a stupid Q: what happens with already-read messages when I
> set the timeout to 0? I'm assuming the respective timeout is attached to
> the message on read and thus changing the timeout later has no effect on them...
> 7:15
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580751903719>
> Arvid Heise
> Lastly: how much contact do you have with the SQS folks? Could they add a
> way to interact with invisible messages later? Then, we could focus on a
> simple solution (possibly ignoring EOS) and add that later.
> Wednesday, 18 September
> Saurabh Singh  5:43 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726661587582269>
> Thank you for your feedback. Please find our responses below:
>
>    1. Regarding message handling, we are indeed storing the receipt
>    handles for the messages in the state. This is necessary for performing the
>    delete operation on the messages later. In our earlier communication, we
>    referred to them as message IDs for simplicity and ease of understanding.
>    2. For *Standard SQS queues*, due to the lack of ordering guarantees,
>    we must store each receipt handle to ensure we can delete the messages
>    appropriately. The message IDs alone are not sufficient for this purpose.
>    3. Anytime we set the visibility timeout of a message to 0 (whether
>    immediately after reading or later), the message becomes visible again,
>    leading to reprocessing and duplication. Fine-tuning the visibility timeout
>    is key to avoiding this duplication. (Ref Link:
>    <https://repost.aws/questions/QUnM3etA9fSDuMJngSvql5lw/sqs-visibility-timeout-0>)
>    4. We are simply users of the AWS infrastructure, and do not have
>    direct interaction with the SQS development team. So it is not possible to
>    interact with them on this.
>
> Let us know your thoughts on this. (edited)
> Thursday, 19 September
> Danny Cranmer  1:43 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726733595413099>
> Disclaimer, I am not experienced with SQS.
>
> Regarding message handling, we are indeed storing the receipt handles for
> the messages in the state.
>
> Is this code available somewhere for us to look at? It might help with the
> understanding.
>
> we are indeed storing the receipt handles for the messages in the state
>
> Are we also storing the actual messageID? We would likely need this in the
> case when we need to deduplicate a message with a new read receipt.
>
> I note that standard queues only guarantee at-least-once delivery, so we
> cannot really provide exactly-once in Flink without more extensive
> deduplication logic
> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues-at-least-once-delivery.html.
> Unless the delete operation returns an error that we can retry, do you know
> the behaviour in these conditions?
>
> Since we are storing the message ID (receipt handles) in the state it sounds
> like the happy path is solid, minus the above ^. For the "Messages become
> invisible but are not captured as part of the state" condition I suppose we
> are also covered, but we will introduce some latency increasing
> out-of-orderness.
>
> It sounds like we need to track the receiptId and messageId in state until
> we have successfully deleted in SQS. The receiptId is used to actually
> delete the message and the messageId is needed to dedupe. I assume we will
> be controlling the visibility period in the source? If so we can expose this
> to the user (with sensible defaults) to tune, ofc it should be something >
> checkpoint interval.
>
> We are simply users of the AWS infrastructure, and do not have direct
> interaction with the SQS development team. So it is not possible to
> interact with them on this.
>
> I know a few people that work at SQS, no promises but I have pinged them
> and might be able to get some feedback from them (edited)
> Arvid Heise  2:37 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726736852216579>
> I wonder if we get away with simply offering at least once and then expose
> the message ID as metadata and use Flink to dedupe.
> 2:37
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726736863805459>
> Arvid Heise
> If I don't delete a message, are they stored indefinitely?
> 2:40
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726737056514629>
> Arvid Heise
> Just to explain my line of thought: Storing per-message data in the source
> blows up state. We can do that for low-volume sources or if we have complex
> queries (e.g. joins, which also store per-message data). If SQS is low
> volume anyway, we might simply replay everything and dedupe via message
> ID. That obviously will slow down recovery, so it's more likely to work if
> messages have some inherent retention period.
> Saurabh Singh  5:55 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726748737945309>
> Thank you for taking the time and for your feedback:
>
> Is this code available somewhere for us to look at? It might help with
> understanding.
>
> Currently, it is not available in any public domain. However, you can
> refer to the documentation of messageId and ReceiptHandle here
> <https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html>
> .
>
> Are we also storing the actual messageID? We would likely need this in the
> case when we need to deduplicate a message with a new read receipt.
>
> At the moment, we are not storing the actual messageID. We only perform
> delete operations, which require only the receipt handle. While storing the
> messageID would indeed be useful for deduplication, it introduces the
> additional overhead of retaining the messageID until its retention period
> expires. Please note that as per the reference
> <https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues-at-least-once-delivery.html>
>  even a successful delete operation is not 100% guaranteed, as a copy
> of the message might not be deleted on a server that is unavailable, and
> you might receive that message copy again.
>
> I assume we will be controlling the visibility period in the source? If
> so, we can expose this to the user (with sensible defaults) to tune, of
> course, it should be something > checkpoint interval.
>
> While we can manage this via APIs, it should be part of the infrastructure
> (SQS) provisioning and not something to be tuned on demand. We will provide
> guidelines to tune these parameters (Visibility Timeout/Retention Period)
> with respect to checkpoint duration/interval/timeout.
>
> I wonder if we get away with simply offering at least once.
>
> Yes, based on the details and discussion, this seems like a good starting
> point.
>
> Expose the message ID as metadata and use Flink to dedupe.
>
> Could you please expand on this? As we understand, we would need to
> maintain the messageID in a data structure until the retention time
> configured on the queue.
>
> If I don't delete a message, are they stored indefinitely?
>
> No, messages are not stored indefinitely. They expire or are deleted
> automatically after the message retention period of the queue.
> Danny Cranmer  6:08 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726749531070739>
>
> Is this code available somewhere for us to look at? It might help with
> understanding.
>
> Currently, it is not available in any public domain. However, you can
> refer to the documentation of messageId and ReceiptHandle here
> <https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html>
> .
>
> I meant your connector code, assuming it exists
>
> Expose the message ID as metadata and use Flink to dedupe.
>
> Could you please expand on this? As we understand, we would need to
> maintain the messageID in a data structure until the retention time
> configured on the queue.
>
> We support at-least-once by default, but then provide a special
> deserialisation schema interface that gives access to the SQS message ID.
> Then a user can include this in their records for downstream
> deduplication.  Example here
> <https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java#L64>
>  for Kinesis, we give access to seqNum etc (edited)
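
The downstream deduplication idea can be sketched as below. This is a plain-Python illustration under assumptions rather than Flink code: the `dedupe` function and the record layout are hypothetical, and in a real job the `seen` map would presumably be keyed state with a TTL no shorter than the queue's retention period.

```python
from typing import Dict, Iterable, Iterator, Tuple

Record = Tuple[str, str, float]  # (message_id, payload, arrival time in seconds)


def dedupe(records: Iterable[Record], ttl_seconds: float) -> Iterator[str]:
    """Emit each payload once per message ID; forget IDs older than ttl_seconds."""
    seen: Dict[str, float] = {}  # message_id -> time it was first seen
    for message_id, payload, now in records:
        # Expire IDs that are assumed to be past the point of redelivery.
        for expired in [m for m, t in seen.items() if now - t > ttl_seconds]:
            del seen[expired]
        if message_id in seen:
            continue  # duplicate delivery, drop it
        seen[message_id] = now
        yield payload


stream = [("m1", "a", 0.0), ("m2", "b", 1.0), ("m1", "a", 2.0), ("m3", "c", 3.0)]
result = list(dedupe(stream, ttl_seconds=60.0))  # the second "m1" is dropped
```

The trade-off is the one raised in this thread: the map holds one entry per message for the length of the TTL.
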
> 6:10
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726749655044349>
> Danny Cranmer
>
> I assume we will be controlling the visibility period in the source? If
> so, we can expose this to the user (with sensible defaults) to tune, of
> course, it should be something > checkpoint interval.
>
> While we can manage this via APIs, it should be part of the infrastructure
> (SQS) provisioning and not something to be tuned on demand. We will provide
> guidelines to tune these parameters (Visibility Timeout/Retention Period)
> with respect to checkpoint duration/interval/timeout.
>
> ok, it is a shame that users will need to tune their queue to work with
> Flink, but sounds sensible given the limitations (edited)
> Saurabh Singh  6:43 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726751603344809>
> Thanks for the quick response and suggestions.
>
> I meant your connector code, assuming it exists.
>
> At the moment, we have a naive version implemented and used for our
> internal application. Unfortunately, sharing the code in its current form
> could have legal implications. Therefore, we have initiated this FLIP to
> develop this for the open-source community.
> Friday, 20 September
> Danny Cranmer  12:05 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726770912823589>
> @David Green <https://apache-flink.slack.com/team/U07NKKNBG5P> has agreed
> to help us out. He is a Principal Engineer on the SQS team.
> David Green  12:05 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726770941081139>
> was added to the conversation by Danny Cranmer.
> Danny Cranmer  12:11 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726771289630659>
> Hey @David Green <https://apache-flink.slack.com/team/U07NKKNBG5P>,
> thanks for jumping in :wave:
>
> We are trying to design an SQS source for Flink and achieve exactly once
> semantics. Flink has a built-in checkpointing mechanism; sources typically
> commit their current read position in the checkpoint and resume from there
> upon failure.
>
> For the SQS source we are trying to avoid storing all message IDs and read
> receipts in the state to avoid huge state sizes. It would be helpful if you
> could review this thread and see if we have made any wrong assumptions, or
> would suggest any alternative solutions.
> David Green  12:46 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726773418874009>
> Hi all, will read the thread to catch up. Is there a design doc or is it
> all in the chat?
> Danny Cranmer  12:49 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726773595097219>
> Here is the design, although it is probably lacking details of what we are
> discussing
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-477+Amazon+SQS+Source+Connector
> David Green  1:07 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726774654899819>
> Is there a definition of "exactly once"?
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/guarantees/
> e.g. does exactly once relate to distributed state, or local state?
> David Green  1:13 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726774992609999>
> SQS standard queues provide at-least-once delivery. In rare cases,
> messages can be delivered more than once.
> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html
> This potential for duplicate deliveries with SQS standard queues will make
> exactly once semantics impractical to implement in a Flink source
> connector. Unlike SQS standard queues, SQS FIFO ordering means that you
> won't receive a duplicate delivery out of order.
>
> It might be useful to declare the capabilities of the SQS source connector
> separately for SQS FIFO and SQS standard queues.
> 1:18
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775287577599>
> David Green
> For SQS FIFO queues, you could consider using the `SequenceNumber`
> provided by ReceiveMessage
> (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html).
> Tracking the `SequenceNumber` for each message group that is inflight
> might be a good way to provide exactly once semantics in the Flink source
> connector.
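
A minimal sketch of that per-group tracking, assuming the `SequenceNumber` arrives as a numeric string and increases within a message group. `FifoProgress` and its methods are hypothetical names, not an existing Flink or AWS API.

```python
from typing import Dict, Optional


class FifoProgress:
    """Highest SequenceNumber processed per message group (hypothetical helper)."""

    def __init__(self, restored: Optional[Dict[str, int]] = None) -> None:
        self._highest: Dict[str, int] = dict(restored or {})

    def should_process(self, group_id: str, sequence_number: str) -> bool:
        # Compare numerically: comparing the strings would mis-order "9" and "10".
        seq = int(sequence_number)
        if seq <= self._highest.get(group_id, -1):
            return False  # already processed before the last checkpoint: a replay
        self._highest[group_id] = seq
        return True

    def snapshot_state(self) -> Dict[str, int]:
        return dict(self._highest)


progress = FifoProgress()
progress.should_process("g1", "100")
progress.should_process("g1", "101")
state = progress.snapshot_state()      # checkpointed: {"g1": 101}

recovered = FifoProgress(state)        # after a restart
replayed = recovered.should_process("g1", "101")   # False, replay is skipped
fresh = recovered.should_process("g1", "102")      # True
other_group = recovered.should_process("g2", "5")  # True, groups are independent
```

The state here is one number per message group rather than one entry per message, which is what makes this attractive compared with storing every message ID.
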
> Danny Cranmer  1:21 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775472945699>
> Exactly once means that each record is consumed and processed within the
> Job only once. Even when there are failures and restarts. Flink has
> "in-flight" state and checkpointed state. Upon failure Flink restores from
> checkpointed state and discards "in-flight" state. Checkpoints are
> consistent across parallel tasks of the Flink job
> Danny Cranmer  1:23 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775590547299>
> What is the sequence number? These docs are not that useful "Returns the
> value provided by Amazon SQS."
> David Green  1:23 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775611304669>
> Forgive my unfamiliarity with FLINK. What does that mean from a source
> connector perspective? If a FLINK application crashes and is restarted on
> another host, what is the expected behaviour?
> Danny Cranmer  1:28 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775900380639>
> The Flink job would resume from the last checkpoint. The source should
> then resume from where it was at that checkpoint time/offset/etc. The idea
> for a queue is that upon checkpoint completion we delete all the messages
> consumed since the last checkpoint. However, this cleanup is not guaranteed
> to happen, so we often store metadata in the state to help us clean up in
> an eventually consistent way and dedupe (edited)
> 1:32
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726776120746739>
> Danny Cranmer
> So one idea is to collect the receipt IDs and then delete the messages in
> the checkpoint completion event. But since this might not happen we want to
> store state to eventually cleanup without storing every message ID. Usually
> we use some offset/sequence number.
> 1:33
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726776221119169>
> Danny Cranmer
> Additionally on failure, we might need to reread messages that were
> in-flight, and we do not have the receipt IDs. But they may not be visible
> now, so besides tuning the visibility period is there a better way to
> optimise this?
> Danny Cranmer  1:38 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726776534516009>
> Sorry for the vague questions, it's difficult when I do not know SQS and
> you do not know Flink. Hopefully @Saurabh Singh
> <https://apache-flink.slack.com/team/U06GJSZTEUC> will have some more
> specific questions when online
> David Green  1:59 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726777789811769>
>
>  Additionally on failure, we might need to reread messages that were
> in-flight, and we do not have the receipt IDs. But they may not be visible
> now, so besides tuning the visibility period is there a better way to
> optimise this?
>
> Unfortunately no - if a message was received and is still within its
> invisibility timeout, there is no way to make it visible again without
> using the ReceiptHandle.
> 2:01
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726777889954539>
> David Green
>
>  So one idea is to collect the receipt IDs and then delete the messages in
> the checkpoint completion event. But since this might not happen we want to
> store state to eventually cleanup without storing every message ID. Usually
> we use some offset/sequence number.
>
> This approach could work for SQS FIFO queues if you stored the
> SequenceNumber by message group ID, but each message group with in-flight
> messages would have to wait for those messages to become visible again.
> Yesterday
> Saurabh Singh  6:04 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727094880363379>
> Thanks @Danny Cranmer <https://apache-flink.slack.com/team/U03JD1ZFQ3D> @David
> Green <https://apache-flink.slack.com/team/U07NKKNBG5P> for the valuable
> insights. We have reviewed the conversation and are providing a summary
> here. We will update the FLIP accordingly once we agree on the same.
>
>    - *Exactly Once Semantics for Standard Queues*
>       - Unfortunately, achieving exactly once semantics is not feasible for
>       Standard Queues due to the following reasons:
>          - There can be duplicate message deliveries with SQS standard
>          queues.
>          - A successful delete call does not guarantee the absolute
>          deletion of the messages.
>
>       *Therefore, we can only provide at-least-once delivery for Standard
>       Queues.*
>
>    - *Exactly Once Semantics for FIFO Queues*
>       - Exactly once semantics is achievable for FIFO Queues. To accomplish
>       this, we need to keep track of seqNum (which is strictly increasing
>       for messages) and groupId. By using the seqNum, we can achieve exactly
>       once semantics.
>    - *Manipulation of Invisible Messages*
>       - *For Standard Queues:*
>          - Since we are providing at-least-once delivery, there is no need
>          to manipulate invisible messages. After their visibility timeout
>          expires, the messages are consumed again, and user code needs to
>          handle these corner scenarios.
>       - *For FIFO Queues:*
>          - The challenge is that processing is halted if the previous
>          invisible messages are not deleted. Therefore, we need to
>          additionally keep track of ReceiptHandle. In case messages remain
>          in an invisible state for an extended period, we need to mark them
>          visible again. This can be quite tricky, especially with multiple
>          groupIds. I believe we need to study this in detail and design the
>          mechanism in the extension FLIP.
>
> Let us know if this is good and we can proceed?
> David Green  10:06 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727109383466049>
> LGTM
> Today
> Arvid Heise  11:19 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727156995566719>
> Just to clarify:
>
>    - EOS with FIFO is out of scope of the FLIP now and there will be a
>    follow-up FLIP?
>
> 11:22
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727157161772559>
> Arvid Heise
> Imho (and this view is not fully shared by all Flink committers) you don't
> strictly need FLIPs for connectors, especially if you just submit
> extensions. However, you can see how beneficial it is to discuss the ideas
> with a broader audience.
> Having a FLIP for the start of a connector is definitely something that
> I'd like to see, but we would be driven mad if there were a FLIP for each
> and every improvement. It's a different story if you change existing behavior.
> Saurabh Singh  11:33 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727157813737819>
> Agree @Arvid Heise <https://apache-flink.slack.com/team/U07BQT3AA1W>. We
> will just extend the existing FLIP to document the behaviour of FIFO queues.
> This discussion has been very valuable for us in extending our knowledge and
> understanding of the concepts and arriving at the right behaviour for the
> connector. Thanks for helping us out here.
> We will update the FLIP and discussion thread with the new details.
>
>
>
>
>
> -------------------------------------------------------------------------------------------
>
>
>
>
> On Tue, Sep 17, 2024 at 12:26 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Saurabh,
>>
>> thank you very much for taking the time to explain SQS features to me.
>> It's a cool concept that I haven't seen before.
>>
>> While your presented approach already works for Flink EOS, I still need
>> to poke a bit because I think the usability may be improved.
>>
>> Some background information on how sinks usually work:
>> * On checkpoint, we store the committables (=transactions or file
>> locations).
>> * On notifyCheckpointComplete, we commit them all or fail on persisting
>> errors.
>> * On recovery, we retry all stored committables.
>> * On recovery, we cancel all opened transactions not belonging to the
>> recovered state.
>>
>> Now back to SQS, can we speed up recovery? Let's go over my thoughts:
>> * I'm assuming you need to set the timeout to a value that corresponds to
>> Flink checkpoint duration. I often see a 1-minute checkpointing interval
>> and a checkpoint taking some seconds, so we would choose an SQS timeout of 2
>> minutes (we'd probably set it higher in realistic settings).
>> * Before failure, Flink tries to read message1 and fails, there are more
>> messages afterwards (message2, ...)
>> * In standard queues, Flink restarts and will not see message1 until the
>> timeout happens. It can immediately start processing message2 and so on.
>> After 2 minutes, it will finally see message1 and consume it.
>> * In FIFO queues, Flink will not see any message until the timeout
>> happens. So effectively, Flink will not process anything for 2 minutes.
>> That means that recovery time is effectively always as long as the SQS
>> timeout.
>>
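
The timeout arithmetic above can be written down as a small rule of thumb. This is only a sketch: the function name and the 1.5 safety factor are assumptions chosen for illustration, not a recommendation from the FLIP. The 12-hour cap is the maximum visibility timeout mentioned in this mail.

```python
SQS_MAX_VISIBILITY_TIMEOUT_S = 12 * 60 * 60  # 12 hours, the maximum SQS allows


def suggest_visibility_timeout(checkpoint_interval_s: int,
                               checkpoint_timeout_s: int,
                               safety_factor: float = 1.5) -> int:
    """Cover one checkpoint interval plus the slowest allowed checkpoint, with headroom."""
    needed = (checkpoint_interval_s + checkpoint_timeout_s) * safety_factor
    return min(int(needed), SQS_MAX_VISIBILITY_TIMEOUT_S)


# 1-minute interval, checkpoints bounded at 20 seconds (assumed figure).
timeout = suggest_visibility_timeout(checkpoint_interval_s=60, checkpoint_timeout_s=20)
```

With these inputs the result is 120 seconds, i.e. the 2 minutes used in the example above. A larger value lowers the risk of redelivery before the checkpoint completes but lengthens recovery for messages that were in flight.
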
>> I'm curious if there is a way to manually time out the messages for faster
>> recovery? I saw that you can decrease the timeout of specific messages,
>> which could be used to set the timeout of message1 to 0 on restart. If that
>> works, we could do something similar to what the sink does. On restart, we
>> immediately try to delete the messages that are part of the checkpoint and
>> set the timeout to 0 for messages that were read after the checkpoint. If
>> it's somehow possible to enumerate all messages before and past the
>> respective checkpoint, we would not depend on the SQS timeout of the
>> consumer for correctness and recovery time and could set it to its max=12h.
>> Otherwise, users would have to trade off correctness (timeout too low) vs
>> recovery time (timeout too high).
>>
>> What are you planning to store as the source state?
>> > *record2 is still captured as part of the state, so it gets deleted
>> from the queue during the next checkpoint.*
>> Are you planning to store the message ids of all to-be-deleted messages
>> in the state? That certainly works for low volume sources but ideally, you
>> would just store the highest message id and enumerate the other message ids
>> from it.
>>
>> Could you please update the FLIP to include the provided information so
>> that other readers can quickly understand the visibility timeout and the
>> state management? (external links are fine of course but please provide a
>> brief summary in case the link becomes dead at some point) You can of
>> course defer it until we are completely aligned.
>>
>> Best,
>>
>> Arvid
>>
>> On Fri, Sep 13, 2024 at 9:32 AM Saurabh Singh <saurabhsingh9...@gmail.com>
>> wrote:
>>
>>> Hi Arvid,
>>>
>>> Thanks a lot for your review.
>>>
>>>    1. Exactly once:
>>>
>>>
>>>
>>> *When using an SQS source, we leverage two key properties of SQS:*
>>>
>>> *1. Message Retention: This property ensures that messages are retained in
>>> the queue for a specified period, allowing consumers enough time to process
>>> them. After this period, the messages are deleted.*
>>>
>>> *2. Visibility Timeout [1]: This property prevents other consumers from
>>> receiving and processing the same message while it's being processed by the
>>> current consumer.*
>>>
>>> *Regarding the scenario described below:*
>>>
>>> * Read record2, emit record2, record2 is written into a Kafka
>>> transaction2
>>> * Checkpoint2 happens, state is checkpointed, sink remembers transaction2
>>> to be committed
>>> * NotifyCheckpointCompleted2 is lost for some reason or happens after
>>> next
>>> failure
>>> * Some failure happens, Flink is restarted with the current system state:
>>> SQS contains record2, record3, Kafka contains record1 and pending
>>> record2.
>>> On restart, sink will commit transaction2 recovered from the state. Now
>>> SQS contains record2, record3 and Kafka contains record1, record2.
>>> * Read record2, emit record2, record2 is written into a Kafka
>>> transaction3
>>> * Eventually, we end up with two record2 in the sink.
>>>
>>> *In this scenario, we set the Message Visibility Timeout to be multiples
>>> of the checkpoint duration and ensure it is longer than the recovery
>>> duration. This way, when recovery occurs, record2 remains invisible and is
>>> not read again, preventing duplication. record2 is still captured as part
>>> of the state, so it gets deleted from the queue during the next checkpoint.*
>>>
>>> *Message Retention Time: We typically set this to a sufficiently long
>>> duration. This is beneficial for recovering from extended outages and
>>> backfilling data. If a message is not deleted, it will be read again once
>>> it becomes visible after the visibility timeout expires.*
>>>
>>> *This is our current understanding for achieving exactly-once
>>> processing. Do you see any challenges with this approach? If so, we would
>>> greatly appreciate your feedback.*
>>>
>>> 2. Deleting messages:
>>> *Yes, it's quite natural for consumers to delete messages for the
>>> reasons mentioned above, such as recovery from outages and backfilling
>>> data.*
>>>
>>> 3. Regarding the Source parallelism:
>>> *ACK. We'll document this behavior accordingly.*
>>>
>>> 4. Multiple queue support:
>>> *Yes, as an extension to this FLIP, we will support this functionality
>>> in the future. However, in the meantime, this can still be achieved by
>>> creating multiple sources with specific queues.*
>>>
>>>
>>> Please review and let us know your feedback on this.
>>>
>>> [1]
>>> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
>>>
>>>
>>> Thanks
>>> Saurabh & Abhi
>>>
>>>
>>> On Thu, Sep 12, 2024 at 2:23 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> <I accidentally sent this message to Saurabh directly instead of to the
>>>> mailing list; please disregard that first message and only respond to
>>>> this one>
>>>>
>>>> Hi Saurabh,
>>>>
>>>> thank you for addressing the points. I still have concerns around EOS
>>>> and a
>>>> few remarks.
>>>>
>>>> 1. Exactly once:
>>>> The checkpoint subsuming contract typically only applies to sinks, so
>>>> it's
>>>> unusual to be used in sources. Let's quickly look at an example so we
>>>> are
>>>> all on the same page.
>>>>
>>>> Assume we read messages record1, record2, record3, all arriving a few
>>>> minutes apart such that there are checkpoints in between. For simplicity
>>>> let's
>>>> assume one checkpoint between records and call them checkpoint1 after 1,
>>>> checkpoint2 after 2 and so on. The data is simply stored in some EOS
>>>> sink
>>>> (e.g. Kafka). The corresponding calls of NotifyCheckpointCompleted are
>>>> enumerated with NotifyCheckpointCompleted1, NotifyCheckpointCompleted2,
>>>> ...
>>>>
>>>> Then your source sees the following on failure:
>>>> * Read record1, emit record1, record1 is written into a Kafka
>>>> transaction1
>>>> * Checkpoint1 happens, state is checkpointed, sink remembers
>>>> transaction1
>>>> to be committed
>>>> * NotifyCheckpointCompleted1 happens, sink commits transaction1, so
>>>> record1
>>>> is visible to downstream consumer
>>>> * During NotifyCheckpointCompleted1, SQS source also deletes record1
>>>> * Some failure happens, Flink is restarted with the current system
>>>> state:
>>>> SQS contains record2, record3, Kafka contains record1. Since
>>>> transaction1
>>>> is already committed, the sink ignores it. So far so good.
>>>>
>>>> * Read record2, emit record2, record2 is written into a Kafka
>>>> transaction2
>>>> * Checkpoint2 happens, state is checkpointed, sink remembers
>>>> transaction2
>>>> to be committed
>>>> * NotifyCheckpointCompleted2 is lost for some reason or happens after
>>>> next
>>>> failure
>>>> * Some failure happens, Flink is restarted with the current system
>>>> state:
>>>> SQS contains record2, record3, Kafka contains record1 and pending
>>>> record2.
>>>> On restart, sink will commit transaction2 recovered from the state. Now
>>>> SQS contains record2, record3 and Kafka contains record1, record2.
>>>> * Read record2, emit record2, record2 is written into a Kafka
>>>> transaction3
>>>> * Eventually, we end up with two record2 in the sink.
>>>>
>>>> So you see that with your proposal, we get duplicate records easily on
>>>> failure, which is not exactly once. It's not enough to delete the
>>>> messages
>>>> eventually with NotifyCheckpointCompleted3.
>>>>
>>>> How it usually works:
>>>> * The checkpoint state of the source contains some kind of offset, which
>>>> would point to record3 in our example.
>>>> * On recovery, seek the record corresponding to the offset and start
>>>> reading from there.
>>>>
>>>> If the source system doesn't support seeking, then the usual way is to
>>>> replay a larger section but discard all records with a smaller offset.
>>>> If
>>>> there is no strict ordering, a last resort is to track all message ids
>>>> on
>>>> low volume sources and discard duplicates. You could also start with
>>>> just
>>>> offering AT LEAST ONCE and let the user deduplicate. We should just be
>>>> wary of calling something exactly once that isn't.
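
The two fallbacks mentioned above (replay and discard by offset, or track all message ids) could look roughly like this. It is a minimal sketch, not the connector's design: `ReplayFilter` and `SeenIdFilter` are hypothetical names, the offset is modeled as a plain `long` pointing at the next record to read, and the id set is assumed to fit in memory (hence "low volume sources").

```java
import java.util.HashSet;
import java.util.Set;

/** For ordered sources without seek: replay a larger section, drop what was already read. */
class ReplayFilter {
    // Offset stored in the checkpoint; it points at the first record NOT yet processed.
    private final long nextOffset;

    ReplayFilter(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    /** Returns true only for records at or after the checkpointed offset. */
    boolean shouldEmit(long offset) {
        return offset >= nextOffset;
    }
}

/** Last resort for unordered, low-volume sources: remember every id seen so far. */
class SeenIdFilter {
    // In a real source this set would have to be part of the checkpointed state.
    private final Set<String> seen = new HashSet<>();

    /** Returns true the first time an id is offered and false for every repeat. */
    boolean shouldEmit(String messageId) {
        return seen.add(messageId);
    }
}
```

The first filter needs only a single number in state but relies on strict ordering; the second works without ordering but its state grows with every message until ids are evicted.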
>>>>
>>>> 2. Deleting messages:
>>>> It's rather uncommon for a connector to explicitly delete messages.
>>>> Usually, they fall off retention at some point. I'd try to decouple that
>>>> from EOS and give users an option to fully consume or not. Unless of
>>>> course, it's very natural for SQS consumers to delete the messages; I
>>>> can't
>>>> assess that as I have never used SQS.
>>>>
>>>> 3. Regarding the parallelism:
>>>> > *We have reviewed the DynamicParallelismInference and noticed that it
>>>> is
>>>> currently limited to batching jobs only [2]. *
>>>> You are correct it cannot be used and thus we cannot enforce
>>>> parallelism of
>>>> 1. I got excited when I saw that interface and didn't read properly.
>>>>
>>>> Limiting the parallelism will help to reduce the resources. If you spawn
>>>> more than 1 source subtask, the extra subtasks should get terminated
>>>> immediately because there is nothing to do (don't forget to emit
>>>> SourceReaderFinishedEvent for all idle subtasks). If the pipeline is
>>>> embarrassingly parallel, all downstream subtasks will also be closed.
>>>> But there is nothing that you can do except document it to set the
>>>> parallelism of the source to 1. A higher parallelism is only usable
>>>> with a
>>>> shuffle then.
>>>>
>>>> Alternatively, you could think about allowing the source to subscribe to
>>>> multiple queues and distribute them to the different subtasks.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Tue, Sep 10, 2024 at 4:50 PM Saurabh Singh <
>>>> saurabhsingh9...@gmail.com>
>>>> wrote:
>>>>
>>>> > Hi Danny, Arvid,
>>>> >
>>>> > Thank you so much for reviewing the FLIP. We apologize for the delayed
>>>> > response; we've been quite busy over the past week.
>>>> >
>>>> >
>>>> >    - // .setFailOnError(false)
>>>> >    A general callout that this is not ideal, I know we use it
>>>> elsewhere
>>>> >    but we should consider some pluggable error handling at some
>>>> point. Not as
>>>> >    part of this FLIP, but if we can avoid adding this flag here, then
>>>> I would.
>>>> >
>>>> > *Acknowledged. We will remove this flag and address it separately to
>>>> > enhance our error handling design.*
>>>> >
>>>> >    - // The code registers a checkpoint completion notification
>>>> callback
>>>> >    via *notifyCheckpointComplete*
>>>> >    Flink does not guarantee that notifications are actually invoked
>>>> and
>>>> >    successful [1]. Therefore there is a chance this method will not
>>>> run, and
>>>> >    hence we could violate exactly once semantics here. Suggest that we
>>>> >    instead/additionally track the SQS read messages within the
>>>> checkpoint
>>>> >    state, we can evict from state once deleted, and possibly prune on
>>>> startup
>>>> >    or skip duplicates. Thoughts?
>>>> >
>>>> > *Yes, our implementation will comply with the checkpoint subsuming
>>>> > contract mentioned in [1]. To achieve this, we will track newly read
>>>> > messages <message ids> in a structure associated with a checkpoint
>>>> ID. In
>>>> > cases where checkpoint notifications are missed, we will review all
>>>> > messages in the state with checkpoint IDs less than the received one,
>>>> > delete those messages, and remove them from the state.*
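
The bookkeeping described in the paragraph above can be sketched as follows. This is an illustration under assumptions, not the FLIP's code: `PendingDeletions` and its method names are hypothetical, message ids are plain strings, and the sketch treats a completed checkpoint as covering its own id and all earlier ones (less than or equal), so that a lost notification is caught by the next one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Hypothetical helper: groups read message ids by the checkpoint that covers them. */
class PendingDeletions {
    // Checkpoint id -> ids of messages read before that checkpoint's snapshot.
    private final NavigableMap<Long, List<String>> byCheckpoint = new TreeMap<>();
    // Ids of messages read since the most recent snapshot.
    private List<String> current = new ArrayList<>();

    /** Called for every message handed downstream. */
    void onMessageRead(String messageId) {
        current.add(messageId);
    }

    /** Called when the source state is snapshotted for the given checkpoint. */
    void onSnapshot(long checkpointId) {
        byCheckpoint.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /**
     * Called when a checkpoint completion notification arrives. Returns the ids
     * to delete from the queue for this checkpoint and all earlier ones, and
     * evicts them from the tracked state.
     */
    List<String> onCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> covered = byCheckpoint.headMap(checkpointId, true);
        List<String> toDelete = new ArrayList<>();
        for (List<String> ids : covered.values()) {
            toDelete.addAll(ids);
        }
        covered.clear(); // clearing the view removes the entries from the backing map
        return toDelete;
    }

    /** Number of checkpoints whose messages are still awaiting deletion. */
    int pendingCheckpoints() {
        return byCheckpoint.size();
    }
}
```

If the notification for checkpoint 1 is lost and the one for checkpoint 2 arrives, a single call with id 2 returns the messages of both checkpoints, which is the subsuming behaviour the reply refers to.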
>>>> >
>>>> > *In addition, for optimal functionality, we depend on the visibility
>>>> > timeout configured for the queue. If read messages are not deleted
>>>> before
>>>> > the visibility timeout expires due to any transient issues, they will
>>>> > reappear. We will offer guidance to users of this connector on
>>>> adjusting
>>>> > the visibility timeout value to align with specific checkpoint
>>>> durations
>>>> > and avoid duplicate processing.*
>>>> >
>>>> >    - Since we only have 1 split, we should also limit the parallelism
>>>> of
>>>> >    the source accordingly. We could exploit the
>>>> DynamicParallelismInference to
>>>> >    effectively limit it to 1 unless the user explicitly overwrites.
>>>> >
>>>> > *We have reviewed the DynamicParallelismInference and noticed that it
>>>> is
>>>> > currently limited to batching jobs only [2]. We would appreciate it
>>>> if you
>>>> > could help us understand the rationale behind restricting the
>>>> parallelism
>>>> > for the source to 1. Are there any benefits to this limitation, or
>>>> > potential issues if we do not enforce it?*
>>>> >
>>>> >    - If I haven't overlooked something obvious, then your exactly-once
>>>> >    strategy will not work. There is unfortunately no guarantee that
>>>> >    notifyCheckpointComplete is called at all before a failure
>>>> happens. So
>>>> >    you very likely get duplicate messages. Scanning the SQS
>>>> >    documentation, I saw that you can read the SequenceNumber of the
>>>> message.
>>>> >    If you also store the latest number in the checkpoint state of the
>>>> split,
>>>> >    then you can discard all messages during recovery that are
>>>> smaller. But I
>>>> >    have never used SQS, so just take it as an inspiration. Also note
>>>> that you
>>>> >    didn't sketch how you delete messages from the queue. It's very
>>>> important
>>>> >    to only delete those messages that are part of the successful
>>>> checkpoint.
>>>> >    So you can't use PurgeQueue.
>>>> >
>>>> > *As mentioned earlier in the second point, we have already discussed
>>>> the
>>>> > mechanism. Please let us know if you have any concerns.*
>>>> >
>>>> >    - I wonder if you need to use ReceiveRequestAttemptId. It looks
>>>> like
>>>> >    it may be important for retries.
>>>> >
>>>> >
>>>> >
>>>> > *We use the AWS SDK library to establish connections and perform SQS
>>>> > operations. This library already ensures retries for any transient
>>>> > issues, with a default of three retries [3]. Additionally, AWS provides
>>>> > predefined schemes to select from when creating the client.*
>>>> > Please review and let us know your feedback on this.
>>>> >
>>>> >
>>>> > *[1] *
>>>> >
>>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/CheckpointListener.html
>>>> > *[2] *
>>>> >
>>>> https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java#L29
>>>> > *[3]*
>>>> >
>>>> https://github.com/aws/aws-sdk-java/blob/61d73631fac8535ad70666bbce9e70a1d2cea2ca/aws-java-sdk-core/src/main/java/com/amazonaws/retry/PredefinedRetryPolicies.java#L41
>>>> >
>>>> >
>>>> >
>>>> > Regards
>>>> > Saurabh & Abhi
>>>> >
>>>> >
>>>> >
>>>> > On Tue, Sep 3, 2024 at 6:40 PM Arvid Heise <ar...@apache.org> wrote:
>>>> >
>>>> >> Sorry for being late to the party. I saw your call to vote and
>>>> looked at
>>>> >> the FLIP.
>>>> >>
>>>> >> First, most of the design is looking really good and it will be good
>>>> to
>>>> >> have another connector integrated into the AWS ecosystem. A couple of
>>>> >> questions/remarks:
>>>> >> 1) Since we only have 1 split, we should also limit the parallelism
>>>> of the
>>>> >> source accordingly. We could exploit the DynamicParallelismInference
>>>> to
>>>> >> effectively limit it to 1 unless the user explicitly overwrites.
>>>> >> 2) If I haven't overlooked something obvious, then your exactly-once
>>>> >> strategy will not work. There is unfortunately no guarantee
>>>> >> that notifyCheckpointComplete is called at all before a failure
>>>> happens.
>>>> >> So
>>>> >> you very likely get duplicate messages.
>>>> >> Scanning the SQS documentation, I saw that you can read the
>>>> SequenceNumber
>>>> >> of the message. If you also store the latest number in the checkpoint
>>>> >> state
>>>> >> of the split, then you can discard all messages during recovery that
>>>> are
>>>> >> smaller. But I have never used SQS, so just take it as an
>>>> inspiration.
>>>> >> Also note that you didn't sketch how you delete messages from the
>>>> queue.
>>>> >> It's very important to only delete those messages that are part of
>>>> the
>>>> >> successful checkpoint. So you can't use PurgeQueue.
>>>> >> 3) I wonder if you need to use ReceiveRequestAttemptId. It looks
>>>> like it
>>>> >> may be important for retries.
>>>> >>
>>>> >> Best,
>>>> >>
>>>> >> Arvid
>>>> >>
>>>> >> On Tue, Aug 20, 2024 at 11:24 AM Ahmed Hamdy <hamdy10...@gmail.com>
>>>> >> wrote:
>>>> >>
>>>> >> > Hi Abhisagar and Saurabh
>>>> >> > I have created the FLIP page and assigned it FLIP-477[1]. Feel
>>>> free to
>>>> >> > resume with the next steps.
>>>> >> >
>>>> >> > 1-
>>>> >> >
>>>> >> >
>>>> >>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-+477+Amazon+SQS+Source+Connector
>>>> >> > Best Regards
>>>> >> > Ahmed Hamdy
>>>> >> >
>>>> >> >
>>>> >> > On Tue, 20 Aug 2024 at 06:05, Abhisagar Khatri <
>>>> >> > khatri.abhisaga...@gmail.com>
>>>> >> > wrote:
>>>> >> >
>>>> >> > > Hi Flink Devs,
>>>> >> > >
>>>> >> > > Gentle Reminder for the request. We'd like to ask the
>>>> PMC/Committers
>>>> >> to
>>>> >> > > transfer the content from the Amazon SQS Source Connector Google
>>>> Doc
>>>> >> [1]
>>>> >> > > and assign a FLIP Number for us, which we can use further for
>>>> voting.
>>>> >> > > We are following the procedure outlined on the Flink Improvement
>>>> >> Proposal
>>>> >> > > Confluence page [2].
>>>> >> > >
>>>> >> > > [1]
>>>> >> > >
>>>> >> >
>>>> >>
>>>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>>>> >> > > [2]
>>>> >> > >
>>>> >> >
>>>> >>
>>>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
>>>> >> > >
>>>> >> > > Regards,
>>>> >> > > Abhi & Saurabh
>>>> >> > >
>>>> >> > >
>>>> >> > > On Tue, Aug 13, 2024 at 12:50 PM Saurabh Singh <
>>>> >> > saurabhsingh9...@gmail.com>
>>>> >> > > wrote:
>>>> >> > >
>>>> >> > >> Hi Flink Devs,
>>>> >> > >>
>>>> >> > >> Thanks for all the feedback flink devs.
>>>> >> > >>
>>>> >> > >> Following the procedure outlined on the Flink Improvement
>>>> Proposal
>>>> >> > >> Confluence page [1], we kindly ask the PMC/Committers to
>>>> transfer the
>>>> >> > >> content from the Amazon SQS Source Connector Google Doc [2] and
>>>> >> assign a
>>>> >> > >> FLIP Number for us, which we will use for voting.
>>>> >> > >>
>>>> >> > >> [1]
>>>> >> > >>
>>>> >> >
>>>> >>
>>>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
>>>> >> > >> [2]
>>>> >> > >>
>>>> >> >
>>>> >>
>>>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>>>> >> > >>
>>>> >> > >> Regards
>>>> >> > >> Saurabh & Abhi
>>>> >> > >>
>>>> >> > >>
>>>> >> > >> On Thu, Aug 8, 2024 at 5:12 PM Saurabh Singh <
>>>> >> > saurabhsingh9...@gmail.com>
>>>> >> > >> wrote:
>>>> >> > >>
>>>> >> > >>> Hi Ahmed,
>>>> >> > >>>
>>>> >> > >>> Yes, you're correct. Currently, we're utilizing the "record
>>>> >> emitter" to
>>>> >> > >>> send messages into the queue for deletion. However, for the
>>>> actual
>>>> >> > deletion
>>>> >> > >>> process, which is dependent on the checkpoints, we've been
>>>> using the
>>>> >> > source
>>>> >> > >>> reader class because it allows us to override the
>>>> >> > notifyCheckpointComplete
>>>> >> > >>> method.
>>>> >> > >>>
>>>> >> > >>> Regards
>>>> >> > >>> Saurabh & Abhi
>>>> >> > >>>
>>>> >> > >>> On Wed, Aug 7, 2024 at 2:18 PM Ahmed Hamdy <
>>>> hamdy10...@gmail.com>
>>>> >> > wrote:
>>>> >> > >>>
>>>> >> > >>>> Hi Saurabh
>>>> >> > >>>> Thanks for addressing the comments; I see the FLIP is in a much
>>>> >> > >>>> better state.
>>>> >> > >>>> Could we specify where we queue messages for deletion? In my
>>>> >> > >>>> opinion the record emitter is a good place for that, where we
>>>> >> > >>>> delete messages that are forwarded to the next operator.
>>>> >> > >>>> Other than that I don't have further comments.
>>>> >> > >>>> Thanks again for the effort.
>>>> >> > >>>>
>>>> >> > >>>> Best Regards
>>>> >> > >>>> Ahmed Hamdy
>>>> >> > >>>>
>>>> >> > >>>>
>>>> >> > >>>> On Wed, 31 Jul 2024 at 10:34, Saurabh Singh <
>>>> >> > saurabhsingh9...@gmail.com>
>>>> >> > >>>> wrote:
>>>> >> > >>>>
>>>> >> > >>>>> Hi Ahmed,
>>>> >> > >>>>>
>>>> >> > >>>>> Thank you very much for the detailed, valuable review.
>>>> Please find
>>>> >> > our
>>>> >> > >>>>> responses below:
>>>> >> > >>>>>
>>>> >> > >>>>>
>>>> >> > >>>>>    - In the FLIP you mention the split is going to be 1 sqs
>>>> Queue,
>>>> >> > >>>>>    does this mean we would support reading from multiple
>>>> queues?
>>>> >> > This is also
>>>> >> > >>>>>    not clear in the implementation of `addSplitsBack`
>>>> whether we
>>>> >> are
>>>> >> > planning
>>>> >> > >>>>>    to support multiple sqs topics or not.
>>>> >> > >>>>>
>>>> >> > >>>>> *Our current implementation assumes that each source reads
>>>> from a
>>>> >> > >>>>> single SQS queue. If you need to read from multiple SQS
>>>> queues,
>>>> >> you
>>>> >> > can
>>>> >> > >>>>> define multiple sources accordingly. We believe this
>>>> approach is
>>>> >> > clearer
>>>> >> > >>>>> and more organized compared to having a single source switch
>>>> >> between
>>>> >> > >>>>> multiple queues. This design choice is based on weighing the
>>>> >> > benefits, but
>>>> >> > >>>>> we can support multiple queues per source if the need
>>>> arises.*
>>>> >> > >>>>>
>>>> >> > >>>>>    - Regarding Client creation, there has been some effort
>>>> in the
>>>> >> > >>>>>    common `aws-util` module like createAwsSyncClient, we
>>>> should
>>>> >> > reuse that for
>>>> >> > >>>>>    `SqsClient` creation.
>>>> >> > >>>>>
>>>> >> > >>>>>    *Thank you for bringing this to our attention. Yes, we
>>>> will
>>>> >> > >>>>>    utilize the existing createClient methods available in the
>>>> >> > libraries. Our
>>>> >> > >>>>>    goal is to avoid any code duplication on our end.*
>>>> >> > >>>>>
>>>> >> > >>>>>
>>>> >> > >>>>>    - On the same point for clients, Is there a reason the
>>>> FLIP
>>>> >> > >>>>>    suggests async clients? sync clients have proven more
>>>> stable
>>>> >> and
>>>> >> > the source
>>>> >> > >>>>>    threading model already guarantees no blocking by sync
>>>> clients.
>>>> >> > >>>>>
>>>> >> > >>>>> *We were not aware of this, and we have been using async
>>>> clients
>>>> >> for
>>>> >> > >>>>> our in-house use cases. However, since we already have sync
>>>> >> clients
>>>> >> > in the
>>>> >> > >>>>> aws-util that ensure no blocking, we are in a good position.
>>>> We
>>>> >> will
>>>> >> > use
>>>> >> > >>>>> these sync clients during our development and testing
>>>> efforts, and
>>>> >> > we will
>>>> >> > >>>>> share the results and keep the community updated.*
>>>> >> > >>>>>
>>>> >> > >>>>>    - On mentioning threading, the FLIP doesn’t mention the
>>>> >> > >>>>>    fetcher manager. Is it going to be
>>>> >> > >>>>>    `SingleThreadFetcherManager`? Would it be better to make the
>>>> >> > >>>>>    source reader extend SingleThreadMultiplexSourceReaderBase,
>>>> >> > >>>>>    or are we going to implement a simpler version?
>>>> >> > >>>>>
>>>> >> > >>>>> *Yes, we are considering extending
>>>> >> > >>>>> SingleThreadMultiplexSourceReaderBase for the Reader. We have
>>>> >> > >>>>> included the implementation snippet in the FLIP for reference.*
>>>> >> > >>>>>
>>>> >> > >>>>>    - The FLIP doesn’t mention schema deserialization or the
>>>> >> > >>>>>    recordEmitter implementation. Are we going to use
>>>> >> > >>>>>    `deserializationSchema` or some sort of string-to-element
>>>> >> > >>>>>    converter? It is also not clear from the builder example
>>>> >> > >>>>>    provided.
>>>> >> > >>>>>
>>>> >> > >>>>> *Yes, we plan to use the deserializationSchema and
>>>> recordEmitter
>>>> >> > >>>>> implementations. We have included sample code for these in
>>>> the
>>>> >> FLIP
>>>> >> > for
>>>> >> > >>>>> reference.*
>>>> >> > >>>>>
>>>> >> > >>>>>    - Are the values mentioned in getSqsClientProperties
>>>> >> recommended
>>>> >> > >>>>>    defaults? If so we should highlight that.
>>>> >> > >>>>>
>>>> >> > >>>>> *The defaults are not decided. These are just sample
>>>> snapshots for
>>>> >> > >>>>> example.*
>>>> >> > >>>>>
>>>> >> > >>>>>    - Most importantly I am a bit skeptical regarding
>>>> enforcing
>>>> >> > >>>>>    exactly-once semantics with side effects especially with
>>>> >> > dependency on
>>>> >> > >>>>>    checkpointing configuration, could we add flags to
>>>> disable and
>>>> >> > disable by
>>>> >> > >>>>>    default if the checkpointing is not enabled?
>>>> >> > >>>>>
>>>> >> > >>>>> *During our initial design phase, we intended to enforce
>>>> >> exactly-once
>>>> >> > >>>>> semantics via checkpoints. However, you raise a valid point,
>>>> and
>>>> >> we
>>>> >> > will
>>>> >> > >>>>> make this a configurable feature for users. They can choose
>>>> to
>>>> >> > disable
>>>> >> > >>>>> exactly-once semantics, accepting some duplicate processing
>>>> >> > (at-least-once)
>>>> >> > >>>>> as a trade-off. We have updated the FLIP to include support
>>>> for
>>>> >> this
>>>> >> > >>>>> feature.*
>>>> >> > >>>>>
>>>> >> > >>>>>    - I am not 100% convinced we should block FLIP itself on
>>>> the
>>>> >> > >>>>>    FLIP-438 implementation but I echo the fact that there
>>>> might be
>>>> >> > some
>>>> >> > >>>>>    reusable code between the 2 submodules we should make use
>>>> of.
>>>> >> > >>>>>
>>>> >> > >>>>>    *Yes we echo the same. We fully understand the concern
>>>> and are
>>>> >> > >>>>>    closely examining the SQS Sink implementation. We will
>>>> ensure
>>>> >> > there is no
>>>> >> > >>>>>    duplication of work or submodules. If any issues arise,
>>>> we will
>>>> >> > address
>>>> >> > >>>>>    them promptly.*
>>>> >> > >>>>>
>>>> >> > >>>>>
>>>> >> > >>>>> Thank you for your valuable feedback on the FLIP. Your input
>>>> is
>>>> >> > >>>>> helping us refine and improve it significantly.
>>>> >> > >>>>>
>>>> >> > >>>>> [Main Proposal doc] -
>>>> >> > >>>>>
>>>> >> >
>>>> >>
>>>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
>>>> >> > >>>>>
>>>> >> > >>>>> Regards
>>>> >> > >>>>> Saurabh & Abhi
>>>> >> > >>>>>
>>>> >> > >>>>>
>>>> >> > >>>>> On Sun, Jul 28, 2024 at 3:04 AM Ahmed Hamdy <
>>>> hamdy10...@gmail.com
>>>> >> >
>>>> >> > >>>>> wrote:
>>>> >> > >>>>>
>>>> >> > >>>>>> Hi Saurabh
>>>> >> > >>>>>> I think this is going to be a valuable addition which is
>>>> needed.
>>>> >> > >>>>>> I have a couple of comments
>>>> >> > >>>>>> - In the FLIP you mention the split is going to be 1 sqs
>>>> Queue,
>>>> >> does
>>>> >> > >>>>>> this
>>>> >> > >>>>>> mean we would support reading from multiple queues? The
>>>> builder
>>>> >> > >>>>>> example
>>>> >> > >>>>>> seems to support a single queue
>>>> >> > >>>>>> SqsSource.<T>builder.setSqsUrl("
>>>> >> > >>>>>> https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
>>>> >> > >>>>>> - This is also not clear in the implementation of
>>>> `addSplitsBack`
>>>> >> > >>>>>> whether
>>>> >> > >>>>>> we are planning to support multiple sqs topics or not.
>>>> >> > >>>>>> - Regarding Client creation, there has been some effort in
>>>> the
>>>> >> > common
>>>> >> > >>>>>> `aws-util` module like createAwsSyncClient  , we should
>>>> reuse
>>>> >> that
>>>> >> > for
>>>> >> > >>>>>> `SqsClient` creation.
>>>> >> > >>>>>> - On the same point for clients, Is there a reason the FLIP
>>>> >> suggests
>>>> >> > >>>>>> async
>>>> >> > >>>>>> clients? sync clients have proven more stable and the source
>>>> >> > threading
>>>> >> > >>>>>> model already guarantees no blocking by sync clients.
>>>> >> > >>>>>> - On mentioning threading, the FLIP doesn't mention the fetcher
>>>> >> > >>>>>> manager. Is it going to be `SingleThreadFetcherManager`? Would
>>>> >> > >>>>>> it be better to make the source reader extend
>>>> >> > >>>>>> SingleThreadMultiplexSourceReaderBase, or are we going to
>>>> >> > >>>>>> implement a simpler version?
>>>> >> > >>>>>> - The FLIP doesn't mention schema deserialization or the
>>>> >> > >>>>>> recordEmitter implementation. Are we going to use
>>>> >> > >>>>>> `deserializationSchema` or some sort of string-to-element
>>>> >> > >>>>>> converter? It is also not clear from the builder example
>>>> >> > >>>>>> provided.
>>>> >> > >>>>>> - Are the values mentioned in getSqsClientProperties
>>>> recommended
>>>> >> > >>>>>> defaults?
>>>> >> > >>>>>> If so we should highlight that
>>>> >> > >>>>>> - Most importantly I am a bit skeptical regarding enforcing
>>>> >> > >>>>>> exactly-once
>>>> >> > >>>>>> semantics with side effects especially with dependency on
>>>> >> > >>>>>> checkpointing
>>>> >> > >>>>>> configuration, could we add flags to disable and disable by
>>>> >> default
>>>> >> > >>>>>> if the
>>>> >> > >>>>>> checkpointing is not enabled?
>>>> >> > >>>>>>
>>>> >> > >>>>>> I am not 100% convinced we should block FLIP itself on the
>>>> >> FLIP-438
>>>> >> > >>>>>> implementation but I echo the fact that there might be some
>>>> >> reusable
>>>> >> > >>>>>> code
>>>> >> > >>>>>> between the 2 submodules we should make use of.
>>>> >> > >>>>>>
>>>> >> > >>>>>>  Best Regards
>>>> >> > >>>>>> Ahmed Hamdy
>>>> >> > >>>>>>
>>>> >> > >>>>>>
>>>> >> > >>>>>> On Fri, 26 Jul 2024 at 17:55, Saurabh Singh <
>>>> >> > >>>>>> saurabhsingh9...@gmail.com>
>>>> >> > >>>>>> wrote:
>>>> >> > >>>>>>
>>>> >> > >>>>>> > Hi Li Wang,
>>>> >> > >>>>>> >
>>>> >> > >>>>>> > Thanks for the review and appreciate your feedback.
>>>> >> > >>>>>> > I completely understand your concern and agree with it.
>>>> Our
>>>> >> goal
>>>> >> > is
>>>> >> > >>>>>> to
>>>> >> > >>>>>> > provide users of connectors with a consistent and coherent
>>>> >> > >>>>>> ecosystem, free
>>>> >> > >>>>>> > of any issues.
>>>> >> > >>>>>> > To ensure that, we are closely monitoring/reviewing the
>>>> SQS
>>>> >> Sink
>>>> >> > >>>>>> > implementation work by Dhingra. Our development work will
>>>> >> commence
>>>> >> > >>>>>> once the
>>>> >> > >>>>>> > AWS Sink is near completion or completed. This approach
>>>> ensures
>>>> >> > >>>>>> that we
>>>> >> > >>>>>> > take in the new learnings, do not duplicate any core
>>>> modules
>>>> >> and
>>>> >> > >>>>>> allow us
>>>> >> > >>>>>> > to save valuable time.
>>>> >> > >>>>>> >
>>>> >> > >>>>>> > In the meantime, we would like to keep the discussion and
>>>> >> process
>>>> >> > >>>>>> active on
>>>> >> > >>>>>> > this FLIP. Gaining valuable community feedback (which is
>>>> >> helping
>>>> >> > >>>>>> us) will
>>>> >> > >>>>>> > help us address any potential gaps in the source connector
>>>> >> design
>>>> >> > >>>>>> and
>>>> >> > >>>>>> > finalize it. Behind the scenes, we are already designing
>>>> and
>>>> >> > >>>>>> pre-planning
>>>> >> > >>>>>> > our development work to adhere to feedback/best practices/
>>>> >> faster
>>>> >> > >>>>>> delivery
>>>> >> > >>>>>> > when we implement this FLIP.
>>>> >> > >>>>>> > Please share your thoughts on this.
>>>> >> > >>>>>> >
>>>> >> > >>>>>> > Regards
>>>> >> > >>>>>> > Saurabh
>>>> >> > >>>>>> >
>>>> >> > >>>>>> > On Fri, Jul 26, 2024 at 5:52 PM Li Wang <
>>>> >> liwang505...@gmail.com>
>>>> >> > >>>>>> wrote:
>>>> >> > >>>>>> >
>>>> >> > >>>>>> > > Hi Saurabh,
>>>> >> > >>>>>> > >
>>>> >> > >>>>>> > > Thanks for the FLIP. Given the ongoing effort to
>>>> implement
>>>> >> the
>>>> >> > >>>>>> SQS sink
>>>> >> > >>>>>> > > connector (
>>>> >> > >>>>>> > >
>>>> >> > >>>>>> > >
>>>> >> > >>>>>> >
>>>> >> > >>>>>>
>>>> >> >
>>>> >>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector
>>>> >> > >>>>>> > > ),
>>>> >> > >>>>>> > > it is important to consider how the SQS source connector
>>>> >> > supports
>>>> >> > >>>>>> this
>>>> >> > >>>>>> > > development, ensuring a unified framework for AWS
>>>> integration
>>>> >> > >>>>>> that avoids
>>>> >> > >>>>>> > > capacity overlap and maximizes synchronization. Since
>>>> the
>>>> >> > >>>>>> implementation
>>>> >> > >>>>>> > by
>>>> >> > >>>>>> > > Dhingra is still WIP, I would suggest taking that into
>>>> >> account
>>>> >> > >>>>>> and keep
>>>> >> > >>>>>> > > this FLIP as an extension of that FLIP which could be
>>>> taken
>>>> >> up
>>>> >> > >>>>>> once the
>>>> >> > >>>>>> > SQS
>>>> >> > >>>>>> > > Sink FLIP is fully implemented. If not, this may lead to
>>>> >> > >>>>>> > > duplicated work in multiple identity-related modules and in
>>>> >> > >>>>>> > > the structure of the overall SQS connector codebase. What do
>>>> >> > >>>>>> > > you think?
>>>> >> > >>>>>> > >
>>>> >> > >>>>>> > > Thanks
>>>> >> > >>>>>> > >
>>>> >> > >>>>>> > >
>>>> >> > >>>>>> > > On Thursday, July 25, 2024, Saurabh Singh <saurabhsingh9...@gmail.com> wrote:
>>>> >> > >>>>>> > >
>>>> >> > >>>>>> > > > Hi Samrat,
>>>> >> > >>>>>> > > >
>>>> >> > >>>>>> > > > Thanks for the review and feedback.
>>>> >> > >>>>>> > > > We have evaluated all three points. Please find the answers below:
>>>> >> > >>>>>> > > >
>>>> >> > >>>>>> > > > 1. AWS has announced JSON protocol support in SQS [1]. Can you shed some
>>>> >> > >>>>>> > > > light on how different protocols will be supported?
>>>> >> > >>>>>> > > > - We will utilize the AWS client library to connect with the AWS SQS
>>>> >> > >>>>>> > > > Service. Versions beyond 2.21.19 now support JSON, so simply upgrading the
>>>> >> > >>>>>> > > > client library will suffice for the protocol switch. However, from the
>>>> >> > >>>>>> > > > connector's perspective, we do not anticipate any changes in our
>>>> >> > >>>>>> > > > communication process, as it is handled by the client library. [4]
>>>> >> > >>>>>> > > >
>>>> >> > >>>>>> > > > 2. AWS SQS has two types of queues [2]. What are the implementation detail
>>>> >> > >>>>>> > > > differences for the source connector?
>>>> >> > >>>>>> > > > - SQS Connector is indifferent to the customer's choice of Queue type. If
>>>> >> > >>>>>> > > > out-of-order messages are a concern, it will be the responsibility of the
>>>> >> > >>>>>> > > > application code or main job logic to manage this.
>>>> >> > >>>>>> > > >
>>>> >> > >>>>>> > > > 3. Will the SQS source connector implement any kind of callbacks [3] on
>>>> >> > >>>>>> > > > success to offer any kind of guarantee?
>>>> >> > >>>>>> > > > - We have proposed deleting SQS messages using the notification provided by
>>>> >> > >>>>>> > > > the checkpoint framework on checkpoint completion, thus providing an
>>>> >> > >>>>>> > > > exactly-once guarantee. [5] Additionally, when deleting messages, we will
>>>> >> > >>>>>> > > > monitor the API call responses and log any failures, along with providing
>>>> >> > >>>>>> > > > observability through appropriate metrics.
>>>> >> > >>>>>> > > >
>>>> >> > >>>>>> > > > [1] https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
>>>> >> > >>>>>> > > > [2] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
>>>> >> > >>>>>> > > > [3] https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
>>>> >> > >>>>>> > > > [4] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-json-faqs.html#json-protocol-getting-started
>>>> >> > >>>>>> > > > [5] https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.jdcikzojx5d9
>>>> >> > >>>>>> > > >
>>>> >> > >>>>>> > > > [Main Proposal doc] - https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
>>>> >> > >>>>>> > > >
>>>> >> > >>>>>> > > > Please feel free to reach out if you have more feedback.
>>>> >> > >>>>>> > > >
>>>> >> > >>>>>> > > > Regards
>>>> >> > >>>>>> > > > Saurabh & Abhi
>>>> >> > >>>>>> > > >
>>>> >> > >>>>>> > > >
>>>> >> > >>>>>> > > > On Wed, Jul 24, 2024 at 8:52 AM Samrat Deb <decordea...@gmail.com> wrote:
>>>> >> > >>>>>> > > >
>>>> >> > >>>>>> > > > > Hi Saurabh,
>>>> >> > >>>>>> > > > >
>>>> >> > >>>>>> > > > > Thank you for sharing the FLIP for the SQS source connector. An SQS source
>>>> >> > >>>>>> > > > > connector will be a great addition to the Flink ecosystem, as there is a
>>>> >> > >>>>>> > > > > growing demand for SQS source/sink integration.
>>>> >> > >>>>>> > > > >
>>>> >> > >>>>>> > > > > I have a few queries:
>>>> >> > >>>>>> > > > >
>>>> >> > >>>>>> > > > > 1. AWS has announced JSON protocol support in SQS [1]. Can you shed some
>>>> >> > >>>>>> > > > > light on how different protocols will be supported?
>>>> >> > >>>>>> > > > > 2. AWS SQS has two types of queues [2]. What are the implementation detail
>>>> >> > >>>>>> > > > > differences for the source connector?
>>>> >> > >>>>>> > > > > 3. Will the SQS source connector implement any kind of callbacks [3] on
>>>> >> > >>>>>> > > > > success to offer any kind of guarantee?
>>>> >> > >>>>>> > > > >
>>>> >> > >>>>>> > > > >
>>>> >> > >>>>>> > > > >
>>>> >> > >>>>>> > > > > [1] https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
>>>> >> > >>>>>> > > > > [2] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
>>>> >> > >>>>>> > > > > [3] https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
>>>> >> > >>>>>> > > > >
>>>> >> > >>>>>> > > > > Bests,
>>>> >> > >>>>>> > > > > Samrat
>>>> >> > >>>>>> > > > >
>>>> >> > >>>>>> > > > >
>>>> >> > >>>>>> > > > > On Fri, 19 Jul 2024 at 9:53 PM, Saurabh Singh <saurabhsingh9...@gmail.com> wrote:
>>>> >> > >>>>>> > > > >
>>>> >> > >>>>>> > > > > > Hi Flink Devs,
>>>> >> > >>>>>> > > > > >
>>>> >> > >>>>>> > > > > > Our team has been working on migrating various data pipelines to Flink to
>>>> >> > >>>>>> > > > > > leverage the benefits of exactly-once processing, checkpointing, and
>>>> >> > >>>>>> > > > > > stateful computing. We have several use cases built around the AWS SQS
>>>> >> > >>>>>> > > > > > Service. For this migration, we have developed an SQS Source Connector,
>>>> >> > >>>>>> > > > > > which enables us to run both stateless and stateful Flink-based jobs.
>>>> >> > >>>>>> > > > > >
>>>> >> > >>>>>> > > > > > We believe that this SQS Source Connector would be a valuable addition to
>>>> >> > >>>>>> > > > > > the existing connector set. Therefore, we propose a FLIP to include it.
>>>> >> > >>>>>> > > > > >
>>>> >> > >>>>>> > > > > > For more information, please refer to the FLIP document.
>>>> >> > >>>>>> > > > > >
>>>> >> > >>>>>> > > > > > https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>>>> >> > >>>>>> > > > > >
>>>> >> > >>>>>> > > > > > Thanks
>>>> >> > >>>>>> > > > > > Saurabh & Abhi
>>>> >> > >>>>>> > > > > >
