Hi Flink Devs,

Thanks a lot for the valuable feedback from Arvid, Danny and David on the
design of this SQS Connector.

We had an in-depth discussion on Slack regarding the semantics and stages
of implementation. Below is a summary of the *conclusions* of our conversation.

   - For now, the FLIP will concentrate on capturing the connector details,
   design, and implementation specifically for AWS SQS Standard Queues with
   at-least-once semantics. Support for FIFO queues will be introduced later
   as an extension to this FLIP.
   - *Evaluation of Exactly-Once Semantics for Standard Queues*
      - Unfortunately, achieving exactly-once semantics is not feasible for
      Standard Queues, for the following reasons:
         - SQS standard queues can deliver the same message more than once.
         - A successful delete call does not guarantee the absolute deletion
         of the message; a copy may survive on a temporarily unavailable
         server and be redelivered.

Therefore, *we can only provide at-least-once delivery for Standard Queues.*
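
To make the at-least-once flow concrete, below is a minimal, illustrative
sketch (not the connector code; class and method names are ours, and it
assumes the AWS SDK v2 SqsClient) of how receipt handles could be tracked
per checkpoint and deleted on notifyCheckpointComplete, following the
checkpoint-subsuming contract:

import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;

// Hypothetical helper used by the source reader; names are illustrative only.
public class SqsReceiptHandleTracker {

    private final SqsClient sqsClient;
    private final String queueUrl;

    // Receipt handles of emitted-but-not-yet-deleted messages, keyed by the
    // checkpoint ID that first covered them.
    private final NavigableMap<Long, List<String>> handlesPerCheckpoint = new TreeMap<>();
    private final List<String> handlesSinceLastCheckpoint = new ArrayList<>();

    public SqsReceiptHandleTracker(SqsClient sqsClient, String queueUrl) {
        this.sqsClient = sqsClient;
        this.queueUrl = queueUrl;
    }

    /** Called for every message that has been emitted downstream. */
    public void messageEmitted(String receiptHandle) {
        handlesSinceLastCheckpoint.add(receiptHandle);
    }

    /** Called while taking the checkpoint with the given ID (snapshotState). */
    public void snapshotState(long checkpointId) {
        handlesPerCheckpoint.put(checkpointId, new ArrayList<>(handlesSinceLastCheckpoint));
        handlesSinceLastCheckpoint.clear();
    }

    /**
     * Called from notifyCheckpointComplete. Deletes everything tracked for this
     * checkpoint and for any older checkpoint whose notification was missed.
     */
    public void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> completed =
                handlesPerCheckpoint.headMap(checkpointId, true);
        List<String> toDelete = new ArrayList<>();
        completed.values().forEach(toDelete::addAll);
        // SQS allows at most 10 entries per DeleteMessageBatch call.
        for (int start = 0; start < toDelete.size(); start += 10) {
            List<String> chunk = toDelete.subList(start, Math.min(start + 10, toDelete.size()));
            List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
            for (int i = 0; i < chunk.size(); i++) {
                entries.add(DeleteMessageBatchRequestEntry.builder()
                        .id(Integer.toString(start + i))
                        .receiptHandle(chunk.get(i))
                        .build());
            }
            sqsClient.deleteMessageBatch(DeleteMessageBatchRequest.builder()
                    .queueUrl(queueUrl)
                    .entries(entries)
                    .build());
        }
        // A failed or partially failed delete is acceptable for at-least-once:
        // the message simply reappears after its visibility timeout.
        completed.clear();
    }
}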

   - *Evaluation of Exactly-Once Semantics for FIFO Queues*
      - Exactly-once semantics is achievable for FIFO Queues. To accomplish
      this, we need to keep track of the SequenceNumber (which is strictly
      increasing for messages within a message group) together with the
      groupId; replayed messages with a smaller SequenceNumber can then be
      discarded (see the sketch after this list).


   - *Manipulation of Invisible Messages*
      - *For Standard Queues*:
         - Since we are providing at-least-once delivery, there is no need
         to manipulate invisible messages. After their visibility timeout
         expires, the messages are consumed again, and user code needs to
         handle these corner scenarios.
      - *For FIFO Queues*:
         - The challenge is that processing of a message group is halted if
         previously read messages are still invisible and not yet deleted.
         Therefore, we additionally need to keep track of the ReceiptHandle.
         In case messages remain in an invisible state for an extended
         period, we need to mark them visible again. This can be quite
         tricky, especially with multiple groupIds. The details and design
         of this mechanism will be an extension to this FLIP.
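
As referenced above, here is a rough, hypothetical sketch of the FIFO ideas:
keep the highest committed SequenceNumber per MessageGroupId in checkpoint
state, drop replayed messages with a smaller SequenceNumber, and use the
stored ReceiptHandle to make long-stuck in-flight messages visible again via
ChangeMessageVisibility. Class and method names are ours, not part of the
proposal; the real design will come with the FLIP extension for FIFO queues.

import java.math.BigInteger;
import java.util.HashMap;
import java.util.Map;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;

// Hypothetical helper; SequenceNumber and MessageGroupId must be requested as
// message system attributes in the ReceiveMessage call for this to work.
public class FifoDedupSketch {

    private final SqsClient sqsClient;
    private final String queueUrl;

    // Part of the checkpointed source state: highest committed SequenceNumber per groupId.
    private final Map<String, BigInteger> committedSeqNumPerGroup = new HashMap<>();

    public FifoDedupSketch(SqsClient sqsClient, String queueUrl) {
        this.sqsClient = sqsClient;
        this.queueUrl = queueUrl;
    }

    /** True if the message is new, false if it replays one already covered by a checkpoint. */
    public boolean shouldEmit(Message message) {
        BigInteger committed = committedSeqNumPerGroup.get(groupId(message));
        return committed == null || seqNum(message).compareTo(committed) > 0;
    }

    /** Called once the checkpoint covering this message has completed. */
    public void markCommitted(Message message) {
        committedSeqNumPerGroup.merge(groupId(message), seqNum(message), BigInteger::max);
    }

    /** Makes a stuck in-flight message visible again so its message group is not blocked. */
    public void makeVisibleAgain(String receiptHandle) {
        sqsClient.changeMessageVisibility(ChangeMessageVisibilityRequest.builder()
                .queueUrl(queueUrl)
                .receiptHandle(receiptHandle)
                .visibilityTimeout(0)
                .build());
    }

    private static String groupId(Message message) {
        return message.attributes().get(MessageSystemAttributeName.MESSAGE_GROUP_ID);
    }

    // SequenceNumber is a 128-bit number delivered as a string, hence BigInteger.
    private static BigInteger seqNum(Message message) {
        return new BigInteger(message.attributes().get(MessageSystemAttributeName.SEQUENCE_NUMBER));
    }
}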


Thanks a lot for the feedback. We have updated the FLIP.
Updated Google Doc Link -
https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing

Thanks
Saurabh & Abhi

-----------------------------------------------------------------------------------------
*Slack Conversation Details (snapshot)* *<Feel Free to Skip>*


Saurabh Singh  6:06 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726576584259939>
Hi @Arvid Heise <https://apache-flink.slack.com/team/U07BQT3AA1W>, thank you
for your response. We believe that it might be more efficient to continue
this conversation via Slack DMs, and we can update the FLIP with our
conclusions afterwards. I hope this works for you, but please feel free to
let me know if you have any concerns.
Regarding the discussion points:

   1. *SQS Limitation*: SQS has a limitation where you cannot perform
   operations on invisible messages if their receipt handles (the reference to
   the messages) are not available.
   2. *SQS Message IDs*: Message IDs are unique random strings.

Now, we have two scenarios:

   - *Messages become invisible but are not captured as part of the
state* (e.g.,
   while reading): In this case, we can't do anything except wait for them to
   become visible again, which is not ideal for FIFO queues.
   - *Messages become invisible but are captured as part of the state* (e.g.,
   failure to act on the NotifyCheckpointComplete call): Here, we would
   merge all the message IDs read until the next completed checkpoint and then
   delete them together.

Are you planning to store the message ids of all to-be-deleted messages in
the state? That certainly works for low-volume sources but ideally, you
would just store the highest message id and enumerate the other message ids
from it.

Yes, we need to store all message IDs individually because, in Standard SQS
queues, messages are delivered in an unordered manner. Initially, we
considered addressing both Standard and FIFO queues. However, after
discussions and further thought, it seems that the same approach may not be
ideal for FIFO queues. We’d like to seek your advice on limiting the scope
of this FLIP to focus exclusively on Standard queues for better
implementation and clarity. We believe this could help streamline the
process, and we could later have a follow-up discussion or extension to
explore adjustments and optimizations for FIFO queues.
What are your thoughts on this approach?
Arvid Heise  7:07 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580266021769>
It's completely fine to focus on one type of queue and add the other later
(even without FLIP). Of course, you should either fail on FIFO or document
that this works in a very limited way.
7:09 <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580384937009>
Arvid Heise
Storing all message IDs in the Flink state should be seen as a last resort.
It's okay but depending on the number of messages may blow up state size
significantly. It's much better than storing the payloads themselves (as we
need to do for deduplication operator).
Danny Cranmer  7:10 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580422350019>
was added to the conversation by Arvid Heise.
Arvid Heise  7:11 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580488752129>
It usually pays off to toss a few ideas back and forth though: maybe it's
enough to store the ID of the last committed message. If you have a way to
distinguish replayed and not-replayed messages (timestamp?), then you could
discard all replayed messages until the ID pops up.
7:13 <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580596696739>
Arvid Heise
What else can you do with receipt handles? Does it make sense to store them
instead of the ID?
7:14 <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580691227369>
Arvid Heise
Also probably a stupid Q: what happens with already-read messages when I set
the timeout to 0? I'm assuming the respective timeout is attached to the
message on read and thus changing the timeout later has no effect on them...
7:15 <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580751903719>
Arvid Heise
Lastly: how much contact do you have with the SQS folks? Could they add a way
to interact with invisible messages later? Then, we could focus on a simple
solution (possibly ignoring EOS) and add that later.
Wednesday, 18 September
Saurabh Singh  5:43 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726661587582269>
Thank you for your feedback. Please find our responses below:

   1. Regarding message handling, we are indeed storing the receipt handles
   for the messages in the state. This is necessary for performing the delete
   operation on the messages later. In our earlier communication, we referred
   to them as message IDs for simplicity and ease of understanding.
   2. For *Standard SQS queues*, due to the lack of ordering guarantees, we
   must store each receipt handle to ensure we can delete the messages
   appropriately. The message IDs alone are not sufficient for this purpose.
   3. Anytime we set the visibility timeout of a message to 0 (whether
   immediately after reading or later), the message becomes visible again,
   leading to reprocessing and duplication. Fine-tuning the visibility timeout
   is key to avoiding this duplication. (Ref Link
   
<https://repost.aws/questions/QUnM3etA9fSDuMJngSvql5lw/sqs-visibility-timeout-0>
   )
   4. We are simply users of the AWS infrastructure, and do not have direct
   interaction with the SQS development team. So it is not possible to
   interact with them on this.

Let us know your thoughts on this? (edited)
Thursday, 19 September
Danny Cranmer  1:43 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726733595413099>
Disclaimer, I am not experienced with SQS.

Regarding message handling, we are indeed storing the receipt handles for
the messages in the state.

Is this code available somewhere for us to look at, might help with the
understanding.

we are indeed storing the receipt handles for the messages in the state

Are we also storing the actual messageID? We would likely need this in the
case when we need to deduplicate a message with a new read receipt.

I note that standard queues only guarantee at-least-once delivery, so we
cannot really provide exactly-once in Flink without more extensive
deduplication logic
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues-at-least-once-delivery.html.
Unless the delete operation returns an error that we can retry, do you know
the behaviour in these conditions?

Since we are storing the message ID (receipt handles) in the state it sounds
like the happy path is solid, minus the above ^. For the "Messages become
invisible but are not captured as part of the state" condition I suppose we
are also covered, but we will introduce some latency increasing
out-of-orderness.

It sounds like we need to track the receiptId and messageId in state until we
have successfully deleted in SQS. The receiptId is used to actually delete
the message and the messageId is needed to dedupe. I assume we will be
controlling the visibility period in the source? If so we can expose this to
the user (with sensible defaults) to tune, ofc it should be something >
checkpoint interval.

We are simply users of the AWS infrastructure, and do not have direct
interaction with the SQS development team. So it is not possible to
interact with them on this.

I know a few people that work at SQS, no promises but I have pinged them
and might be able to get some feedback from them (edited)
Arvid Heise  2:37 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726736852216579>
I wonder if we get away with simply offering at least once and then expose
the message ID as metadata and use Flink to dedupe.
2:37 <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726736863805459>
Arvid Heise
If I don't delete a message, are they stored indefinitely?
2:40 <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726737056514629>
Arvid Heise
Just to explain my line of thought: Storing per message data in the source
blows up state. We can do that for low volume sources or if we have complex
queries (e.g. joins which also stores per message data). If SQS is low
volume anyways, we might simply replay everything and dedupe via message
ID. That obviously will slow down recovery, so it's more likely to work if
messages have some inherent retention period.
Saurabh Singh  5:55 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726748737945309>
Thank you for taking time and your feedback:

Is this code available somewhere for us to look at? It might help with
understanding.

Currently, it is not available in any public domain. However, you can refer
to the documentation of messageId and ReceiptHandle here
<https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html>
.

Are we also storing the actual messageID? We would likely need this in the
case when we need to deduplicate a message with a new read receipt.

At the moment, we are not storing the actual messageID. We only perform
delete operations, which require only the receipt handle. While storing the
messageID would indeed be useful for deduplication, it introduces the
additional overhead of retaining the messageID until its retention period
expires. Please note that as per the reference
<https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues-at-least-once-delivery.html>
 even a successful delete operation is not 100% guaranteed: a copy of the
message might not be deleted on a server that is unavailable at the time, and
you might receive that copy again.

I assume we will be controlling the visibility period in the source? If so,
we can expose this to the user (with sensible defaults) to tune, of course,
it should be something > checkpoint interval.

While we can manage this via APIs, it should be part of the infrastructure
(SQS) provisioning and not something to be tuned on demand. We will provide
guidelines to tune these parameters (Visibility Timeout/Retention Period)
with respect to checkpoint duration/interval/timeout.

I wonder if we get away with simply offering at least once.

Yes, based on the details and discussion, this seems like a good starting
point.

Expose the message ID as metadata and use Flink to dedupe.

Could you please expand on this? As we understand, we would need to
maintain the messageID in a data structure until the retention time
configured on the queue.

If I don't delete a message, are they stored indefinitely?

No, messages are not stored indefinitely. They expire or are deleted
automatically after the message retention period of the queue.
Danny Cranmer  6:08 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726749531070739>

Is this code available somewhere for us to look at? It might help with
understanding.

Currently, it is not available in any public domain. However, you can refer
to the documentation of messageId and ReceiptHandle here
<https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html>
.

I meant your connector code, assuming it exists

Expose the message ID as metadata and use Flink to dedupe.

Could you please expand on this? As we understand, we would need to
maintain the messageID in a data structure until the retention time
configured on the queue.

We support at-least-once by default, but then provide a special
deserialisation schema interface that gives access to the SQS message ID.
Then a user can include this in their records for downstream
deduplication.  Example here
<https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java#L64>
 for Kinesis, we give access to seqNum etc (edited)
6:10 <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726749655044349>
Danny Cranmer

I assume we will be controlling the visibility period in the source? If so,
we can expose this to the user (with sensible defaults) to tune, of course,
it should be something > checkpoint interval.

While we can manage this via APIs, it should be part of the infrastructure
(SQS) provisioning and not something to be tuned on demand. We will provide
guidelines to tune these parameters (Visibility Timeout/Retention Period)
with respect to checkpoint duration/interval/timeout.

ok, it is a shame that users will need to tune their queue to work with
Flink, but sounds sensible given the limitations (edited)
Saurabh Singh  6:43 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726751603344809>
Thanks for the quick response and suggestions.

I meant your connector code, assuming it exists.

At the moment, we have a naive version implemented and used for our
internal application. Unfortunately, sharing the code in its current form
could have legal implications. Therefore, we have initiated this FLIP to
develop this for the open-source community.
Friday, 20 September
Danny Cranmer  12:05 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726770912823589>
@David Green <https://apache-flink.slack.com/team/U07NKKNBG5P> has agreed
to help us out. He is a Principal Engineer on the SQS team
David Green  12:05 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726770941081139>
was added to the conversation by Danny Cranmer.
Danny Cranmer  12:11 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726771289630659>
Hey @David Green <https://apache-flink.slack.com/team/U07NKKNBG5P>, thanks
for jumping in. We are trying to design an SQS source for Flink and achieve
exactly-once semantics. Flink has a built-in checkpointing mechanism; sources
typically commit their current read position in the checkpoint and resume
from there upon failure. For the SQS source we are trying to avoid storing
all message IDs and read receipts in
the state to avoid huge state sizes. It would be helpful if you can review
this thread and see if we have made any wrong assumptions, or would suggest
any alternative solutions.
David Green  12:46 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726773418874009>
Hi all, will read the thread to catch up. Is there a design doc or is it
all in the chat?
Danny Cranmer  12:49 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726773595097219>
Here is the design, although it is probably lacking details of what we are
discussing
https://cwiki.apache.org/confluence/display/FLINK/FLIP-477+Amazon+SQS+Source+Connector
David Green  1:07 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726774654899819>
Is there a definition of "exactly once"?
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/guarantees/
e.g. does exactly once relate to distributed state, or local state?
David Green  1:13 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726774992609999>
SQS standard queues provide at-least-once delivery. In rare cases, messages
can be delivered more than once.
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html
This potential for duplicate deliveries with SQS standard queues will make
exactly once semantics impractical to implement in a FLINK source
connector. Unlike SQS standard queues, SQS FIFO ordering means that you
won't receive a duplicate delivery out of order. It might be useful to
declare the capabilities of the SQS source connector separately for SQS
FIFO and SQS standard queues.
1:18 <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775287577599>
David Green
For SQS FIFO queues, you could consider using the `SequenceNumber` provided
by [ReceiveMessage](
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
)
Tracking the `SequenceNumber` for each message group that is inflight might
be a good way to provide exactly once semantics in the FLINK source
connector.
Danny Cranmer  1:21 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775472945699>
Exactly once means that each record is consumed and processed within the
Job only once. Even when there are failures and restarts. Flink has
"in-flight" state and checkpointed state. Upon failure Flink restores from
checkpointed state and discards "in-flight" state. Checkpoints are
consistent across parallel tasks of the Flink job
Danny Cranmer  1:23 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775590547299>
What is the sequence number? These docs are not that useful "Returns the
value provided by Amazon SQS."
David Green  1:23 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775611304669>
Forgive my unfamiliarity with FLINK. What does that mean from a source
connector perspective? If a FLINK application crashes and is restarted on
another host, what is the expected behaviour?
Danny Cranmer  1:28 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775900380639>
The Flink job would resume from the last checkpoint. The source should then
resume from where it was at that checkpoint time/offset/etc. The idea for a
queue is that upon checkpoint completion we delete all the messages
consumed since the last checkpoint. However this cleanup is not guaranteed
to happen, so we often store metadata in the state to help us cleanup
eventually consistently and dedupe (edited)
1:32 <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726776120746739>
Danny Cranmer
So one idea is to collect the receipt IDs and then delete the messages in
the checkpoint completion event. But since this might not happen we want to
store state to eventually cleanup without storing every message ID. Usually
we use some offset/sequence number.
1:33 <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726776221119169>
Danny Cranmer
Additionally on failure, we might need to reread messages that were
in-flight, and we do not have the receipt IDs. But they may not be visible
now, so besides tuning the visibility period is there a better way to
optimise this?
Danny Cranmer  1:38 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726776534516009>
Sorry for the vague questions, it's difficult when I do not know SQS and
you do not know Flink. Hopefully @Saurabh Singh
<https://apache-flink.slack.com/team/U06GJSZTEUC> will have some more
specific questions when online
David Green  1:59 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726777789811769>

 Additionally on failure, we might need to reread messages that were
in-flight, and we do not have the receipt IDs. But they may not be visible
now, so besides tuning the visibility period is there a better way to
optimise this?

Unfortunately no - if a message was received and is still within its
invisibility timeout, there is no way to make it visible again without
using the ReceiptHandle.
2:01 <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726777889954539>
David Green

 So one idea is to collect the receipt IDs and then delete the messages in
the checkpoint completion event. But since this might not happen we want to
store state to eventually cleanup without storing every message ID. Usually
we use some offset/sequence number.

This approach could work for SQS FIFO queues if you stored the
SequenceNumber by message group ID, but each message group with in-flight
messages would have to wait for those messages to become visible again.
Yesterday
Saurabh Singh  6:04 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727094880363379>
Thanks @Danny Cranmer <https://apache-flink.slack.com/team/U03JD1ZFQ3D> @David
Green <https://apache-flink.slack.com/team/U07NKKNBG5P> for the valuable
insights. We have reviewed the conversation and are providing a summary
here. We will update the FLIP accordingly once we agree on the same.

   - *Exactly Once Semantics for Standard Queues*
      - Unfortunately, achieving exactly once semantics is not feasible for
      Standard Queues due to the following reasons:
         - There can be duplicate message deliveries with SQS standard queues.
         - A successful delete call does not guarantee the absolute deletion
         of the messages.
      *Therefore, we can only provide at-least-once delivery for Standard
      Queues.*
   - *Exactly Once Semantics for FIFO Queues*
      - Exactly once semantics is achievable for FIFO Queues. To accomplish
      this, we need to keep track of seqNum (which is strictly increasing for
      messages) and groupId. By using the seqNum, we can achieve exactly once
      semantics.
   - *Manipulation of Invisible Messages*
      - *For Standard Queues:*
         - Since we are providing at-least-once delivery, there is no need to
         manipulate invisible messages. After their visibility timeout
         expires, the messages are consumed again, and user code needs to
         handle these corner scenarios.
      - *For FIFO Queues:*
         - The challenge is that processing is halted if the previous
         invisible messages are not deleted. Therefore, we need to
         additionally keep track of ReceiptHandle. In case messages remain in
         an invisible state for an extended period, we need to mark them
         visible again. This can be quite tricky, especially with multiple
         groupIds. I believe we need to study this in detail and design the
         mechanism in the extension FLIP.

Let us know if this is good and we can proceed?
David Green  10:06 PM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727109383466049>
LGTM
Today
Arvid Heise  11:19 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727156995566719>
Just to clarify:

   - EOS with FIFO is out of scope of the FLIP now and there will be a
   follow-up FLIP?

11:22
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727157161772559>
Arvid Heise
Imho (and this view is not fully shared by all Flink committers) you don't
strictly need FLIPs for connectors, especially if you just submit
extensions. However, you can see how beneficial it is to discuss the ideas
with a broader audience.
Having a FLIP for the start of a connector is definitely something that
I'd like to see, but we would be driven mad if there were a FLIP for each and
every improvement. It's a different story if you change existing behavior.
Saurabh Singh  11:33 AM
<https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727157813737819>
Agree @Arvid Heise <https://apache-flink.slack.com/team/U07BQT3AA1W>. We
will just extend the existing FLIP to document the behaviour for FIFO queues.
This discussion has been very valuable for extending our knowledge and
understanding of the concepts and for concluding on the right behaviour for
the connector. Thanks for helping us out here.
We will update the FLIP and discussion thread with the new details.




-------------------------------------------------------------------------------------------




On Tue, Sep 17, 2024 at 12:26 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Saurabh,
>
> thank you very much for taking the time to explain SQS features to me.
> It's a cool concept that I haven't seen before.
>
> While your presented approach already works for Flink EOS, I still need to
> poke a bit because I think the usability may be improved.
>
> Some background information on how sinks usually work:
> * On checkpoint, we store the committables (=transactions or file
> locations).
> * On notifyCheckpointComplete, we commit them all or fail on persisting
> errors.
> * On recovery, we retry all stored committables.
> * On recovery, we cancel all opened transactions not belonging to the
> recovered state.
>
> Now back to SQS, can we speed up recovery? Let's go over my thoughts:
> * I'm assuming you need to set the timeout to a value that corresponds to
> Flink checkpoint duration. So I often see 1 minute checkpointing interval
> and a checkpoint taking some seconds, so we would choose a SQS timeout of 2
> minutes (we'd probably set it higher in realistic settings).
> * Before failure, Flink tries to read message1 and fails, there are more
> messages afterwards (message2, ...)
> * In standard queues, Flink restarts and will not see message1 until the
> timeout happens. It can immediately start processing message2 and so on.
> After 2 minutes, it will finally see message1 and consume it.
> * In FIFO queues, Flink will not see any message until the timeout
> happens. So effectively, Flink will not process anything for 2 minutes.
> That means that recovery is effectively always as high as the SQS timeout.
>
> I'm curious if there is a way to manually timeout the messages for faster
> recovery? I saw that you can decrease the timeout of specific messages,
> which could be used to set the timeout to 0 on restart of message1. If that
> works, we could do similar things to the sink. On restart, we immediately
> try to delete the messages that are part of the checkpoint and set the
> timeout to 0 for messages that were read after the checkpoint. If it's
> somehow possible to enumerate all messages before and past the respective
> checkpoint, we would not depend on the SQS timeout of the consumer for
> correctness and recovery time and could set it to its max=12h. Else we
> would have the users to trade off correctness (timeout too low) vs recovery
> time (timeout too high).
>
> What are you planning to store as the source state?
> > *record2 is still captured as part of the state, so it gets deleted
> from the queue during the next checkpoint.*
> Are you planning to store the message ids of all to-be-deleted messages in
> the state? That certainly works for low volume sources but ideally, you
> would just store the highest message id and enumerate the other message ids
> from it.
>
> Could you please update the FLIP to include the provided information so
> that other readers can quickly understand the visibility timeout and the
> state management? (external links are fine of course but please provide a
> brief summary in case the link becomes dead at some point) You can of
> course defer it until we are completely aligned.
>
> Best,
>
> Arvid
>
> On Fri, Sep 13, 2024 at 9:32 AM Saurabh Singh <saurabhsingh9...@gmail.com>
> wrote:
>
>> Hi Arvid,
>>
>> Thanks a lot for your review.
>>
>>    1. Exactly once:
>>
>>
>>
>> *When using an SQS source, we leverage two key properties of SQS:1.
>> Message Retention: This property ensures that messages are retained in the
>> queue for a specified period, allowing consumers enough time to process
>> them. Post these messages are deleted.2. Visibility Timeout [1] : This
>> property prevents other consumers from receiving and processing the same
>> message while it's being processed by the current consumer.*
>>
>> *Regarding the scenario described below:*
>>
>> * Read record2, emit record2, record2 is written into a Kafka transaction2
>> * Checkpoint2 happens, state is checkpointed, sink remembers transaction2
>> to be committed
>> * NotifyCheckpointCompleted2 is lost for some reason or happens after next
>> failure
>> * Some failure happens, Flink is restarted with the current system state:
>> SQS contains record2, record3, Kafka contains record1 and pending record2.
>> On restart, sink will commit transactions2 recovered from the state. No
>> SQS
>> contains record2, record3 and Kafka contains record1, record2.
>> * Read record2, emit record2, record2 is written into a Kafka transaction3
>> * Eventually, we end up with two record2 in the sink.
>>
>> *In this scenario, we set the Message Visibility Timeout to be multiples
>> of the checkpoint duration and ensure it is longer than the recovery
>> duration. This way, when recovery occurs, record2 remains invisible and is
>> not read again, preventing duplication. record2 is still captured as part
>> of the state, so it gets deleted from the queue during the next checkpoint.*
>>
>> *Message Retention Time: We typically set this to a sufficiently long
>> duration. This is beneficial for recovering from extended outages and
>> backfilling data. If a message is not deleted, it will be read again once
>> it becomes visible after the visibility timeout expires.*
>>
>> *This is our current understanding for achieving exactly-once processing.
>> Do you see any challenges with this approach? If so, we would greatly
>> appreciate your feedback.*
>>
>> 2. Deleting messages:
>> *Yes, It's quite natural for consumers to delete messages for the reasons
>> mentioned above, such as recovery from outages and backfilling data.*
>>
>> 3. Regarding the Source parallelism:
>> *ACK. We'll document this behavior accordingly.*
>>
>> 4. Multiple queue support:
>> *Yes, as an extension to this FLIP, we will support this functionality in
>> the future. However, in the meantime, this can still be achieved by
>> creating multiple sources with specific queues.*
>>
>>
>> Please review and let us know your feedback on this.
>>
>> [1]
>> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
>>
>>
>> Thanks
>> Saurabh & Abhi
>>
>>
>> On Thu, Sep 12, 2024 at 2:23 PM Arvid Heise <ar...@apache.org> wrote:
>>
>>> <I accidentally sent this message Saurabh directly instead to the mailing
>>> list, please disregard that first message and only respond to this one>
>>>
>>> Hi Saurabh,
>>>
>>> thank you for addressing the points. I still have concerns around EOS
>>> and a
>>> few remarks.
>>>
>>> 1. Exactly once:
>>> The checkpoint subsuming contract typically only applies to sinks, so
>>> it's
>>> unusual to be used in sources. Let's quickly look at an example so we are
>>> all on the same page.
>>>
>>> Assume we read messages record1, record2, record3 all arriving a few
>>> minutes apart such there are checkpoints in-between. For simplicity let's
>>> assume one checkpoint between records and call them checkpoint1 after 1,
>>> checkpoint2 after 2 and so on. The data is simply stored in some EOS sink
>>> (e.g. Kafka). The corresponding calls of NotifyCheckpointCompleted are
>>> enumerated with NotifyCheckpointCompleted1, NotifyCheckpointCompleted2,
>>> ...
>>>
>>> Then your source sees the following on failure:
>>> * Read record1, emit record1, record1 is written into a Kafka
>>> transaction1
>>> * Checkpoint1 happens, state is checkpointed, sink remembers transaction1
>>> to be committed
>>> * NotifyCheckpointCompleted1 happens, sink commits transaction1, so
>>> record1
>>> is visible to downstream consumer
>>> * During NotifyCheckpointCompleted1, SQS source also deletes record1
>>> * Some failure happens, Flink is restarted with the current system state:
>>> SQS contains record2, record3, Kafka contains record1. Since transaction1
>>> is already committed, the sink ignores it. So far so good.
>>>
>>> * Read record2, emit record2, record2 is written into a Kafka
>>> transaction2
>>> * Checkpoint2 happens, state is checkpointed, sink remembers transaction2
>>> to be committed
>>> * NotifyCheckpointCompleted2 is lost for some reason or happens after
>>> next
>>> failure
>>> * Some failure happens, Flink is restarted with the current system state:
>>> SQS contains record2, record3, Kafka contains record1 and pending
>>> record2.
>>> On restart, sink will commit transactions2 recovered from the state. No
>>> SQS
>>> contains record2, record3 and Kafka contains record1, record2.
>>> * Read record2, emit record2, record2 is written into a Kafka
>>> transaction3
>>> * Eventually, we end up with two record2 in the sink.
>>>
>>> So you see that with your proposal, we get duplicate records easily on
>>> failure, which is not exactly once. It's not enough to delete the
>>> messages
>>> eventually with NotifyCheckpointCompleted3.
>>>
>>> How it usually works:
>>> * The checkpoint state of the source contains some kind of offset, which
>>> would point to record3 in our example.
>>> * On recovery, seek the record corresponding to the offset and start
>>> reading from there.
>>>
>>> If the source system doesn't support seeking, then the usual way is to
>>> replay a larger section but discard all records with a smaller offset. If
>>> there is no strict ordering, a last resort is to track all message ids on
>>> low volume sources and discard duplicates. You could also start with just
>>> offering AT LEAST ONCE and let the user deduplicate. We should just be
>>> wary
>>> to not call something exactly once that isn't.
>>>
>>> 2. Deleting messages:
>>> It's rather uncommon for a connector to explicitly delete messages.
>>> Usually, they fall off retention at some point. I'd try to decouple that
>>> from EOS and give users an option to fully consume or not. Unless of
>>> course, it's very natural for SQS consumers to delete the messages; I
>>> can't
>>> assess that as I have never used SQS.
>>>
>>> 3. Regarding the parallelism:
>>> > *We have reviewed the DynamicParallelismInference and noticed that it
>>> is
>>> currently limited to batching jobs only [2]. *
>>> You are correct it cannot be used and thus we cannot enforce parallelism
>>> of
>>> 1. I got excited when I saw that interface and didn't read properly.
>>>
>>> Limiting the parallelism will help to reduce the resources. If you spawn
>>> more than 1 source subtask, it should get immediately terminated because
>>> there is nothing to do (don't forget to emit SourceReaderFinishedEvent
>>> for
>>> all idle subtasks). If the pipeline is embarrassing parallel, all
>>> downstream subtasks will also be closed.
>>> But there is nothing that you can do except document it to set the
>>> parallelism of the source to 1. A higher parallelism is only usable with
>>> a
>>> shuffle then.
>>>
>>> Alternatively, you could think about allowing the source to subscribe to
>>> multiple queues and distribute them to the different subtasks.
>>>
>>> Best,
>>>
>>> Arvid
>>>
>>> On Tue, Sep 10, 2024 at 4:50 PM Saurabh Singh <
>>> saurabhsingh9...@gmail.com>
>>> wrote:
>>>
>>> > Hi Danny, Arvid,
>>> >
>>> > Thank you so much for reviewing the FLIP. We apologize for the delayed
>>> > response; we've been quite busy over the past week.
>>> >
>>> >
>>> >    - // .setFailOnError(false)
>>> >    A general callout that this is not ideal, I know we use it elsewhere
>>> >    but we should consider some pluggable error handling at some point.
>>> Not as
>>> >    part of this FLIP, but if we can avoid adding this flag here, then
>>> I would.
>>> >
>>> > *Acknowledged. We will remove this flag and address it separately to
>>> > enhance our error handling design.*
>>> >
>>> >    - // The code registers a checkpoint completion notification
>>> callback
>>> >    via *notifyCheckpointComplete*
>>> >    Flink does not guarantee that notifications are actually invoked and
>>> >    successful [1]. Therefore there is a chance this method will not
>>> run, and
>>> >    hence we could violate exactly once semantics here. Suggest that we
>>> >    instead/additionally track the SQS read messages within the
>>> checkpoint
>>> >    state, we can evict from state once deleted, and possibly prune on
>>> startup
>>> >    or skip duplicates. Thoughts?
>>> >
>>> > *Yes, our implementation will comply with the checkpoint subsuming
>>> > contract mentioned in [1]. To achieve this, we will track newly read
>>> > messages <message ids> in a structure associated with a checkpoint ID.
>>> In
>>> > cases where checkpoint notifications are missed, we will review all
>>> > messages in the state with checkpoint IDs less than the received one,
>>> > delete those messages, and remove them from the state.*
>>> >
>>> > *In addition, for optimal functionality, we depend on the visibility
>>> > timeout configured for the queue. If read messages are not deleted
>>> before
>>> > the visibility timeout expires due to any transient issues, they will
>>> > reappear. We will offer guidance to users of this connector on
>>> adjusting
>>> > the visibility timeout value to align with specific checkpoint
>>> durations
>>> > and avoid duplicate processing.*
>>> >
>>> >    - Since we only have 1 split, we should also limit the parallelism
>>> of
>>> >    the source accordingly. We could exploit the
>>> DynamicParallelismInference to
>>> >    effectively limit it to 1 unless the user explicitly overwrites.
>>> >
>>> > *We have reviewed the DynamicParallelismInference and noticed that it
>>> is
>>> > currently limited to batching jobs only [2]. We would appreciate it if
>>> you
>>> > could help us understand the rationale behind restricting the
>>> parallelism
>>> > for the source to 1. Are there any benefits to this limitation, or
>>> > potential issues if we do not enforce it?*
>>> >
>>> >    - If I haven't overlooked something obvious, then your exactly-once
>>> >    strategy will not work. There is unfortunately no guarantee that
>>> >    notifyCheckpointComplete is called at all before a failure happens.
>>> So
>>> >    you very likely get duplicate messages. Scanning the SQS
>>> >    documentation, I saw that you can read the SequenceNumber of the
>>> message.
>>> >    If you also store the latest number in the checkpoint state of the
>>> split,
>>> >    then you can discard all messages during recovery that are smaller.
>>> But I
>>> >    have never used SQS, so just take it as an inspiration. Also note
>>> that you
>>> >    didn't sketch how you delete messages from the queue. It's very
>>> important
>>> >    to only delete those messages that are part of the successful
>>> checkpoint.
>>> >    So you can't use PurgeQueue.
>>> >
>>> > *As mentioned earlier in the second point, we have already discussed
>>> the
>>> > mechanism. Please let us know if you have any concerns.*
>>> >
>>> >    - I wonder if you need to use ReceiveRequestAttemptId. It looks like
>>> >    it may be important for retries.
>>> >
>>> >
>>> >
>>> > *Since we use the AWS SDK library to establish connections and perform
>>> SQS
>>> > operations. This library already ensures retries for any transient
>>> issues,
>>> > with a default of three retries [3]. Additionally, AWS provides
>>> predefined
>>> > schemes to select from when creating the client.*
>>> > Please review and let us know your feedback on this.
>>> >
>>> >
>>> > *[1] *
>>> >
>>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/CheckpointListener.html
>>> > *[2] *
>>> >
>>> https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java#L29
>>> > *[3]*
>>> >
>>> https://github.com/aws/aws-sdk-java/blob/61d73631fac8535ad70666bbce9e70a1d2cea2ca/aws-java-sdk-core/src/main/java/com/amazonaws/retry/PredefinedRetryPolicies.java#L41
>>> >
>>> >
>>> >
>>> > Regards
>>> > Saurabh & Abhi
>>> >
>>> >
>>> >
>>> > On Tue, Sep 3, 2024 at 6:40 PM Arvid Heise <ar...@apache.org> wrote:
>>> >
>>> >> Sorry for being late to the party. I saw your call to vote and looked
>>> at
>>> >> the FLIP.
>>> >>
>>> >> First, most of the design is looking really good and it will be good
>>> to
>>> >> have another connector integrated into the AWS ecosystem. A couple of
>>> >> questions/remarks:
>>> >> 1) Since we only have 1 split, we should also limit the parallelism
>>> of the
>>> >> source accordingly. We could exploit the DynamicParallelismInference
>>> to
>>> >> effectively limit it to 1 unless the user explicitly overwrites.
>>> >> 2) If I haven't overlooked something obvious, then your exactly-once
>>> >> strategy will not work. There is unfortunately no guarantee
>>> >> that notifyCheckpointComplete is called at all before a failure
>>> happens.
>>> >> So
>>> >> you very likely get duplicate messages.
>>> >> Scanning the SQS documentation, I saw that you can read the
>>> SequenceNumber
>>> >> of the message. If you also store the latest number in the checkpoint
>>> >> state
>>> >> of the split, then you can discard all messages during recovery that
>>> are
>>> >> smaller. But I have never used SQS, so just take it as an inspiration.
>>> >> Also note that you didn't sketch how you delete messages from the
>>> queue.
>>> >> It's very important to only delete those messages that are part of the
>>> >> successful checkpoint. So you can't use PurgeQueue.
>>> >> 3) I wonder if you need to use ReceiveRequestAttemptId. It looks like
>>> it
>>> >> may be important for retries.
>>> >>
>>> >> Best,
>>> >>
>>> >> Arvid
>>> >>
>>> >> On Tue, Aug 20, 2024 at 11:24 AM Ahmed Hamdy <hamdy10...@gmail.com>
>>> >> wrote:
>>> >>
>>> >> > Hi Abhisagar and Saurabh
>>> >> > I have created the FLIP page and assigned it FLIP-477[1]. Feel free
>>> to
>>> >> > resume with the next steps.
>>> >> >
>>> >> > 1-
>>> >> >
>>> >> >
>>> >>
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-+477+Amazon+SQS+Source+Connector
>>> >> > Best Regards
>>> >> > Ahmed Hamdy
>>> >> >
>>> >> >
>>> >> > On Tue, 20 Aug 2024 at 06:05, Abhisagar Khatri <
>>> >> > khatri.abhisaga...@gmail.com>
>>> >> > wrote:
>>> >> >
>>> >> > > Hi Flink Devs,
>>> >> > >
>>> >> > > Gentle Reminder for the request. We'd like to ask the
>>> PMC/Committers
>>> >> to
>>> >> > > transfer the content from the Amazon SQS Source Connector Google
>>> Doc
>>> >> [1]
>>> >> > > and assign a FLIP Number for us, which we can use further for
>>> voting.
>>> >> > > We are following the procedure outlined on the Flink Improvement
>>> >> Proposal
>>> >> > > Confluence page [2].
>>> >> > >
>>> >> > > [1]
>>> >> > >
>>> >> >
>>> >>
>>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>>> >> > > [2]
>>> >> > >
>>> >> >
>>> >>
>>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
>>> >> > >
>>> >> > > Regards,
>>> >> > > Abhi & Saurabh
>>> >> > >
>>> >> > >
>>> >> > > On Tue, Aug 13, 2024 at 12:50 PM Saurabh Singh <
>>> >> > saurabhsingh9...@gmail.com>
>>> >> > > wrote:
>>> >> > >
>>> >> > >> Hi Flink Devs,
>>> >> > >>
>>> >> > >> Thanks for all the feedback flink devs.
>>> >> > >>
>>> >> > >> Following the procedure outlined on the Flink Improvement
>>> Proposal
>>> >> > >> Confluence page [1], we kindly ask the PMC/Committers to
>>> transfer the
>>> >> > >> content from the Amazon SQS Source Connector Google Doc [2] and
>>> >> assign a
>>> >> > >> FLIP Number for us, which we will use for voting.
>>> >> > >>
>>> >> > >> [1]
>>> >> > >>
>>> >> >
>>> >>
>>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
>>> >> > >> [2]
>>> >> > >>
>>> >> >
>>> >>
>>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>>> >> > >>
>>> >> > >> Regards
>>> >> > >> Saurabh & Abhi
>>> >> > >>
>>> >> > >>
>>> >> > >> On Thu, Aug 8, 2024 at 5:12 PM Saurabh Singh <
>>> >> > saurabhsingh9...@gmail.com>
>>> >> > >> wrote:
>>> >> > >>
>>> >> > >>> Hi Ahmed,
>>> >> > >>>
>>> >> > >>> Yes, you're correct. Currently, we're utilizing the "record
>>> >> emitter" to
>>> >> > >>> send messages into the queue for deletion. However, for the
>>> actual
>>> >> > deletion
>>> >> > >>> process, which is dependent on the checkpoints, we've been
>>> using the
>>> >> > source
>>> >> > >>> reader class because it allows us to override the
>>> >> > notifyCheckpointComplete
>>> >> > >>> method.
>>> >> > >>>
>>> >> > >>> Regards
>>> >> > >>> Saurabh & Abhi
>>> >> > >>>
>>> >> > >>> On Wed, Aug 7, 2024 at 2:18 PM Ahmed Hamdy <
>>> hamdy10...@gmail.com>
>>> >> > wrote:
>>> >> > >>>
>>> >> > >>>> Hi Saurabh
>>> >> > >>>> Thanks for addressing, I see the FLIP is in much better state.
>>> >> > >>>> Could we specify where we queue messages for deletion, In my
>>> >> opinion
>>> >> > >>>> the record emitter is a good place for that where we delete
>>> >> messages
>>> >> > that
>>> >> > >>>> are forwarded to the next operator.
>>> >> > >>>> Other than that I don't have further comments.
>>> >> > >>>> Thanks again for the effort.
>>> >> > >>>>
>>> >> > >>>> Best Regards
>>> >> > >>>> Ahmed Hamdy
>>> >> > >>>>
>>> >> > >>>>
>>> >> > >>>> On Wed, 31 Jul 2024 at 10:34, Saurabh Singh <
>>> >> > saurabhsingh9...@gmail.com>
>>> >> > >>>> wrote:
>>> >> > >>>>
>>> >> > >>>>> Hi Ahmed,
>>> >> > >>>>>
>>> >> > >>>>> Thank you very much for the detailed, valuable review. Please
>>> find
>>> >> > our
>>> >> > >>>>> responses below:
>>> >> > >>>>>
>>> >> > >>>>>
>>> >> > >>>>>    - In the FLIP you mention the split is going to be 1 sqs
>>> Queue,
>>> >> > >>>>>    does this mean we would support reading from multiple
>>> queues?
>>> >> > This is also
>>> >> > >>>>>    not clear in the implementation of `addSplitsBack` whether
>>> we
>>> >> are
>>> >> > planning
>>> >> > >>>>>    to support multiple sqs topics or not.
>>> >> > >>>>>
>>> >> > >>>>> *Our current implementation assumes that each source reads
>>> from a
>>> >> > >>>>> single SQS queue. If you need to read from multiple SQS
>>> queues,
>>> >> you
>>> >> > can
>>> >> > >>>>> define multiple sources accordingly. We believe this approach
>>> is
>>> >> > clearer
>>> >> > >>>>> and more organized compared to having a single source switch
>>> >> between
>>> >> > >>>>> multiple queues. This design choice is based on weighing the
>>> >> > benefits, but
>>> >> > >>>>> we can support multiple queues per source if the need arises.*
>>> >> > >>>>>
>>> >> > >>>>>    - Regarding Client creation, there has been some effort in
>>> the
>>> >> > >>>>>    common `aws-util` module like createAwsSyncClient, we
>>> should
>>> >> > reuse that for
>>> >> > >>>>>    `SqsClient` creation.
>>> >> > >>>>>
>>> >> > >>>>>    *Thank you for bringing this to our attention. Yes, we will
>>> >> > >>>>>    utilize the existing createClient methods available in the
>>> >> > libraries. Our
>>> >> > >>>>>    goal is to avoid any code duplication on our end.*
>>> >> > >>>>>
>>> >> > >>>>>
>>> >> > >>>>>    - On the same point for clients, Is there a reason the FLIP
>>> >> > >>>>>    suggests async clients? sync clients have proven more
>>> stable
>>> >> and
>>> >> > the source
>>> >> > >>>>>    threading model already guarantees no blocking by sync
>>> clients.
>>> >> > >>>>>
>>> >> > >>>>> *We were not aware of this, and we have been using async
>>> clients
>>> >> for
>>> >> > >>>>> our in-house use cases. However, since we already have sync
>>> >> clients
>>> >> > in the
>>> >> > >>>>> aws-util that ensure no blocking, we are in a good position.
>>> We
>>> >> will
>>> >> > use
>>> >> > >>>>> these sync clients during our development and testing
>>> efforts, and
>>> >> > we will
>>> >> > >>>>> share the results and keep the community updated.*
>>> >> > >>>>>
>>> >> > >>>>>    - On mentioning threading, the FLIP doesn’t mention the
>>> fetcher
>>> >> > >>>>>    manager. Is it going to be `SingleThreadFetcherManager`?
>>> Would
>>> >> it
>>> >> > be better
>>> >> > >>>>>    to make the source reader extend the
>>> >> > SingleThreadedMultiplexReaderBase or
>>> >> > >>>>>    are we going to implement a more simple version?
>>> >> > >>>>>
>>> >> > >>>>> *Yes, we are considering implementing
>>> >> > >>>>> SingleThreadMultiplexSourceReaderBase for the Reader. We have
>>> >> > included the
>>> >> > >>>>> implementation snippet in the FLIP for reference.*
>>> >> > >>>>>
>>> >> > >>>>>    - The FLIP doesn’t mention schema deserialization or the
>>> >> > >>>>>    recordEmitter implementation, Are we going to use
>>> >> > `deserializationSchema`
>>> >> > >>>>>    or some sort of string to element converter? It is also not
>>> >> clear
>>> >> > form the
>>> >> > >>>>>    builder example provided?
>>> >> > >>>>>
>>> >> > >>>>> *Yes, we plan to use the deserializationSchema and
>>> recordEmitter
>>> >> > >>>>> implementations. We have included sample code for these in the
>>> >> FLIP
>>> >> > for
>>> >> > >>>>> reference.*
>>> >> > >>>>>
>>> >> > >>>>>    - Are the values mentioned in getSqsClientProperties
>>> >> recommended
>>> >> > >>>>>    defaults? If so we should highlight that.
>>> >> > >>>>>
>>> >> > >>>>> *The defaults are not decided. These are just sample
>>> snapshots for
>>> >> > >>>>> example.*
>>> >> > >>>>>
>>> >> > >>>>>    - Most importantly I am a bit skeptical regarding enforcing
>>> >> > >>>>>    exactly-once semantics with side effects especially with
>>> >> > dependency on
>>> >> > >>>>>    checkpointing configuration, could we add flags to disable
>>> and
>>> >> > disable by
>>> >> > >>>>>    default if the checkpointing is not enabled?
>>> >> > >>>>>
>>> >> > >>>>> *During our initial design phase, we intended to enforce
>>> >> exactly-once
>>> >> > >>>>> semantics via checkpoints. However, you raise a valid point,
>>> and
>>> >> we
>>> >> > will
>>> >> > >>>>> make this a configurable feature for users. They can choose to
>>> >> > disable
>>> >> > >>>>> exactly-once semantics, accepting some duplicate processing
>>> >> > (at-least-once)
>>> >> > >>>>> as a trade-off. We have updated the FLIP to include support
>>> for
>>> >> this
>>> >> > >>>>> feature.*
>>> >> > >>>>>
>>> >> > >>>>>    - I am not 100% convinced we should block FLIP itself on
>>> the
>>> >> > >>>>>    FLIP-438 implementation but I echo the fact that there
>>> might be
>>> >> > some
>>> >> > >>>>>    reusable code between the 2 submodules we should make use
>>> of.
>>> >> > >>>>>
>>> >> > >>>>>    *Yes we echo the same. We fully understand the concern and
>>> are
>>> >> > >>>>>    closely examining the SQS Sink implementation. We will
>>> ensure
>>> >> > there is no
>>> >> > >>>>>    duplication of work or submodules. If any issues arise, we
>>> will
>>> >> > address
>>> >> > >>>>>    them promptly.*
>>> >> > >>>>>
>>> >> > >>>>>
>>> >> > >>>>> Thank you for your valuable feedback on the FLIP. Your input
>>> is
>>> >> > >>>>> helping us refine and improve it significantly.
>>> >> > >>>>>
>>> >> > >>>>> [Main Proposal doc] -
>>> >> > >>>>>
>>> >> >
>>> >>
>>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
>>> >> > >>>>>
>>> >> > >>>>> Regards
>>> >> > >>>>> Saurabh & Abhi
>>> >> > >>>>>
>>> >> > >>>>>
>>> >> > >>>>> On Sun, Jul 28, 2024 at 3:04 AM Ahmed Hamdy <
>>> hamdy10...@gmail.com
>>> >> >
>>> >> > >>>>> wrote:
>>> >> > >>>>>

Hi Saurabh,
I think this is going to be a valuable addition which is needed. I have a couple of comments:

   - In the FLIP you mention the split is going to be one SQS queue; does this mean we would support reading from multiple queues? The builder example seems to support a single queue: SqsSource.<T>builder.setSqsUrl("https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
   - It is also not clear from the implementation of `addSplitsBack` whether we are planning to support multiple SQS queues or not.
   - Regarding client creation, there has been some effort in the common `aws-util` module, such as createAwsSyncClient; we should reuse that for `SqsClient` creation.
   - On the same point about clients, is there a reason the FLIP suggests async clients? Sync clients have proven more stable, and the source threading model already guarantees no blocking by sync clients.
   - On threading, the FLIP doesn't mention the fetcher manager. Is it going to be `SingleThreadFetcherManager`? Would it be better to make the source reader extend `SingleThreadMultiplexSourceReaderBase`, or are we going to implement a simpler version?
   - The FLIP doesn't mention schema deserialization or the `RecordEmitter` implementation. Are we going to use `DeserializationSchema` or some sort of string-to-element converter? It is also not clear from the builder example provided.
   - Are the values mentioned in getSqsClientProperties recommended defaults? If so, we should highlight that.
   - Most importantly, I am a bit skeptical regarding enforcing exactly-once semantics with side effects, especially with the dependency on the checkpointing configuration. Could we add flags to disable it, and disable it by default if checkpointing is not enabled?
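
For the threading point above, what I have in mind is roughly the standard flink-connector-base reader pattern. A minimal sketch is below; the Flink base classes are real, but `SqsSplit`, `SqsSplitState` and the reader name are placeholders for illustration, not something already defined in the FLIP.

    import java.util.Map;
    import java.util.function.Supplier;

    import org.apache.flink.api.connector.source.SourceReaderContext;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.connector.base.source.reader.RecordEmitter;
    import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
    import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;

    import software.amazon.awssdk.services.sqs.model.Message;

    /** Sketch of an SQS source reader built on the single-threaded fetcher model. */
    public class SqsSourceReader<T>
            extends SingleThreadMultiplexSourceReaderBase<Message, T, SqsSplit, SqsSplitState> {

        public SqsSourceReader(
                Supplier<SplitReader<Message, SqsSplit>> splitReaderSupplier,
                RecordEmitter<Message, T, SqsSplitState> recordEmitter,
                Configuration config,
                SourceReaderContext context) {
            super(splitReaderSupplier, recordEmitter, config, context);
        }

        @Override
        protected void onSplitFinished(Map<String, SqsSplitState> finishedSplitIds) {
            // An SQS queue split is unbounded in this sketch, so there is nothing to do here.
        }

        @Override
        protected SqsSplitState initializedState(SqsSplit split) {
            return new SqsSplitState(split);
        }

        @Override
        protected SqsSplit toSplitType(String splitId, SqsSplitState splitState) {
            return splitState.toSqsSplit();
        }
    }

The base class wires up the `SingleThreadFetcherManager` itself, so the SQS-specific code would mostly be the SplitReader that polls the queue (with a sync client, per the earlier point).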

I am not 100% convinced we should block the FLIP itself on the FLIP-438 implementation, but I echo the fact that there might be some reusable code between the two submodules that we should make use of.

Best Regards
Ahmed Hamdy

On Fri, 26 Jul 2024 at 17:55, Saurabh Singh <saurabhsingh9...@gmail.com> wrote:

Hi Li Wang,

Thanks for the review; we appreciate your feedback.
I completely understand your concern and agree with it. Our goal is to provide users of the connectors with a consistent and coherent ecosystem, free of any issues.
To ensure that, we are closely monitoring and reviewing the SQS Sink implementation work by Dhingra. Our development work will commence once the SQS Sink is near completion or completed. This approach ensures that we take in the new learnings, do not duplicate any core modules, and save valuable time.

In the meantime, we would like to keep the discussion and process active on this FLIP. The valuable community feedback we are gaining (which is already helping us) will help us address any potential gaps in the source connector design and finalize it. Behind the scenes, we are already designing and pre-planning our development work so that, when we implement this FLIP, we adhere to the feedback and best practices and can deliver faster.
Please share your thoughts on this.

Regards
Saurabh

On Fri, Jul 26, 2024 at 5:52 PM Li Wang <liwang505...@gmail.com> wrote:

Hi Saurabh,

Thanks for the FLIP. Given the ongoing effort to implement the SQS sink connector (https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector), it is important to consider how the SQS source connector supports this development, ensuring a unified framework for AWS integration that avoids capability overlap and maximizes synchronization. Since the implementation by Dhingra is still WIP, I would suggest taking that into account and keeping this FLIP as an extension of that FLIP, to be taken up once the SQS Sink FLIP is fully implemented. If not, this may lead to double work in multiple identity-related modules and in the structure of the overall SQS connector codebase. What do you think?

Thanks

On Thursday, July 25, 2024, Saurabh Singh <saurabhsingh9...@gmail.com> wrote:

Hi Samrat,

Thanks for the review and feedback. We have evaluated all three points; please find the answers below:

1. AWS has announced JSON protocol support in SQS [1]. Can you shed some light on how different protocols will be supported?
   - We will utilize the AWS client library to connect to the AWS SQS service. Client library versions beyond 2.21.19 support the JSON protocol, so simply upgrading the client library will suffice for the protocol switch. From the connector's perspective, we do not anticipate any changes in our communication process, as it is handled by the client library. [4]

2. AWS SQS has two types of queues [2]. What are the implementation detail differences for the source connector?
   - The SQS connector is indifferent to the customer's choice of queue type. If out-of-order messages are a concern, it is the responsibility of the application code or main job logic to manage this.

3. Will the SQS source connector implement any kind of callbacks [3] on success to offer any kind of guarantee?
   - We have proposed deleting SQS messages using the notification provided by the checkpoint framework on checkpoint completion, thus providing an exactly-once guarantee. [5] Additionally, when deleting messages, we will monitor the API call responses and log any failures, along with providing observability through appropriate metrics.
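
For illustration, the checkpoint-completion hook we have in mind would look roughly like the sketch below. The reader-internal fields (handlesByCheckpoint, sqsClient, queueUrl, LOG) are placeholders and imports from java.util and the AWS SDK v2 SQS model are omitted; this is a sketch of the idea, not the final design.

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Receipt handles of messages emitted up to this checkpoint, tracked by the reader.
        List<String> handles = handlesByCheckpoint.remove(checkpointId);
        if (handles == null || handles.isEmpty()) {
            return;
        }
        // SQS allows at most 10 entries per DeleteMessageBatch request.
        for (int start = 0; start < handles.size(); start += 10) {
            List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
            for (int i = start; i < Math.min(start + 10, handles.size()); i++) {
                entries.add(
                        DeleteMessageBatchRequestEntry.builder()
                                .id(Integer.toString(i))
                                .receiptHandle(handles.get(i))
                                .build());
            }
            DeleteMessageBatchResponse response =
                    sqsClient.deleteMessageBatch(
                            DeleteMessageBatchRequest.builder()
                                    .queueUrl(queueUrl)
                                    .entries(entries)
                                    .build());
            // Failed deletions are logged and surfaced via metrics; those messages become
            // visible again after the visibility timeout and are redelivered.
            response.failed()
                    .forEach(f -> LOG.warn("Failed to delete message {}: {}", f.id(), f.message()));
        }
    }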

[1] https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
[2] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
[3] https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
[4] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-json-faqs.html#json-protocol-getting-started
[5] https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.jdcikzojx5d9

[Main Proposal doc] -
https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl

Please feel free to reach out if you have more feedback.

Regards
Saurabh & Abhi

On Wed, Jul 24, 2024 at 8:52 AM Samrat Deb <decordea...@gmail.com> wrote:

Hi Saurabh,

Thank you for sharing the FLIP for the SQS source connector. An SQS source connector will be a great addition to the Flink ecosystem, as there is a growing demand for SQS source/sink integration.

I have a few queries:

1. AWS has announced JSON protocol support in SQS [1]. Can you shed some light on how different protocols will be supported?
2. AWS SQS has two types of queues [2]. What are the implementation detail differences for the source connector?
3. Will the SQS source connector implement any kind of callbacks [3] on success to offer any kind of guarantee?

[1] https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
[2] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
[3] https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html

Bests,
Samrat

On Fri, 19 Jul 2024 at 9:53 PM, Saurabh Singh <saurabhsingh9...@gmail.com> wrote:

Hi Flink Devs,

Our team has been working on migrating various data pipelines to Flink to leverage the benefits of exactly-once processing, checkpointing, and stateful computing. We have several use cases built around the AWS SQS service. For this migration, we have developed an SQS Source Connector, which enables us to run both stateless and stateful Flink-based jobs.

We believe that this SQS Source Connector would be a valuable addition to the existing connector set. Therefore, we propose a FLIP to include it.

For more information, please refer to the FLIP document:

https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing

Thanks
Saurabh & Abhi
