I have updated the FLIP [1] (please double-check) and will now add my +1 to the vote.
Best,
Arvid

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-477+Amazon+SQS+Source+Connector

On Wed, Sep 25, 2024 at 2:21 PM Saurabh Singh <saurabhsingh9...@gmail.com> wrote:

> Hi Flink Devs,
>
> Thanks a lot for the valuable feedback from Arvid, Danny and David on the
> design of this SQS Connector.
>
> We had an in-depth discussion on Slack regarding the semantics and stages
> of implementation. Below is the *conclusion* of our conversation.
>
> - Currently, the FLIP will concentrate on capturing the connector
>   details, design, and implementation specifically for AWS SQS Standard
>   Queues with at-least-once semantics. Support for FIFO queues will be
>   introduced later as an extension to this FLIP.
> - *Evaluation of Exactly-Once Semantics for Standard Queues*
>   - Unfortunately, achieving exactly-once semantics is not feasible for
>     Standard Queues due to the following reasons:
>     - There can be duplicate message deliveries with SQS standard queues.
>     - A successful delete call does not guarantee the absolute deletion
>       of the messages.
>
>   Therefore, *we can only provide at-least-once delivery for Standard
>   Queues.*
>
> - *Evaluation of Exactly-Once Semantics for FIFO Queues*
>   - Exactly-once semantics is achievable for FIFO Queues. To accomplish
>     this, we need to keep track of the seqNum (which is strictly
>     increasing for messages) and the groupId. By using the seqNum, we can
>     achieve exactly-once semantics.
>
> - *Manipulation of Invisible Messages*
>   - *For Standard Queues:*
>     - Since we are providing at-least-once delivery, there is no need to
>       manipulate invisible messages. After their visibility timeout
>       expires, the messages are consumed again, and user code needs to
>       handle these corner scenarios.
>   - *For FIFO Queues:*
>     - The challenge is that processing is halted if the previous
>       invisible messages are not deleted. Therefore, we need to
>       additionally keep track of the ReceiptHandle. In case messages
>       remain in an invisible state for an extended period, we need to
>       mark them visible again. This can be quite tricky, especially with
>       multiple groupIds. The details and design of this mechanism will be
>       an extension to this FLIP.
>
> Thanks a lot for the feedback. We have updated the FLIP.
> Updated Google Doc Link -
> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>
> Thanks
> Saurabh & Abhi
>
> -----------------------------------------------------------------------------------------
> *Slack Conversation Snapshot* *(feel free to skip; TL;DR above)*
>
> Saurabh Singh 6:06 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726576584259939>
> Hi @Arvid Heise <https://apache-flink.slack.com/team/U07BQT3AA1W> Thank
> you for your response. We believe that it might be more efficient to
> continue this conversation via Slack DMs, and we can update the FLIP with
> our conclusions afterwards. I hope this works for you, but please feel
> free to let me know if you have any concerns.
> Regarding the discussion points:
>
> 1. *SQS Limitation*: SQS has a limitation where you cannot perform
>    operations on invisible messages if their receipt handles (the
>    reference to the messages) are not available.
> 2. *SQS Message IDs*: Message IDs are unique random strings.
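>
> For readers less familiar with these two identifiers, a plain receive
> call with the AWS SDK v2 for Java surfaces both of them (a minimal
> sketch; the queue URL is a placeholder):
>
>     import java.util.List;
>     import software.amazon.awssdk.services.sqs.SqsClient;
>     import software.amazon.awssdk.services.sqs.model.Message;
>     import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
>
>     public class SqsReceiveSketch {
>         public static void main(String[] args) {
>             try (SqsClient sqs = SqsClient.create()) {
>                 List<Message> messages = sqs.receiveMessage(
>                                 ReceiveMessageRequest.builder()
>                                         .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue") // placeholder
>                                         .maxNumberOfMessages(10)
>                                         .waitTimeSeconds(20) // long polling
>                                         .build())
>                         .messages();
>                 for (Message m : messages) {
>                     // messageId is a stable random identifier; receiptHandle is
>                     // issued per receive and is the only token that can delete
>                     // the message or change its visibility.
>                     System.out.println(m.messageId() + " / " + m.receiptHandle());
>                 }
>             }
>         }
>     }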
>
> Now, we have two scenarios:
>
> - *Messages become invisible but are not captured as part of the state*
>   (e.g., while reading): In this case, we can't do anything except wait
>   for them to become visible again, which is not ideal for FIFO queues.
> - *Messages become invisible but are captured as part of the state*
>   (e.g., failure to act on the NotifyCheckpointComplete call): Here, we
>   would merge all the message IDs read until the next completed
>   checkpoint and then delete them together.
>
> > Are you planning to store the message ids of all to-be-deleted messages
> > in the state? That certainly works for low-volume sources but ideally,
> > you would just store the highest message id and enumerate the other
> > message ids from it.
>
> Yes, we need to store all message IDs individually because, in Standard
> SQS queues, messages are delivered in an unordered manner. Initially, we
> considered addressing both Standard and FIFO queues. However, after
> discussions and further thought, it seems that the same approach may not
> be ideal for FIFO queues. We'd like to seek your advice on limiting the
> scope of this FLIP to focus exclusively on Standard queues for better
> implementation and clarity. We believe this could help streamline the
> process, and we could later have a follow-up discussion or extension to
> explore adjustments and optimizations for FIFO queues.
> What are your thoughts on this approach?
> Arvid Heise 7:07 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580266021769>
> It's completely fine to focus on one type of queue and add the other
> later (even without a FLIP). Of course, you should either fail on FIFO or
> document that this works in a very limited way.
> 7:09
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580384937009>
> Arvid Heise
> Storing all message IDs in the Flink state should be seen as a last
> resort. It's okay, but depending on the number of messages it may blow up
> the state size significantly. It's much better than storing the payloads
> themselves (as we need to do for the deduplication operator).
> Danny Cranmer 7:10 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580422350019>
> was added to the conversation by Arvid Heise.
> Arvid Heise 7:11 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580488752129>
> It usually pays off to toss a few ideas back and forth though: maybe it's
> enough to store the ID of the last committed message. If you have a way
> to distinguish replayed and not-replayed messages (timestamp?), then you
> could discard all replayed messages until the ID pops up.
> 7:13
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580596696739>
> Arvid Heise
> What else can you do with receipt handles? Does it make sense to store
> them instead of the ID?
> 7:14
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580691227369>
> Arvid Heise
> Also probably a stupid Q: what happens with already-read messages when I
> set the timeout to 0? I'm assuming the respective timeout is attached to
> a message on read and thus changing the timeout later has no effect on
> them...
> 7:15
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726580751903719>
> Arvid Heise
> Lastly: how much contact do you have with the SQS folks? Could they add a
> way to interact with invisible messages later? Then, we could focus on a
> simple solution (possibly ignoring EOS) and add that later.
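>
> For illustration, resetting the visibility of an already-read message via
> the AWS SDK v2 looks roughly like this (a minimal sketch; queue URL and
> receipt handle are placeholders, and this only works while the receipt
> handle from the original read is still at hand):
>
>     import software.amazon.awssdk.services.sqs.SqsClient;
>     import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
>
>     public class VisibilityResetSketch {
>         public static void main(String[] args) {
>             try (SqsClient sqs = SqsClient.create()) {
>                 // A visibility timeout of 0 makes the in-flight message
>                 // immediately receivable again.
>                 sqs.changeMessageVisibility(ChangeMessageVisibilityRequest.builder()
>                         .queueUrl("https://sqs.us-east-1.amazonaws.com/123456789012/my-queue") // placeholder
>                         .receiptHandle("AQEB...") // handle from the original receive
>                         .visibilityTimeout(0)
>                         .build());
>             }
>         }
>     }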
> Wednesday, 18 September
> Saurabh Singh 5:43 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726661587582269>
> Thank you for your feedback. Please find our responses below:
>
> 1. Regarding message handling, we are indeed storing the receipt handles
>    for the messages in the state. This is necessary for performing the
>    delete operation on the messages later. In our earlier communication,
>    we referred to them as message IDs for simplicity and ease of
>    understanding.
> 2. For *Standard SQS queues*, due to the lack of ordering guarantees, we
>    must store each receipt handle to ensure we can delete the messages
>    appropriately. The message IDs alone are not sufficient for this
>    purpose.
> 3. Anytime we set the visibility timeout of a message to 0 (whether
>    immediately after reading or later), the message becomes visible
>    again, leading to reprocessing and duplication. Fine-tuning the
>    visibility timeout is key to avoiding this duplication. (Ref Link
>    <https://repost.aws/questions/QUnM3etA9fSDuMJngSvql5lw/sqs-visibility-timeout-0>)
> 4. We are simply users of the AWS infrastructure, and do not have direct
>    interaction with the SQS development team. So it is not possible to
>    interact with them on this.
>
> Let us know your thoughts on this. (edited)
> Thursday, 19 September
> Danny Cranmer 1:43 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726733595413099>
> Disclaimer: I am not experienced with SQS.
>
> > Regarding message handling, we are indeed storing the receipt handles
> > for the messages in the state.
>
> Is this code available somewhere for us to look at? It might help with
> understanding.
>
> > we are indeed storing the receipt handles for the messages in the state
>
> Are we also storing the actual messageID? We would likely need this in
> the case when we need to deduplicate a message with a new read receipt. I
> note that standard queues only guarantee at-least-once delivery, so we
> cannot really provide exactly-once in Flink without more extensive
> deduplication logic:
> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues-at-least-once-delivery.html.
> Unless the delete operation returns an error that we can retry, do you
> know the behaviour in these conditions? Since we are storing the message
> ID (receipt handles) in the state, it sounds like the happy path is
> solid, minus the above ^. For the "Messages become invisible but are not
> captured as part of the state" condition I suppose we are also covered,
> but we will introduce some latency, increasing out-of-orderness. It
> sounds like we need to track the receiptId and messageId in state until
> we have successfully deleted in SQS. The receiptId is used to actually
> delete the message and the messageId is needed to dedupe. I assume we
> will be controlling the visibility period in the source? If so, we can
> expose this to the user (with sensible defaults) to tune; ofc it should
> be something greater than the checkpoint interval.
>
> > We are simply users of the AWS infrastructure, and do not have direct
> > interaction with the SQS development team.
> > So it is not possible to interact with them on this.
>
> I know a few people that work at SQS; no promises, but I have pinged them
> and might be able to get some feedback from them (edited)
> Arvid Heise 2:37 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726736852216579>
> I wonder if we can get away with simply offering at-least-once and then
> exposing the message ID as metadata and using Flink to dedupe.
> 2:37
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726736863805459>
> Arvid Heise
> If I don't delete a message, is it stored indefinitely?
> 2:40
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726737056514629>
> Arvid Heise
> Just to explain my line of thought: storing per-message data in the
> source blows up state. We can do that for low-volume sources or if we
> have complex queries (e.g., joins, which also store per-message data). If
> SQS is low volume anyway, we might simply replay everything and dedupe
> via message ID. That obviously will slow down recovery, so it's more
> likely to work if messages have some inherent retention period.
> Saurabh Singh 5:55 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726748737945309>
> Thank you for taking the time, and for your feedback:
>
> > Is this code available somewhere for us to look at? It might help with
> > understanding.
>
> Currently, it is not available in any public domain. However, you can
> refer to the documentation of messageId and ReceiptHandle here
> <https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html>.
>
> > Are we also storing the actual messageID? We would likely need this in
> > the case when we need to deduplicate a message with a new read receipt.
>
> At the moment, we are not storing the actual messageID. We only perform
> delete operations, which require only the receipt handle. While storing
> the messageID would indeed be useful for deduplication, it introduces the
> additional overhead of retaining the messageID until its retention period
> expires. Please note that, as per the reference
> <https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues-at-least-once-delivery.html>,
> even a successful delete operation is not 100% guaranteed, as the copy of
> the message might not be deleted on the server that is unavailable, and
> you might receive that message copy again.
>
> > I assume we will be controlling the visibility period in the source? If
> > so, we can expose this to the user (with sensible defaults) to tune; of
> > course, it should be something greater than the checkpoint interval.
>
> While we can manage this via APIs, it should be part of the
> infrastructure (SQS) provisioning and not something to be tuned on
> demand. We will provide guidelines to tune these parameters (Visibility
> Timeout/Retention Period) with respect to checkpoint
> duration/interval/timeout.
>
> > I wonder if we can get away with simply offering at-least-once.
>
> Yes, based on the details and discussion, this seems like a good starting
> point.
>
> > Expose the message ID as metadata and use Flink to dedupe.
>
> Could you please expand on this? As we understand it, we would need to
> maintain the messageID in a data structure until the retention time
> configured on the queue.
>
> > If I don't delete a message, is it stored indefinitely?
>
> No, messages are not stored indefinitely. They expire or are deleted
> automatically after the message retention period of the queue.
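>
> As a concrete illustration of "delete operations require only the receipt
> handle", deleting a batch of tracked messages with the AWS SDK v2 might
> look like this (a sketch; the helper name is hypothetical):
>
>     import java.util.ArrayList;
>     import java.util.List;
>     import software.amazon.awssdk.services.sqs.SqsClient;
>     import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
>     import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
>
>     final class SqsDeleteSketch {
>         /** Deletes the given receipt handles; SQS allows at most 10 entries per batch. */
>         static void deleteAll(SqsClient sqs, String queueUrl, List<String> receiptHandles) {
>             for (int start = 0; start < receiptHandles.size(); start += 10) {
>                 List<String> chunk =
>                         receiptHandles.subList(start, Math.min(start + 10, receiptHandles.size()));
>                 List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
>                 for (int i = 0; i < chunk.size(); i++) {
>                     entries.add(DeleteMessageBatchRequestEntry.builder()
>                             .id(Integer.toString(i)) // entry id, unique within one batch
>                             .receiptHandle(chunk.get(i))
>                             .build());
>                 }
>                 // Note: per the at-least-once reference above, even a successful
>                 // delete is not absolute; a surviving copy may be delivered again.
>                 sqs.deleteMessageBatch(DeleteMessageBatchRequest.builder()
>                         .queueUrl(queueUrl)
>                         .entries(entries)
>                         .build());
>             }
>         }
>     }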
> Danny Cranmer 6:08 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726749531070739>
>
> > Is this code available somewhere for us to look at? It might help with
> > understanding.
> >
> > Currently, it is not available in any public domain. However, you can
> > refer to the documentation of messageId and ReceiptHandle here
> > <https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-message-identifiers.html>.
>
> I meant your connector code, assuming it exists.
>
> > Expose the message ID as metadata and use Flink to dedupe.
> >
> > Could you please expand on this? As we understand it, we would need to
> > maintain the messageID in a data structure until the retention time
> > configured on the queue.
>
> We support at-least-once by default, but then provide a special
> deserialisation schema interface that gives access to the SQS message ID.
> Then a user can include this in their records for downstream
> deduplication. Example here
> <https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/serialization/KinesisDeserializationSchema.java#L64>
> for Kinesis, where we give access to the seqNum etc. (a rough sketch of
> what this could look like for SQS appears below) (edited)
> 6:10
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726749655044349>
> Danny Cranmer
>
> > I assume we will be controlling the visibility period in the source? If
> > so, we can expose this to the user (with sensible defaults) to tune; of
> > course, it should be something greater than the checkpoint interval.
> >
> > While we can manage this via APIs, it should be part of the
> > infrastructure (SQS) provisioning and not something to be tuned on
> > demand. We will provide guidelines to tune these parameters (Visibility
> > Timeout/Retention Period) with respect to checkpoint
> > duration/interval/timeout.
>
> OK, it is a shame that users will need to tune their queue to work with
> Flink, but it sounds sensible given the limitations (edited)
> Saurabh Singh 6:43 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726751603344809>
> Thanks for the quick response and suggestions.
>
> > I meant your connector code, assuming it exists.
>
> At the moment, we have a naive version implemented and used for our
> internal application. Unfortunately, sharing the code in its current form
> could have legal implications. Therefore, we have initiated this FLIP to
> develop this for the open-source community.
> Friday, 20 September
> Danny Cranmer 12:05 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726770912823589>
> @David Green <https://apache-flink.slack.com/team/U07NKKNBG5P> has agreed
> to help us out.
> He is a Principal Engineer on the SQS team.
> David Green 12:05 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726770941081139>
> was added to the conversation by Danny Cranmer.
> Danny Cranmer 12:11 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726771289630659>
> Hey @David Green <https://apache-flink.slack.com/team/U07NKKNBG5P>,
> thanks for jumping in. We are trying to design an SQS source for Flink
> and achieve exactly-once semantics. Flink has a built-in checkpointing
> mechanism; sources typically commit their current read position in the
> checkpoint and resume from there upon failure. For the SQS source we are
> trying to avoid storing all message IDs and read receipts in the state,
> to avoid huge state sizes. It would be helpful if you can review this
> thread and see if we have made any wrong assumptions, or would suggest
> any alternative solutions.
> David Green 12:46 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726773418874009>
> Hi all, will read the thread to catch up. Is there a design doc or is it
> all in the chat?
> Danny Cranmer 12:49 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726773595097219>
> Here is the design, although it is probably lacking details of what we
> are discussing:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-477+Amazon+SQS+Source+Connector
> David Green 1:07 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726774654899819>
> Is there a definition of "exactly once"?
> https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/guarantees/
> e.g. does exactly once relate to distributed state, or local state?
> David Green 1:13 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726774992609999>
> SQS standard queues provide at-least-once delivery. In rare cases,
> messages can be delivered more than once.
> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/standard-queues.html
> This potential for duplicate deliveries with SQS standard queues will
> make exactly-once semantics impractical to implement in a Flink source
> connector. Unlike SQS standard queues, SQS FIFO ordering means that you
> won't receive a duplicate delivery out of order. It might be useful to
> declare the capabilities of the SQS source connector separately for SQS
> FIFO and SQS standard queues.
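>
> To make Danny's deserialisation-schema suggestion above concrete, an
> SQS-flavoured interface might look roughly like this (a hypothetical
> sketch modeled on the linked KinesisDeserializationSchema, not a decided
> API):
>
>     import java.io.IOException;
>     import java.io.Serializable;
>
>     public interface SqsDeserializationSchema<T> extends Serializable {
>         /**
>          * @param body      raw bytes of the SQS message body
>          * @param messageId the SQS-assigned unique id, surfaced so users
>          *                  can deduplicate downstream themselves
>          * @param queueUrl  the source queue the message was read from
>          */
>         T deserialize(byte[] body, String messageId, String queueUrl) throws IOException;
>     }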
> 1:18
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775287577599>
> David Green
> For SQS FIFO queues, you could consider using the `SequenceNumber`
> provided by [ReceiveMessage](
> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_ReceiveMessage.html
> ).
> Tracking the `SequenceNumber` for each message group that is in flight
> might be a good way to provide exactly-once semantics in the Flink source
> connector.
> Danny Cranmer 1:21 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775472945699>
> Exactly once means that each record is consumed and processed within the
> job only once, even when there are failures and restarts. Flink has
> "in-flight" state and checkpointed state. Upon failure Flink restores
> from checkpointed state and discards "in-flight" state. Checkpoints are
> consistent across parallel tasks of the Flink job.
> Danny Cranmer 1:23 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775590547299>
> What is the sequence number? These docs are not that useful: "Returns the
> value provided by Amazon SQS."
> David Green 1:23 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775611304669>
> Forgive my unfamiliarity with Flink. What does that mean from a source
> connector perspective? If a Flink application crashes and is restarted on
> another host, what is the expected behaviour?
> Danny Cranmer 1:28 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726775900380639>
> The Flink job would resume from the last checkpoint. The source should
> then resume from where it was at that checkpoint time/offset/etc. The
> idea for a queue is that upon checkpoint completion we delete all the
> messages consumed since the last checkpoint. However, this cleanup is not
> guaranteed to happen, so we often store metadata in the state to help us
> clean up eventually consistently and dedupe (edited)
> 1:32
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726776120746739>
> Danny Cranmer
> So one idea is to collect the receipt IDs and then delete the messages in
> the checkpoint completion event. But since this might not happen, we want
> to store state to eventually clean up without storing every message ID.
> Usually we use some offset/sequence number.
> 1:33
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726776221119169>
> Danny Cranmer
> Additionally, on failure we might need to reread messages that were
> in-flight, and we do not have the receipt IDs. But they may not be
> visible now, so besides tuning the visibility period, is there a better
> way to optimise this?
> Danny Cranmer 1:38 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726776534516009>
> Sorry for the vague questions; it's difficult when I do not know SQS and
> you do not know Flink.
> Hopefully @Saurabh Singh
> <https://apache-flink.slack.com/team/U06GJSZTEUC> will have some more
> specific questions when online.
> David Green 1:59 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726777789811769>
>
> > Additionally, on failure we might need to reread messages that were
> > in-flight, and we do not have the receipt IDs. But they may not be
> > visible now, so besides tuning the visibility period, is there a better
> > way to optimise this?
>
> Unfortunately no - if a message was received and is still within its
> visibility timeout, there is no way to make it visible again without
> using the ReceiptHandle.
> 2:01
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1726777889954539>
> David Green
>
> > So one idea is to collect the receipt IDs and then delete the messages
> > in the checkpoint completion event. But since this might not happen, we
> > want to store state to eventually clean up without storing every
> > message ID. Usually we use some offset/sequence number.
>
> This approach could work for SQS FIFO queues if you stored the
> SequenceNumber by message group ID, but each message group with in-flight
> messages would have to wait for those messages to become visible again.
> Yesterday
> Saurabh Singh 6:04 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727094880363379>
> Thanks @Danny Cranmer <https://apache-flink.slack.com/team/U03JD1ZFQ3D>
> @David Green <https://apache-flink.slack.com/team/U07NKKNBG5P> for the
> valuable insights. We have reviewed the conversation and are providing a
> summary here. We will update the FLIP accordingly once we agree on the
> same.
>
> - *Exactly-Once Semantics for Standard Queues*
>   - Unfortunately, achieving exactly-once semantics is not feasible for
>     Standard Queues due to the following reasons:
>     - There can be duplicate message deliveries with SQS standard queues.
>     - A successful delete call does not guarantee the absolute deletion
>       of the messages.
>   *Therefore, we can only provide at-least-once delivery for Standard
>   Queues.*
> - *Exactly-Once Semantics for FIFO Queues*
>   - Exactly-once semantics is achievable for FIFO Queues. To accomplish
>     this, we need to keep track of the seqNum (which is strictly
>     increasing for messages) and the groupId. By using the seqNum, we can
>     achieve exactly-once semantics (a rough sketch of this dedupe idea
>     follows below).
> - *Manipulation of Invisible Messages*
>   - *For Standard Queues:*
>     - Since we are providing at-least-once delivery, there is no need to
>       manipulate invisible messages. After their visibility timeout
>       expires, the messages are consumed again, and user code needs to
>       handle these corner scenarios.
>   - *For FIFO Queues:*
>     - The challenge is that processing is halted if the previous
>       invisible messages are not deleted. Therefore, we need to
>       additionally keep track of the ReceiptHandle. In case messages
>       remain in an invisible state for an extended period, we need to
>       mark them visible again. This can be quite tricky, especially with
>       multiple groupIds. I believe we need to study this in detail and
>       design the mechanism in the extension FLIP.
>
> Let us know if this looks good so we can proceed.
> ------------------------------
> New
> David Green 10:06 PM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727109383466049>
> LGTM
> Today
> Arvid Heise 11:19 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727156995566719>
> Just to clarify:
>
> - EOS with FIFO is out of scope of the FLIP now and there will be a
>   follow-up FLIP?
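>
> A rough sketch of the seqNum-based dedupe mentioned in the summary above
> (a hypothetical helper; FIFO sequence numbers are 128-bit values that
> keep increasing per MessageGroupId, so a numeric comparison works):
>
>     import java.math.BigInteger;
>     import java.util.HashMap;
>     import java.util.Map;
>
>     final class FifoDedupeSketch {
>         private final Map<String, BigInteger> lastSeqNumPerGroup = new HashMap<>();
>
>         /** Returns true if the message is new and should be emitted downstream. */
>         boolean accept(String messageGroupId, String sequenceNumber) {
>             BigInteger seq = new BigInteger(sequenceNumber);
>             BigInteger last = lastSeqNumPerGroup.get(messageGroupId);
>             if (last != null && seq.compareTo(last) <= 0) {
>                 return false; // replayed or duplicate delivery
>             }
>             lastSeqNumPerGroup.put(messageGroupId, seq);
>             return true;
>         }
>     }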
> 11:22
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727157161772559>
> Arvid Heise
> IMHO (and this view is not fully shared by all Flink committers) you
> don't strictly need FLIPs for connectors, especially if you just submit
> extensions. However, you can see how beneficial it is to discuss the
> ideas with a broader audience.
> Having a FLIP for the start of a connector is definitely something that
> I'd like to see, but we would be driven mad if there were a FLIP for each
> and every improvement. It's a different story if you change existing
> behavior.
> Saurabh Singh 11:33 AM
> <https://apache-flink.slack.com/archives/C07MRSRNWBU/p1727157813737819>
> Agree @Arvid Heise <https://apache-flink.slack.com/team/U07BQT3AA1W>. We
> will just extend the existing FLIP to document the behaviour of FIFO
> queues. This discussion has been very valuable for extending our
> knowledge and understanding of the concepts and for concluding on the
> right behaviour for the connector. Thanks for helping us out here.
> We will update the FLIP and discussion thread with the new details.
>
> -------------------------------------------------------------------------------------------
>
> On Tue, Sep 17, 2024 at 12:26 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Saurabh,
>>
>> thank you very much for taking the time to explain SQS features to me.
>> It's a cool concept that I haven't seen before.
>>
>> While your presented approach already works for Flink EOS, I still need
>> to poke a bit because I think the usability may be improved.
>>
>> Some background information on how sinks usually work:
>> * On checkpoint, we store the committables (=transactions or file
>>   locations).
>> * On notifyCheckpointComplete, we commit them all or fail on persisting
>>   errors.
>> * On recovery, we retry all stored committables.
>> * On recovery, we cancel all opened transactions not belonging to the
>>   recovered state.
>>
>> Now back to SQS, can we speed up recovery? Let's go over my thoughts:
>> * I'm assuming you need to set the timeout to a value that corresponds
>>   to the Flink checkpoint duration. I often see a 1-minute checkpointing
>>   interval and a checkpoint taking some seconds, so we would choose an
>>   SQS timeout of 2 minutes (we'd probably set it higher in realistic
>>   settings).
>> * Before failure, Flink tries to read message1 and fails; there are more
>>   messages afterwards (message2, ...).
>> * In standard queues, Flink restarts and will not see message1 until the
>>   timeout happens. It can immediately start processing message2 and so
>>   on. After 2 minutes, it will finally see message1 and consume it.
>> * In FIFO queues, Flink will not see any message until the timeout
>>   happens. So effectively, Flink will not process anything for 2
>>   minutes. That means that recovery time is effectively always as high
>>   as the SQS timeout.
>>
>> I'm curious if there is a way to manually time out the messages for
>> faster recovery. I saw that you can decrease the timeout of specific
>> messages, which could be used to set the timeout to 0 on restart for
>> message1. If that works, we could do similar things to the sink. On
>> restart, we immediately try to delete the messages that are part of the
>> checkpoint and set the timeout to 0 for messages that were read after
>> the checkpoint.
>> If it's somehow possible to enumerate all messages before and past the
>> respective checkpoint, we would not depend on the SQS timeout of the
>> consumer for correctness and recovery time, and could set it to its max
>> of 12h. Else we would leave the users to trade off correctness (timeout
>> too low) vs recovery time (timeout too high).
>>
>> What are you planning to store as the source state?
>> > *record2 is still captured as part of the state, so it gets deleted
>> > from the queue during the next checkpoint.*
>> Are you planning to store the message ids of all to-be-deleted messages
>> in the state? That certainly works for low-volume sources but ideally,
>> you would just store the highest message id and enumerate the other
>> message ids from it.
>>
>> Could you please update the FLIP to include the provided information so
>> that other readers can quickly understand the visibility timeout and the
>> state management? (External links are fine of course, but please provide
>> a brief summary in case the link becomes dead at some point.) You can of
>> course defer it until we are completely aligned.
>>
>> Best,
>>
>> Arvid
>>
>> On Fri, Sep 13, 2024 at 9:32 AM Saurabh Singh <saurabhsingh9...@gmail.com>
>> wrote:
>>
>>> Hi Arvid,
>>>
>>> Thanks a lot for your review.
>>>
>>> 1. Exactly once:
>>>
>>> *When using an SQS source, we leverage two key properties of SQS:*
>>> *1. Message Retention: This property ensures that messages are retained
>>> in the queue for a specified period, allowing consumers enough time to
>>> process them. After this period, the messages are deleted.*
>>> *2. Visibility Timeout [1]: This property prevents other consumers from
>>> receiving and processing the same message while it's being processed by
>>> the current consumer.*
>>>
>>> *Regarding the scenario described below:*
>>>
>>> * Read record2, emit record2, record2 is written into a Kafka
>>> transaction2
>>> * Checkpoint2 happens, state is checkpointed, sink remembers
>>> transaction2 to be committed
>>> * NotifyCheckpointCompleted2 is lost for some reason or happens after
>>> the next failure
>>> * Some failure happens, Flink is restarted with the current system
>>> state: SQS contains record2, record3, Kafka contains record1 and
>>> pending record2. On restart, the sink will commit transaction2
>>> recovered from the state. Now SQS contains record2, record3 and Kafka
>>> contains record1, record2.
>>> * Read record2, emit record2, record2 is written into a Kafka
>>> transaction3
>>> * Eventually, we end up with two record2 in the sink.
>>>
>>> *In this scenario, we set the Message Visibility Timeout to be
>>> multiples of the checkpoint duration and ensure it is longer than the
>>> recovery duration. This way, when recovery occurs, record2 remains
>>> invisible and is not read again, preventing duplication. record2 is
>>> still captured as part of the state, so it gets deleted from the queue
>>> during the next checkpoint.*
>>>
>>> *Message Retention Time: We typically set this to a sufficiently long
>>> duration. This is beneficial for recovering from extended outages and
>>> backfilling data. If a message is not deleted, it will be read again
>>> once it becomes visible after the visibility timeout expires.*
>>>
>>> *This is our current understanding for achieving exactly-once
>>> processing. Do you see any challenges with this approach? If so, we
>>> would greatly appreciate your feedback.*
>>>
>>> 2. Deleting messages:
>>> *Yes, it's quite natural for consumers to delete messages for the
>>> reasons mentioned above, such as recovery from outages and backfilling
>>> data.*
>>>
>>> 3. Regarding the Source parallelism:
>>> *ACK. We'll document this behavior accordingly.*
>>>
>>> 4. Multiple queue support:
>>> *Yes, as an extension to this FLIP, we will support this functionality
>>> in the future. However, in the meantime, this can still be achieved by
>>> creating multiple sources with specific queues.*
>>>
>>> Please review and let us know your feedback on this.
>>>
>>> [1]
>>> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
>>>
>>> Thanks
>>> Saurabh & Abhi
>>>
>>> On Thu, Sep 12, 2024 at 2:23 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> <I accidentally sent this message to Saurabh directly instead of to
>>>> the mailing list; please disregard that first message and only respond
>>>> to this one>
>>>>
>>>> Hi Saurabh,
>>>>
>>>> thank you for addressing the points. I still have concerns around EOS
>>>> and a few remarks.
>>>>
>>>> 1. Exactly once:
>>>> The checkpoint subsuming contract typically only applies to sinks, so
>>>> it's unusual for it to be used in sources. Let's quickly look at an
>>>> example so we are all on the same page.
>>>>
>>>> Assume we read messages record1, record2, record3, all arriving a few
>>>> minutes apart such that there are checkpoints in between. For
>>>> simplicity, let's assume one checkpoint between records and call them
>>>> checkpoint1 after record1, checkpoint2 after record2, and so on. The
>>>> data is simply stored in some EOS sink (e.g. Kafka). The corresponding
>>>> calls of NotifyCheckpointCompleted are enumerated as
>>>> NotifyCheckpointCompleted1, NotifyCheckpointCompleted2, ...
>>>>
>>>> Then your source sees the following on failure:
>>>> * Read record1, emit record1, record1 is written into a Kafka
>>>> transaction1
>>>> * Checkpoint1 happens, state is checkpointed, sink remembers
>>>> transaction1 to be committed
>>>> * NotifyCheckpointCompleted1 happens, sink commits transaction1, so
>>>> record1 is visible to the downstream consumer
>>>> * During NotifyCheckpointCompleted1, the SQS source also deletes
>>>> record1
>>>> * Some failure happens, Flink is restarted with the current system
>>>> state: SQS contains record2, record3, Kafka contains record1. Since
>>>> transaction1 is already committed, the sink ignores it. So far so good.
>>>>
>>>> * Read record2, emit record2, record2 is written into a Kafka
>>>> transaction2
>>>> * Checkpoint2 happens, state is checkpointed, sink remembers
>>>> transaction2 to be committed
>>>> * NotifyCheckpointCompleted2 is lost for some reason or happens after
>>>> the next failure
>>>> * Some failure happens, Flink is restarted with the current system
>>>> state: SQS contains record2, record3, Kafka contains record1 and
>>>> pending record2. On restart, the sink will commit transaction2
>>>> recovered from the state. Now SQS contains record2, record3 and Kafka
>>>> contains record1, record2.
>>>> * Read record2, emit record2, record2 is written into a Kafka
>>>> transaction3
>>>> * Eventually, we end up with two record2 in the sink.
>>>>
>>>> So you see that with your proposal, we get duplicate records easily on
>>>> failure, which is not exactly once. It's not enough to delete the
>>>> messages eventually with NotifyCheckpointCompleted3.
>>>> How it usually works:
>>>> * The checkpoint state of the source contains some kind of offset,
>>>> which would point to record3 in our example.
>>>> * On recovery, seek the record corresponding to the offset and start
>>>> reading from there.
>>>>
>>>> If the source system doesn't support seeking, then the usual way is to
>>>> replay a larger section but discard all records with a smaller offset.
>>>> If there is no strict ordering, a last resort is to track all message
>>>> ids on low-volume sources and discard duplicates. You could also start
>>>> with just offering AT LEAST ONCE and let the user deduplicate. We
>>>> should just be wary of not calling something exactly once that isn't.
>>>>
>>>> 2. Deleting messages:
>>>> It's rather uncommon for a connector to explicitly delete messages.
>>>> Usually, they fall off retention at some point. I'd try to decouple
>>>> that from EOS and give users an option to fully consume or not.
>>>> Unless, of course, it's very natural for SQS consumers to delete the
>>>> messages; I can't assess that as I have never used SQS.
>>>>
>>>> 3. Regarding the parallelism:
>>>> > *We have reviewed the DynamicParallelismInference and noticed that
>>>> > it is currently limited to batch jobs only [2].*
>>>> You are correct that it cannot be used, and thus we cannot enforce a
>>>> parallelism of 1. I got excited when I saw that interface and didn't
>>>> read properly.
>>>>
>>>> Limiting the parallelism will help to reduce the resources. If you
>>>> spawn more than 1 source subtask, it should get immediately terminated
>>>> because there is nothing to do (don't forget to emit
>>>> SourceReaderFinishedEvent for all idle subtasks). If the pipeline is
>>>> embarrassingly parallel, all downstream subtasks will also be closed.
>>>> But there is nothing that you can do except document that the
>>>> parallelism of the source should be set to 1. A higher parallelism is
>>>> only usable with a shuffle then.
>>>>
>>>> Alternatively, you could think about allowing the source to subscribe
>>>> to multiple queues and distribute them to the different subtasks.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Tue, Sep 10, 2024 at 4:50 PM Saurabh Singh <saurabhsingh9...@gmail.com>
>>>> wrote:
>>>>
>>>> > Hi Danny, Arvid,
>>>> >
>>>> > Thank you so much for reviewing the FLIP. We apologize for the
>>>> > delayed response; we've been quite busy over the past week.
>>>> >
>>>> > - // .setFailOnError(false)
>>>> > A general callout that this is not ideal; I know we use it
>>>> > elsewhere, but we should consider some pluggable error handling at
>>>> > some point. Not as part of this FLIP, but if we can avoid adding
>>>> > this flag here, then I would.
>>>> >
>>>> > *Acknowledged. We will remove this flag and address it separately to
>>>> > enhance our error handling design.*
>>>> >
>>>> > - // The code registers a checkpoint completion notification
>>>> > callback via *notifyCheckpointComplete*
>>>> > Flink does not guarantee that notifications are actually invoked and
>>>> > successful [1]. Therefore there is a chance this method will not
>>>> > run, and hence we could violate exactly-once semantics here. Suggest
>>>> > that we instead/additionally track the SQS read messages within the
>>>> > checkpoint state; we can evict from state once deleted, and possibly
>>>> > prune on startup or skip duplicates. Thoughts?
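>>>> >
>>>> > For illustration, the kind of per-checkpoint bookkeeping suggested
>>>> > here might be tracked roughly as follows (hypothetical names, not
>>>> > the FLIP's API); the reply below describes the same idea:
>>>> >
>>>> >     import java.util.ArrayList;
>>>> >     import java.util.List;
>>>> >     import java.util.NavigableMap;
>>>> >     import java.util.TreeMap;
>>>> >
>>>> >     final class PendingDeletesSketch {
>>>> >         // Receipt handles bucketed by the checkpoint that captured them.
>>>> >         private final NavigableMap<Long, List<String>> handlesByCheckpoint = new TreeMap<>();
>>>> >
>>>> >         void trackReadMessage(long pendingCheckpointId, String receiptHandle) {
>>>> >             handlesByCheckpoint
>>>> >                     .computeIfAbsent(pendingCheckpointId, id -> new ArrayList<>())
>>>> >                     .add(receiptHandle);
>>>> >         }
>>>> >
>>>> >         /**
>>>> >          * Called from the reader's notifyCheckpointComplete(checkpointId);
>>>> >          * a lost notification is subsumed by the next one.
>>>> >          */
>>>> >         List<String> drainUpTo(long checkpointId) {
>>>> >             NavigableMap<Long, List<String>> done =
>>>> >                     handlesByCheckpoint.headMap(checkpointId, true);
>>>> >             List<String> toDelete = new ArrayList<>();
>>>> >             done.values().forEach(toDelete::addAll);
>>>> >             done.clear(); // evict from state once scheduled for deletion
>>>> >             return toDelete; // caller issues the DeleteMessageBatch calls
>>>> >         }
>>>> >     }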
>>>> > *Yes, our implementation will comply with the checkpoint subsuming
>>>> > contract mentioned in [1]. To achieve this, we will track newly read
>>>> > messages <message ids> in a structure associated with a checkpoint
>>>> > ID. In cases where checkpoint notifications are missed, we will
>>>> > review all messages in the state with checkpoint IDs less than the
>>>> > received one, delete those messages, and remove them from the
>>>> > state.*
>>>> >
>>>> > *In addition, for optimal functionality, we depend on the visibility
>>>> > timeout configured for the queue. If read messages are not deleted
>>>> > before the visibility timeout expires due to any transient issues,
>>>> > they will reappear. We will offer guidance to users of this
>>>> > connector on adjusting the visibility timeout value to align with
>>>> > specific checkpoint durations and avoid duplicate processing.*
>>>> >
>>>> > - Since we only have 1 split, we should also limit the parallelism
>>>> > of the source accordingly. We could exploit the
>>>> > DynamicParallelismInference to effectively limit it to 1 unless the
>>>> > user explicitly overwrites.
>>>> >
>>>> > *We have reviewed the DynamicParallelismInference and noticed that
>>>> > it is currently limited to batch jobs only [2]. We would appreciate
>>>> > it if you could help us understand the rationale behind restricting
>>>> > the parallelism for the source to 1. Are there any benefits to this
>>>> > limitation, or potential issues if we do not enforce it?*
>>>> >
>>>> > - If I haven't overlooked something obvious, then your exactly-once
>>>> > strategy will not work. There is unfortunately no guarantee that
>>>> > notifyCheckpointComplete is called at all before a failure happens.
>>>> > So you very likely get duplicate messages. Scanning the SQS
>>>> > documentation, I saw that you can read the SequenceNumber of the
>>>> > message. If you also store the latest number in the checkpoint state
>>>> > of the split, then you can discard all messages during recovery that
>>>> > are smaller. But I have never used SQS, so just take it as an
>>>> > inspiration. Also note that you didn't sketch how you delete
>>>> > messages from the queue. It's very important to only delete those
>>>> > messages that are part of the successful checkpoint. So you can't
>>>> > use PurgeQueue.
>>>> >
>>>> > *As mentioned earlier in the second point, we have already discussed
>>>> > the mechanism. Please let us know if you have any concerns.*
>>>> >
>>>> > - I wonder if you need to use ReceiveRequestAttemptId. It looks like
>>>> > it may be important for retries.
>>>> >
>>>> > *Since we use the AWS SDK library to establish connections and
>>>> > perform SQS operations, retries for any transient issues are already
>>>> > handled by the library, with a default of three retries [3].
>>>> > Additionally, AWS provides predefined retry schemes to select from
>>>> > when creating the client.*
>>>> >
>>>> > Please review and let us know your feedback on this.
>>>> >
>>>> > *[1]*
>>>> > https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/CheckpointListener.html
>>>> > *[2]*
>>>> > https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java#L29
>>>> > *[3]*
>>>> > https://github.com/aws/aws-sdk-java/blob/61d73631fac8535ad70666bbce9e70a1d2cea2ca/aws-java-sdk-core/src/main/java/com/amazonaws/retry/PredefinedRetryPolicies.java#L41
>>>> >
>>>> > Regards
>>>> > Saurabh & Abhi
>>>> >
>>>> > On Tue, Sep 3, 2024 at 6:40 PM Arvid Heise <ar...@apache.org> wrote:
>>>> >
>>>> >> Sorry for being late to the party. I saw your call to vote and
>>>> >> looked at the FLIP.
>>>> >>
>>>> >> First, most of the design is looking really good, and it will be
>>>> >> good to have another connector integrated into the AWS ecosystem. A
>>>> >> couple of questions/remarks:
>>>> >> 1) Since we only have 1 split, we should also limit the parallelism
>>>> >> of the source accordingly. We could exploit the
>>>> >> DynamicParallelismInference to effectively limit it to 1 unless the
>>>> >> user explicitly overwrites.
>>>> >> 2) If I haven't overlooked something obvious, then your
>>>> >> exactly-once strategy will not work. There is unfortunately no
>>>> >> guarantee that notifyCheckpointComplete is called at all before a
>>>> >> failure happens. So you very likely get duplicate messages.
>>>> >> Scanning the SQS documentation, I saw that you can read the
>>>> >> SequenceNumber of the message. If you also store the latest number
>>>> >> in the checkpoint state of the split, then you can discard all
>>>> >> messages during recovery that are smaller. But I have never used
>>>> >> SQS, so just take it as an inspiration.
>>>> >> Also note that you didn't sketch how you delete messages from the
>>>> >> queue. It's very important to only delete those messages that are
>>>> >> part of the successful checkpoint. So you can't use PurgeQueue.
>>>> >> 3) I wonder if you need to use ReceiveRequestAttemptId. It looks
>>>> >> like it may be important for retries.
>>>> >>
>>>> >> Best,
>>>> >>
>>>> >> Arvid
>>>> >>
>>>> >> On Tue, Aug 20, 2024 at 11:24 AM Ahmed Hamdy <hamdy10...@gmail.com>
>>>> >> wrote:
>>>> >>
>>>> >> > Hi Abhisagar and Saurabh,
>>>> >> > I have created the FLIP page and assigned it FLIP-477 [1]. Feel
>>>> >> > free to resume with the next steps.
>>>> >> >
>>>> >> > 1-
>>>> >> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-+477+Amazon+SQS+Source+Connector
>>>> >> > Best Regards
>>>> >> > Ahmed Hamdy
>>>> >> >
>>>> >> > On Tue, 20 Aug 2024 at 06:05, Abhisagar Khatri <khatri.abhisaga...@gmail.com>
>>>> >> > wrote:
>>>> >> >
>>>> >> > > Hi Flink Devs,
>>>> >> > >
>>>> >> > > A gentle reminder for the request. We'd like to ask the
>>>> >> > > PMC/Committers to transfer the content from the Amazon SQS
>>>> >> > > Source Connector Google Doc [1] and assign a FLIP number for
>>>> >> > > us, which we can then use for voting.
>>>> >> > > We are following the procedure outlined on the Flink
>>>> >> > > Improvement Proposal Confluence page [2].
>>>> >> > >
>>>> >> > > [1]
>>>> >> > > https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>>>> >> > > [2]
>>>> >> > > https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
>>>> >> > >
>>>> >> > > Regards,
>>>> >> > > Abhi & Saurabh
>>>> >> > >
>>>> >> > > On Tue, Aug 13, 2024 at 12:50 PM Saurabh Singh <saurabhsingh9...@gmail.com>
>>>> >> > > wrote:
>>>> >> > >
>>>> >> > >> Hi Flink Devs,
>>>> >> > >>
>>>> >> > >> Thanks for all the feedback, Flink devs.
>>>> >> > >>
>>>> >> > >> Following the procedure outlined on the Flink Improvement
>>>> >> > >> Proposal Confluence page [1], we kindly ask the PMC/Committers
>>>> >> > >> to transfer the content from the Amazon SQS Source Connector
>>>> >> > >> Google Doc [2] and assign a FLIP number for us, which we will
>>>> >> > >> use for voting.
>>>> >> > >>
>>>> >> > >> [1]
>>>> >> > >> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
>>>> >> > >> [2]
>>>> >> > >> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>>>> >> > >>
>>>> >> > >> Regards
>>>> >> > >> Saurabh & Abhi
>>>> >> > >>
>>>> >> > >> On Thu, Aug 8, 2024 at 5:12 PM Saurabh Singh <saurabhsingh9...@gmail.com>
>>>> >> > >> wrote:
>>>> >> > >>
>>>> >> > >>> Hi Ahmed,
>>>> >> > >>>
>>>> >> > >>> Yes, you're correct. Currently, we're utilizing the "record
>>>> >> > >>> emitter" to send messages into the queue for deletion.
>>>> >> > >>> However, for the actual deletion process, which is dependent
>>>> >> > >>> on the checkpoints, we've been using the source reader class
>>>> >> > >>> because it allows us to override the
>>>> >> > >>> notifyCheckpointComplete method.
>>>> >> > >>>
>>>> >> > >>> Regards
>>>> >> > >>> Saurabh & Abhi
>>>> >> > >>>
>>>> >> > >>> On Wed, Aug 7, 2024 at 2:18 PM Ahmed Hamdy <hamdy10...@gmail.com>
>>>> >> > >>> wrote:
>>>> >> > >>>
>>>> >> > >>>> Hi Saurabh,
>>>> >> > >>>> Thanks for addressing these; I see the FLIP is in a much
>>>> >> > >>>> better state.
>>>> >> > >>>> Could we specify where we queue messages for deletion? In my
>>>> >> > >>>> opinion, the record emitter is a good place for that, where
>>>> >> > >>>> we delete messages that are forwarded to the next operator.
>>>> >> > >>>> Other than that I don't have further comments.
>>>> >> > >>>> Thanks again for the effort.
>>>> >> > >>>>
>>>> >> > >>>> Best Regards
>>>> >> > >>>> Ahmed Hamdy
>>>> >> > >>>>
>>>> >> > >>>> On Wed, 31 Jul 2024 at 10:34, Saurabh Singh <saurabhsingh9...@gmail.com>
>>>> >> > >>>> wrote:
>>>> >> > >>>>
>>>> >> > >>>>> Hi Ahmed,
>>>> >> > >>>>>
>>>> >> > >>>>> Thank you very much for the detailed, valuable review.
>>>> >> > >>>>> Please find our responses below:
>>>> >> > >>>>>
>>>> >> > >>>>> - In the FLIP you mention the split is going to be 1 SQS
>>>> >> > >>>>> queue; does this mean we would support reading from
>>>> >> > >>>>> multiple queues? This is also not clear in the
>>>> >> > >>>>> implementation of `addSplitsBack`, whether we are planning
>>>> >> > >>>>> to support multiple SQS queues or not.
>>>> >> > >>>>> *Our current implementation assumes that each source reads
>>>> >> > >>>>> from a single SQS queue. If you need to read from multiple
>>>> >> > >>>>> SQS queues, you can define multiple sources accordingly. We
>>>> >> > >>>>> believe this approach is clearer and more organized
>>>> >> > >>>>> compared to having a single source switch between multiple
>>>> >> > >>>>> queues. This design choice is based on weighing the
>>>> >> > >>>>> benefits, but we can support multiple queues per source if
>>>> >> > >>>>> the need arises.*
>>>> >> > >>>>>
>>>> >> > >>>>> - Regarding client creation, there has been some effort in
>>>> >> > >>>>> the common `aws-util` module, like createAwsSyncClient; we
>>>> >> > >>>>> should reuse that for `SqsClient` creation.
>>>> >> > >>>>>
>>>> >> > >>>>> *Thank you for bringing this to our attention. Yes, we will
>>>> >> > >>>>> utilize the existing createClient methods available in the
>>>> >> > >>>>> libraries. Our goal is to avoid any code duplication on our
>>>> >> > >>>>> end.*
>>>> >> > >>>>>
>>>> >> > >>>>> - On the same point for clients, is there a reason the FLIP
>>>> >> > >>>>> suggests async clients? Sync clients have proven more
>>>> >> > >>>>> stable, and the source threading model already guarantees
>>>> >> > >>>>> no blocking by sync clients.
>>>> >> > >>>>>
>>>> >> > >>>>> *We were not aware of this, and we have been using async
>>>> >> > >>>>> clients for our in-house use cases. However, since we
>>>> >> > >>>>> already have sync clients in the aws-util that ensure no
>>>> >> > >>>>> blocking, we are in a good position. We will use these sync
>>>> >> > >>>>> clients during our development and testing efforts, and we
>>>> >> > >>>>> will share the results and keep the community updated.*
>>>> >> > >>>>>
>>>> >> > >>>>> - On mentioning threading, the FLIP doesn't mention the
>>>> >> > >>>>> fetcher manager. Is it going to be
>>>> >> > >>>>> `SingleThreadFetcherManager`? Would it be better to make
>>>> >> > >>>>> the source reader extend the
>>>> >> > >>>>> SingleThreadMultiplexSourceReaderBase, or are we going to
>>>> >> > >>>>> implement a simpler version?
>>>> >> > >>>>>
>>>> >> > >>>>> *Yes, we are considering implementing
>>>> >> > >>>>> SingleThreadMultiplexSourceReaderBase for the Reader. We
>>>> >> > >>>>> have included the implementation snippet in the FLIP for
>>>> >> > >>>>> reference.*
>>>> >> > >>>>>
>>>> >> > >>>>> - The FLIP doesn't mention schema deserialization or the
>>>> >> > >>>>> recordEmitter implementation. Are we going to use
>>>> >> > >>>>> `deserializationSchema` or some sort of string-to-element
>>>> >> > >>>>> converter? It is also not clear from the builder example
>>>> >> > >>>>> provided.
>>>> >> > >>>>>
>>>> >> > >>>>> *Yes, we plan to use the deserializationSchema and
>>>> >> > >>>>> recordEmitter implementations. We have included sample code
>>>> >> > >>>>> for these in the FLIP for reference.*
>>>> >> > >>>>>
>>>> >> > >>>>> - Are the values mentioned in getSqsClientProperties
>>>> >> > >>>>> recommended defaults? If so, we should highlight that.
>>>> >> > >>>>>
>>>> >> > >>>>> *The defaults are not decided; these are just sample
>>>> >> > >>>>> snapshots for illustration.*
>>>> >> > >>>>>
>>>> >> > >>>>> - Most importantly, I am a bit skeptical regarding
>>>> >> > >>>>> enforcing exactly-once semantics with side effects,
>>>> >> > >>>>> especially with a dependency on the checkpointing
>>>> >> > >>>>> configuration. Could we add flags to disable it, and
>>>> >> > >>>>> disable it by default if checkpointing is not enabled?
>>>> >> > >>>>>
>>>> >> > >>>>> *During our initial design phase, we intended to enforce
>>>> >> > >>>>> exactly-once semantics via checkpoints. However, you raise
>>>> >> > >>>>> a valid point, and we will make this a configurable feature
>>>> >> > >>>>> for users. They can choose to disable exactly-once
>>>> >> > >>>>> semantics, accepting some duplicate processing
>>>> >> > >>>>> (at-least-once) as a trade-off. We have updated the FLIP to
>>>> >> > >>>>> include support for this feature.*
>>>> >> > >>>>>
>>>> >> > >>>>> - I am not 100% convinced we should block the FLIP itself
>>>> >> > >>>>> on the FLIP-438 implementation, but I echo the fact that
>>>> >> > >>>>> there might be some reusable code between the 2 submodules
>>>> >> > >>>>> we should make use of.
>>>> >> > >>>>>
>>>> >> > >>>>> *Yes, we echo the same. We fully understand the concern and
>>>> >> > >>>>> are closely examining the SQS Sink implementation. We will
>>>> >> > >>>>> ensure there is no duplication of work or submodules. If
>>>> >> > >>>>> any issues arise, we will address them promptly.*
>>>> >> > >>>>>
>>>> >> > >>>>> Thank you for your valuable feedback on the FLIP. Your
>>>> >> > >>>>> input is helping us refine and improve it significantly.
>>>> >> > >>>>>
>>>> >> > >>>>> [Main Proposal doc] -
>>>> >> > >>>>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
>>>> >> > >>>>>
>>>> >> > >>>>> Regards
>>>> >> > >>>>> Saurabh & Abhi
>>>> >> > >>>>>
>>>> >> > >>>>> On Sun, Jul 28, 2024 at 3:04 AM Ahmed Hamdy <hamdy10...@gmail.com>
>>>> >> > >>>>> wrote:
>>>> >> > >>>>>
>>>> >> > >>>>>> Hi Saurabh,
>>>> >> > >>>>>> I think this is going to be a valuable addition which is
>>>> >> > >>>>>> needed. I have a couple of comments:
>>>> >> > >>>>>> - In the FLIP you mention the split is going to be 1 SQS
>>>> >> > >>>>>> queue; does this mean we would support reading from
>>>> >> > >>>>>> multiple queues? The builder example seems to support a
>>>> >> > >>>>>> single queue:
>>>> >> > >>>>>> SqsSource.<T>builder.setSqsUrl("https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
>>>> >> > >>>>>> - This is also not clear in the implementation of
>>>> >> > >>>>>> `addSplitsBack`, whether we are planning to support
>>>> >> > >>>>>> multiple SQS queues or not.
>>>> >> > >>>>>> - Regarding client creation, there has been some effort in
>>>> >> > >>>>>> the common `aws-util` module, like createAwsSyncClient; we
>>>> >> > >>>>>> should reuse that for `SqsClient` creation.
>>>> >> > >>>>>> - On the same point for clients, is there a reason the
>>>> >> > >>>>>> FLIP suggests async clients? Sync clients have proven more
>>>> >> > >>>>>> stable, and the source threading model already guarantees
>>>> >> > >>>>>> no blocking by sync clients.
- On mentioning threading, the FLIP doesn't mention the fetcher manager. Is it going to be `SingleThreadFetcherManager`? Would it be better to make the source reader extend SingleThreadMultiplexSourceReaderBase, or are we going to implement a simpler version?

*Yes, we are considering implementing SingleThreadMultiplexSourceReaderBase for the reader. We have included the implementation snippet in the FLIP for reference.*

- The FLIP doesn't mention schema deserialization or the recordEmitter implementation. Are we going to use `deserializationSchema` or some sort of string-to-element converter? It is also not clear from the builder example provided.

*Yes, we plan to use the deserializationSchema and recordEmitter implementations. We have included sample code for these in the FLIP for reference; an illustrative sketch follows below.*
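*To make the reader-side wiring concrete, here is a minimal sketch of a recordEmitter that applies a deserializationSchema to the SQS message body. SqsRecordEmitter and SqsSplitState are illustrative names, not the final API; the actual snippet is in the FLIP:*

```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;

import software.amazon.awssdk.services.sqs.model.Message;

/** Placeholder for the mutable split state the FLIP defines. */
class SqsSplitState {}

// Turns a raw SQS Message into a user record via a DeserializationSchema.
class SqsRecordEmitter<T> implements RecordEmitter<Message, T, SqsSplitState> {

    private final DeserializationSchema<T> deserializationSchema;

    SqsRecordEmitter(DeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    @Override
    public void emitRecord(Message message, SourceOutput<T> output, SqsSplitState splitState)
            throws Exception {
        // Deserialize the message body and hand the record downstream.
        output.collect(deserializationSchema.deserialize(
                message.body().getBytes(StandardCharsets.UTF_8)));
    }
}
```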
- Are the values mentioned in getSqsClientProperties recommended defaults? If so, we should highlight that.

*The defaults are not decided. These are just sample snapshots for example.*

- Most importantly, I am a bit skeptical about enforcing exactly-once semantics with side effects, especially with a dependency on the checkpointing configuration. Could we add flags to disable it, and disable it by default if checkpointing is not enabled?

*During our initial design phase, we intended to enforce exactly-once semantics via checkpoints. However, you raise a valid point, and we will make this a configurable feature for users. They can choose to disable exactly-once semantics, accepting some duplicate processing (at-least-once) as a trade-off. We have updated the FLIP to include support for this feature. A conceptual sketch of the checkpoint-coupled deletion follows below.*
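*Conceptually, receipt handles are attributed to the checkpoint that covers them and deleted only once notifyCheckpointComplete fires, so nothing is removed from the queue before its records are durable. All names in this sketch are illustrative; the actual reader wiring is in the FLIP:*

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;

// Sketch of delete-on-checkpoint bookkeeping: handles collected since the
// last checkpoint are parked under the next checkpoint id and deleted only
// after Flink signals that the checkpoint is complete.
class SqsDeleteOnCheckpoint {

    private final SqsClient sqsClient;
    private final String queueUrl;
    private final List<String> handlesSinceLastCheckpoint = new ArrayList<>();
    private final NavigableMap<Long, List<String>> handlesPerCheckpoint = new TreeMap<>();

    SqsDeleteOnCheckpoint(SqsClient sqsClient, String queueUrl) {
        this.sqsClient = sqsClient;
        this.queueUrl = queueUrl;
    }

    /** Called for every message handed downstream. */
    void onMessageEmitted(String receiptHandle) {
        handlesSinceLastCheckpoint.add(receiptHandle);
    }

    /** Called from snapshotState(checkpointId). */
    void onSnapshotState(long checkpointId) {
        handlesPerCheckpoint.put(checkpointId, new ArrayList<>(handlesSinceLastCheckpoint));
        handlesSinceLastCheckpoint.clear();
    }

    /** Called from notifyCheckpointComplete(checkpointId). */
    void onCheckpointComplete(long checkpointId) {
        // Everything attributed to this checkpoint, or an earlier one, is durable.
        NavigableMap<Long, List<String>> ready = handlesPerCheckpoint.headMap(checkpointId, true);
        List<DeleteMessageBatchRequestEntry> batch = new ArrayList<>();
        int entryId = 0;
        for (List<String> handles : ready.values()) {
            for (String handle : handles) {
                batch.add(DeleteMessageBatchRequestEntry.builder()
                        .id(Integer.toString(entryId++))
                        .receiptHandle(handle)
                        .build());
                if (batch.size() == 10) { // SQS caps a delete batch at 10 entries
                    flush(batch);
                }
            }
        }
        flush(batch);
        ready.clear(); // drop bookkeeping for completed checkpoints
    }

    private void flush(List<DeleteMessageBatchRequestEntry> batch) {
        if (!batch.isEmpty()) {
            sqsClient.deleteMessageBatch(DeleteMessageBatchRequest.builder()
                    .queueUrl(queueUrl)
                    .entries(batch)
                    .build());
            batch.clear();
        }
    }
}
```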
- I am not 100% convinced we should block the FLIP itself on the FLIP-438 implementation, but I echo the fact that there might be some reusable code between the 2 submodules we should make use of.

*Yes, we echo the same. We fully understand the concern and are closely examining the SQS Sink implementation. We will ensure there is no duplication of work or submodules. If any issues arise, we will address them promptly.*

Thank you for your valuable feedback on the FLIP. Your input is helping us refine and improve it significantly.

[Main Proposal doc] -
https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl

Regards
Saurabh & Abhi

On Sun, Jul 28, 2024 at 3:04 AM Ahmed Hamdy <hamdy10...@gmail.com> wrote:

Hi Saurabh,
I think this is going to be a valuable addition which is needed. I have a couple of comments:
- In the FLIP you mention the split is going to be one SQS queue; does this mean we would support reading from multiple queues? The builder example seems to support a single queue:
SqsSource.<T>builder.setSqsUrl("https://sqs.us-east-1.amazonaws.com/23145433/sqs-test")
- This is also not clear in the implementation of `addSplitsBack`: are we planning to support multiple SQS topics or not?
- Regarding client creation, there has been some effort in the common `aws-util` module, such as createAwsSyncClient; we should reuse that for `SqsClient` creation.
- On the same point for clients, is there a reason the FLIP suggests async clients? Sync clients have proven more stable, and the source threading model already guarantees no blocking by sync clients.
- On mentioning threading, the FLIP doesn't mention the fetcher manager. Is it going to be `SingleThreadFetcherManager`? Would it be better to make the source reader extend SingleThreadMultiplexSourceReaderBase, or are we going to implement a simpler version?
- The FLIP doesn't mention schema deserialization or the recordEmitter implementation. Are we going to use `deserializationSchema` or some sort of string-to-element converter? It is also not clear from the builder example provided.
- Are the values mentioned in getSqsClientProperties recommended defaults? If so, we should highlight that.
- Most importantly, I am a bit skeptical about enforcing exactly-once semantics with side effects, especially with a dependency on the checkpointing configuration. Could we add flags to disable it, and disable it by default if checkpointing is not enabled?

I am not 100% convinced we should block the FLIP itself on the FLIP-438 implementation, but I echo the fact that there might be some reusable code between the 2 submodules we should make use of.

Best Regards
Ahmed Hamdy

On Fri, 26 Jul 2024 at 17:55, Saurabh Singh <saurabhsingh9...@gmail.com> wrote:

Hi Li Wang,

Thanks for the review; we appreciate your feedback.
We completely understand your concern and agree with it. Our goal is to provide users of connectors with a consistent and coherent ecosystem, free of any issues. To ensure that, we are closely monitoring and reviewing the SQS Sink implementation work by Dhingra. Our development work will commence once the AWS Sink is near completion or completed. This approach ensures that we take in the new learnings, do not duplicate any core modules, and save valuable time.

In the meantime, we would like to keep the discussion and process active on this FLIP. Gaining valuable community feedback (which is already helping us) will help us address any potential gaps in the source connector design and finalize it. Behind the scenes, we are already designing and pre-planning our development work so that we can adhere to feedback and best practices and deliver faster when we implement this FLIP.
Please share your thoughts on this.

Regards
Saurabh

On Fri, Jul 26, 2024 at 5:52 PM Li Wang <liwang505...@gmail.com> wrote:

Hi Saurabh,

Thanks for the FLIP. Given the ongoing effort to implement the SQS sink connector (https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector), it is important to consider how the SQS source connector supports this development, ensuring a unified framework for AWS integration that avoids overlapping capabilities and keeps the two efforts in sync. Since the implementation by Dhingra is still WIP, I would suggest taking that into account and keeping this FLIP as an extension of that one, to be taken up once the SQS Sink FLIP is fully implemented. If not, this may lead to duplicate work in multiple identity-related modules and in the structure of the overall SQS connector codebase. What do you think?

Thanks

On Thursday, July 25, 2024, Saurabh Singh <saurabhsingh9...@gmail.com> wrote:

Hi Samrat,

Thanks for the review and feedback. We have evaluated all three points. Please find the answers below:

1. AWS has announced JSON protocol support in SQS [1]. Can you shed some light on how different protocols will be supported?
- We will utilize the AWS client library to connect with the AWS SQS service. Versions beyond 2.21.19 support JSON, so simply upgrading the client library will suffice for the protocol switch. From the connector's perspective, we do not anticipate any changes in our communication process, as it is handled by the client library. [4]

2. AWS SQS has two types of queues [2]. What are the implementation detail differences for the source connector?
- The SQS connector is indifferent to the customer's choice of queue type. If out-of-order messages are a concern, it is the responsibility of the application code or main job logic to manage this.

3. Will the SQS source connector implement any kind of callbacks [3] on success to offer any kind of guarantee?
- We have proposed deleting SQS messages using the notification provided by the checkpoint framework on checkpoint completion, thus providing an exactly-once guarantee. [5] Additionally, when deleting messages, we will monitor the API call responses and log any failures, along with providing observability through appropriate metrics; a rough sketch of this step follows below.
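For illustration only (the logger, metric, and class names are assumptions, and the counter would be registered on the reader's metric group):

```java
import org.apache.flink.metrics.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;

class SqsDeleteMonitor {

    private static final Logger LOG = LoggerFactory.getLogger(SqsDeleteMonitor.class);

    static void deleteAndMonitor(
            SqsClient sqsClient, DeleteMessageBatchRequest request, Counter deleteFailures) {
        DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(request);
        for (BatchResultErrorEntry error : response.failed()) {
            // A failed delete is logged and counted rather than being fatal: the
            // message becomes visible again and is simply re-read later.
            LOG.warn("Failed to delete SQS message {}: {} ({})",
                    error.id(), error.message(), error.code());
            deleteFailures.inc();
        }
    }
}
```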
[1] https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
[2] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
[3] https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
[4] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-json-faqs.html#json-protocol-getting-started
[5] https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.jdcikzojx5d9

[Main Proposal doc] -
https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl

Please feel free to reach out if you have more feedback.

Regards
Saurabh & Abhi

On Wed, Jul 24, 2024 at 8:52 AM Samrat Deb <decordea...@gmail.com> wrote:

Hi Saurabh,

Thank you for sharing the FLIP for the SQS source connector. An SQS source connector will be a great addition to the Flink ecosystem, as there is growing demand for SQS source/sink integration.

I have a few queries:

1. AWS has announced JSON protocol support in SQS [1]. Can you shed some light on how different protocols will be supported?
2. AWS SQS has two types of queues [2]. What are the implementation detail differences for the source connector?
3. Will the SQS source connector implement any kind of callbacks [3] on success to offer any kind of guarantee?

[1] https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
[2] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
[3] https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html

Bests,
Samrat

On Fri, 19 Jul 2024 at 9:53 PM, Saurabh Singh <saurabhsingh9...@gmail.com> wrote:

Hi Flink Devs,

Our team has been working on migrating various data pipelines to Flink to leverage the benefits of exactly-once processing, checkpointing, and stateful computing. We have several use cases built around the AWS SQS service. For this migration, we have developed an SQS Source Connector, which enables us to run both stateless and stateful Flink-based jobs.

We believe that this SQS Source Connector would be a valuable addition to the existing connector set. Therefore, we propose a FLIP to include it.

For more information, please refer to the FLIP document:
https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing

Thanks
Saurabh & Abhi