Hi Saurabh,

thank you very much for taking the time to explain SQS features to me. It's
a cool concept that I haven't seen before.

While your presented approach already works for Flink EOS, I still need to
poke a bit because I think the usability may be improved.

Some background information on how sinks usually work:
* On checkpoint, we store the committables (=transactions or file
locations).
* On notifyCheckpointComplete, we commit them all or fail on persisting
errors.
* On recovery, we retry all stored committables.
* On recovery, we cancel all opened transactions not belonging to the
recovered state.

Now back to SQS, can we speed up recovery? Let's go over my thoughts:
* I'm assuming you need to set the timeout to a value that corresponds to
Flink checkpoint duration. So I often see 1 minute checkpointing interval
and a checkpoint taking some seconds, so we would choose a SQS timeout of 2
minutes (we'd probably set it higher in realistic settings).
* Before failure, Flink tries to read message1 and fails, there are more
messages afterwards (message2, ...)
* In standard queues, Flink restarts and will not see message1 until the
timeout happens. It can immediately start processing message2 and so on.
After 2 minutes, it will finally see message1 and consume it.
* In FIFO queues, Flink will not see any message until the timeout happens.
So effectively, Flink will not process anything for 2 minutes. That means
that recovery is effectively always as high as the SQS timeout.

I'm curious if there is a way to manually timeout the messages for faster
recovery? I saw that you can decrease the timeout of specific messages,
which could be used to set the timeout to 0 on restart of message1. If that
works, we could do similar things to the sink. On restart, we immediately
try to delete the messages that are part of the checkpoint and set the
timeout to 0 for messages that were read after the checkpoint. If it's
somehow possible to enumerate all messages before and past the respective
checkpoint, we would not depend on the SQS timeout of the consumer for
correctness and recovery time and could set it to its max=12h. Else we
would have the users to trade off correctness (timeout too low) vs recovery
time (timeout too high).

What are you planning to store as the source state?
> *record2 is still captured as part of the state, so it gets deleted from
the queue during the next checkpoint.*
Are you planning to store the message ids of all to-be-deleted messages in
the state? That certainly works for low volume sources but ideally, you
would just store the highest message id and enumerate the other message ids
from it.

Could you please update the FLIP to include the provided information so
that other readers can quickly understand the visibility timeout and the
state management? (external links are fine of course but please provide a
brief summary in case the link becomes dead at some point) You can of
course defer it until we are completely aligned.

Best,

Arvid

On Fri, Sep 13, 2024 at 9:32 AM Saurabh Singh <saurabhsingh9...@gmail.com>
wrote:

> Hi Arvid,
>
> Thanks a lot for your review.
>
>    1. Exactly once:
>
>
>
> *When using an SQS source, we leverage two key properties of SQS:1.
> Message Retention: This property ensures that messages are retained in the
> queue for a specified period, allowing consumers enough time to process
> them. Post these messages are deleted.2. Visibility Timeout [1] : This
> property prevents other consumers from receiving and processing the same
> message while it's being processed by the current consumer.*
>
> *Regarding the scenario described below:*
>
> * Read record2, emit record2, record2 is written into a Kafka transaction2
> * Checkpoint2 happens, state is checkpointed, sink remembers transaction2
> to be committed
> * NotifyCheckpointCompleted2 is lost for some reason or happens after next
> failure
> * Some failure happens, Flink is restarted with the current system state:
> SQS contains record2, record3, Kafka contains record1 and pending record2.
> On restart, sink will commit transactions2 recovered from the state. No SQS
> contains record2, record3 and Kafka contains record1, record2.
> * Read record2, emit record2, record2 is written into a Kafka transaction3
> * Eventually, we end up with two record2 in the sink.
>
> *In this scenario, we set the Message Visibility Timeout to be multiples
> of the checkpoint duration and ensure it is longer than the recovery
> duration. This way, when recovery occurs, record2 remains invisible and is
> not read again, preventing duplication. record2 is still captured as part
> of the state, so it gets deleted from the queue during the next checkpoint.*
>
> *Message Retention Time: We typically set this to a sufficiently long
> duration. This is beneficial for recovering from extended outages and
> backfilling data. If a message is not deleted, it will be read again once
> it becomes visible after the visibility timeout expires.*
>
> *This is our current understanding for achieving exactly-once processing.
> Do you see any challenges with this approach? If so, we would greatly
> appreciate your feedback.*
>
> 2. Deleting messages:
> *Yes, It's quite natural for consumers to delete messages for the reasons
> mentioned above, such as recovery from outages and backfilling data.*
>
> 3. Regarding the Source parallelism:
> *ACK. We'll document this behavior accordingly.*
>
> 4. Multiple queue support:
> *Yes, as an extension to this FLIP, we will support this functionality in
> the future. However, in the meantime, this can still be achieved by
> creating multiple sources with specific queues.*
>
>
> Please review and let us know your feedback on this.
>
> [1]
> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
>
>
> Thanks
> Saurabh & Abhi
>
>
> On Thu, Sep 12, 2024 at 2:23 PM Arvid Heise <ar...@apache.org> wrote:
>
>> <I accidentally sent this message Saurabh directly instead to the mailing
>> list, please disregard that first message and only respond to this one>
>>
>> Hi Saurabh,
>>
>> thank you for addressing the points. I still have concerns around EOS and
>> a
>> few remarks.
>>
>> 1. Exactly once:
>> The checkpoint subsuming contract typically only applies to sinks, so it's
>> unusual to be used in sources. Let's quickly look at an example so we are
>> all on the same page.
>>
>> Assume we read messages record1, record2, record3 all arriving a few
>> minutes apart such there are checkpoints in-between. For simplicity let's
>> assume one checkpoint between records and call them checkpoint1 after 1,
>> checkpoint2 after 2 and so on. The data is simply stored in some EOS sink
>> (e.g. Kafka). The corresponding calls of NotifyCheckpointCompleted are
>> enumerated with NotifyCheckpointCompleted1, NotifyCheckpointCompleted2,
>> ...
>>
>> Then your source sees the following on failure:
>> * Read record1, emit record1, record1 is written into a Kafka transaction1
>> * Checkpoint1 happens, state is checkpointed, sink remembers transaction1
>> to be committed
>> * NotifyCheckpointCompleted1 happens, sink commits transaction1, so
>> record1
>> is visible to downstream consumer
>> * During NotifyCheckpointCompleted1, SQS source also deletes record1
>> * Some failure happens, Flink is restarted with the current system state:
>> SQS contains record2, record3, Kafka contains record1. Since transaction1
>> is already committed, the sink ignores it. So far so good.
>>
>> * Read record2, emit record2, record2 is written into a Kafka transaction2
>> * Checkpoint2 happens, state is checkpointed, sink remembers transaction2
>> to be committed
>> * NotifyCheckpointCompleted2 is lost for some reason or happens after next
>> failure
>> * Some failure happens, Flink is restarted with the current system state:
>> SQS contains record2, record3, Kafka contains record1 and pending record2.
>> On restart, sink will commit transactions2 recovered from the state. No
>> SQS
>> contains record2, record3 and Kafka contains record1, record2.
>> * Read record2, emit record2, record2 is written into a Kafka transaction3
>> * Eventually, we end up with two record2 in the sink.
>>
>> So you see that with your proposal, we get duplicate records easily on
>> failure, which is not exactly once. It's not enough to delete the messages
>> eventually with NotifyCheckpointCompleted3.
>>
>> How it usually works:
>> * The checkpoint state of the source contains some kind of offset, which
>> would point to record3 in our example.
>> * On recovery, seek the record corresponding to the offset and start
>> reading from there.
>>
>> If the source system doesn't support seeking, then the usual way is to
>> replay a larger section but discard all records with a smaller offset. If
>> there is no strict ordering, a last resort is to track all message ids on
>> low volume sources and discard duplicates. You could also start with just
>> offering AT LEAST ONCE and let the user deduplicate. We should just be
>> wary
>> to not call something exactly once that isn't.
>>
>> 2. Deleting messages:
>> It's rather uncommon for a connector to explicitly delete messages.
>> Usually, they fall off retention at some point. I'd try to decouple that
>> from EOS and give users an option to fully consume or not. Unless of
>> course, it's very natural for SQS consumers to delete the messages; I
>> can't
>> assess that as I have never used SQS.
>>
>> 3. Regarding the parallelism:
>> > *We have reviewed the DynamicParallelismInference and noticed that it is
>> currently limited to batching jobs only [2]. *
>> You are correct it cannot be used and thus we cannot enforce parallelism
>> of
>> 1. I got excited when I saw that interface and didn't read properly.
>>
>> Limiting the parallelism will help to reduce the resources. If you spawn
>> more than 1 source subtask, it should get immediately terminated because
>> there is nothing to do (don't forget to emit SourceReaderFinishedEvent for
>> all idle subtasks). If the pipeline is embarrassing parallel, all
>> downstream subtasks will also be closed.
>> But there is nothing that you can do except document it to set the
>> parallelism of the source to 1. A higher parallelism is only usable with a
>> shuffle then.
>>
>> Alternatively, you could think about allowing the source to subscribe to
>> multiple queues and distribute them to the different subtasks.
>>
>> Best,
>>
>> Arvid
>>
>> On Tue, Sep 10, 2024 at 4:50 PM Saurabh Singh <saurabhsingh9...@gmail.com
>> >
>> wrote:
>>
>> > Hi Danny, Arvid,
>> >
>> > Thank you so much for reviewing the FLIP. We apologize for the delayed
>> > response; we've been quite busy over the past week.
>> >
>> >
>> >    - // .setFailOnError(false)
>> >    A general callout that this is not ideal, I know we use it elsewhere
>> >    but we should consider some pluggable error handling at some point.
>> Not as
>> >    part of this FLIP, but if we can avoid adding this flag here, then I
>> would.
>> >
>> > *Acknowledged. We will remove this flag and address it separately to
>> > enhance our error handling design.*
>> >
>> >    - // The code registers a checkpoint completion notification callback
>> >    via *notifyCheckpointComplete*
>> >    Flink does not guarantee that notifications are actually invoked and
>> >    successful [1]. Therefore there is a chance this method will not
>> run, and
>> >    hence we could violate exactly once semantics here. Suggest that we
>> >    instead/additionally track the SQS read messages within the
>> checkpoint
>> >    state, we can evict from state once deleted, and possibly prune on
>> startup
>> >    or skip duplicates. Thoughts?
>> >
>> > *Yes, our implementation will comply with the checkpoint subsuming
>> > contract mentioned in [1]. To achieve this, we will track newly read
>> > messages <message ids> in a structure associated with a checkpoint ID.
>> In
>> > cases where checkpoint notifications are missed, we will review all
>> > messages in the state with checkpoint IDs less than the received one,
>> > delete those messages, and remove them from the state.*
>> >
>> > *In addition, for optimal functionality, we depend on the visibility
>> > timeout configured for the queue. If read messages are not deleted
>> before
>> > the visibility timeout expires due to any transient issues, they will
>> > reappear. We will offer guidance to users of this connector on adjusting
>> > the visibility timeout value to align with specific checkpoint durations
>> > and avoid duplicate processing.*
>> >
>> >    - Since we only have 1 split, we should also limit the parallelism of
>> >    the source accordingly. We could exploit the
>> DynamicParallelismInference to
>> >    effectively limit it to 1 unless the user explicitly overwrites.
>> >
>> > *We have reviewed the DynamicParallelismInference and noticed that it is
>> > currently limited to batching jobs only [2]. We would appreciate it if
>> you
>> > could help us understand the rationale behind restricting the
>> parallelism
>> > for the source to 1. Are there any benefits to this limitation, or
>> > potential issues if we do not enforce it?*
>> >
>> >    - If I haven't overlooked something obvious, then your exactly-once
>> >    strategy will not work. There is unfortunately no guarantee that
>> >    notifyCheckpointComplete is called at all before a failure happens.
>> So
>> >    you very likely get duplicate messages. Scanning the SQS
>> >    documentation, I saw that you can read the SequenceNumber of the
>> message.
>> >    If you also store the latest number in the checkpoint state of the
>> split,
>> >    then you can discard all messages during recovery that are smaller.
>> But I
>> >    have never used SQS, so just take it as an inspiration. Also note
>> that you
>> >    didn't sketch how you delete messages from the queue. It's very
>> important
>> >    to only delete those messages that are part of the successful
>> checkpoint.
>> >    So you can't use PurgeQueue.
>> >
>> > *As mentioned earlier in the second point, we have already discussed the
>> > mechanism. Please let us know if you have any concerns.*
>> >
>> >    - I wonder if you need to use ReceiveRequestAttemptId. It looks like
>> >    it may be important for retries.
>> >
>> >
>> >
>> > *Since we use the AWS SDK library to establish connections and perform
>> SQS
>> > operations. This library already ensures retries for any transient
>> issues,
>> > with a default of three retries [3]. Additionally, AWS provides
>> predefined
>> > schemes to select from when creating the client.*
>> > Please review and let us know your feedback on this.
>> >
>> >
>> > *[1] *
>> >
>> https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/common/state/CheckpointListener.html
>> > *[2] *
>> >
>> https://github.com/apache/flink/blob/master//flink-core/src/main/java/org/apache/flink/api/connector/source/DynamicParallelismInference.java#L29
>> > *[3]*
>> >
>> https://github.com/aws/aws-sdk-java/blob/61d73631fac8535ad70666bbce9e70a1d2cea2ca/aws-java-sdk-core/src/main/java/com/amazonaws/retry/PredefinedRetryPolicies.java#L41
>> >
>> >
>> >
>> > Regards
>> > Saurabh & Abhi
>> >
>> >
>> >
>> > On Tue, Sep 3, 2024 at 6:40 PM Arvid Heise <ar...@apache.org> wrote:
>> >
>> >> Sorry for being late to the party. I saw your call to vote and looked
>> at
>> >> the FLIP.
>> >>
>> >> First, most of the design is looking really good and it will be good to
>> >> have another connector integrated into the AWS ecosystem. A couple of
>> >> questions/remarks:
>> >> 1) Since we only have 1 split, we should also limit the parallelism of
>> the
>> >> source accordingly. We could exploit the DynamicParallelismInference to
>> >> effectively limit it to 1 unless the user explicitly overwrites.
>> >> 2) If I haven't overlooked something obvious, then your exactly-once
>> >> strategy will not work. There is unfortunately no guarantee
>> >> that notifyCheckpointComplete is called at all before a failure
>> happens.
>> >> So
>> >> you very likely get duplicate messages.
>> >> Scanning the SQS documentation, I saw that you can read the
>> SequenceNumber
>> >> of the message. If you also store the latest number in the checkpoint
>> >> state
>> >> of the split, then you can discard all messages during recovery that
>> are
>> >> smaller. But I have never used SQS, so just take it as an inspiration.
>> >> Also note that you didn't sketch how you delete messages from the
>> queue.
>> >> It's very important to only delete those messages that are part of the
>> >> successful checkpoint. So you can't use PurgeQueue.
>> >> 3) I wonder if you need to use ReceiveRequestAttemptId. It looks like
>> it
>> >> may be important for retries.
>> >>
>> >> Best,
>> >>
>> >> Arvid
>> >>
>> >> On Tue, Aug 20, 2024 at 11:24 AM Ahmed Hamdy <hamdy10...@gmail.com>
>> >> wrote:
>> >>
>> >> > Hi Abhisagar and Saurabh
>> >> > I have created the FLIP page and assigned it FLIP-477[1]. Feel free
>> to
>> >> > resume with the next steps.
>> >> >
>> >> > 1-
>> >> >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-+477+Amazon+SQS+Source+Connector
>> >> > Best Regards
>> >> > Ahmed Hamdy
>> >> >
>> >> >
>> >> > On Tue, 20 Aug 2024 at 06:05, Abhisagar Khatri <
>> >> > khatri.abhisaga...@gmail.com>
>> >> > wrote:
>> >> >
>> >> > > Hi Flink Devs,
>> >> > >
>> >> > > Gentle Reminder for the request. We'd like to ask the
>> PMC/Committers
>> >> to
>> >> > > transfer the content from the Amazon SQS Source Connector Google
>> Doc
>> >> [1]
>> >> > > and assign a FLIP Number for us, which we can use further for
>> voting.
>> >> > > We are following the procedure outlined on the Flink Improvement
>> >> Proposal
>> >> > > Confluence page [2].
>> >> > >
>> >> > > [1]
>> >> > >
>> >> >
>> >>
>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>> >> > > [2]
>> >> > >
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
>> >> > >
>> >> > > Regards,
>> >> > > Abhi & Saurabh
>> >> > >
>> >> > >
>> >> > > On Tue, Aug 13, 2024 at 12:50 PM Saurabh Singh <
>> >> > saurabhsingh9...@gmail.com>
>> >> > > wrote:
>> >> > >
>> >> > >> Hi Flink Devs,
>> >> > >>
>> >> > >> Thanks for all the feedback flink devs.
>> >> > >>
>> >> > >> Following the procedure outlined on the Flink Improvement Proposal
>> >> > >> Confluence page [1], we kindly ask the PMC/Committers to transfer
>> the
>> >> > >> content from the Amazon SQS Source Connector Google Doc [2] and
>> >> assign a
>> >> > >> FLIP Number for us, which we will use for voting.
>> >> > >>
>> >> > >> [1]
>> >> > >>
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals#FlinkImprovementProposals-Process
>> >> > >> [2]
>> >> > >>
>> >> >
>> >>
>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>> >> > >>
>> >> > >> Regards
>> >> > >> Saurabh & Abhi
>> >> > >>
>> >> > >>
>> >> > >> On Thu, Aug 8, 2024 at 5:12 PM Saurabh Singh <
>> >> > saurabhsingh9...@gmail.com>
>> >> > >> wrote:
>> >> > >>
>> >> > >>> Hi Ahmed,
>> >> > >>>
>> >> > >>> Yes, you're correct. Currently, we're utilizing the "record
>> >> emitter" to
>> >> > >>> send messages into the queue for deletion. However, for the
>> actual
>> >> > deletion
>> >> > >>> process, which is dependent on the checkpoints, we've been using
>> the
>> >> > source
>> >> > >>> reader class because it allows us to override the
>> >> > notifyCheckpointComplete
>> >> > >>> method.
>> >> > >>>
>> >> > >>> Regards
>> >> > >>> Saurabh & Abhi
>> >> > >>>
>> >> > >>> On Wed, Aug 7, 2024 at 2:18 PM Ahmed Hamdy <hamdy10...@gmail.com
>> >
>> >> > wrote:
>> >> > >>>
>> >> > >>>> Hi Saurabh
>> >> > >>>> Thanks for addressing, I see the FLIP is in much better state.
>> >> > >>>> Could we specify where we queue messages for deletion, In my
>> >> opinion
>> >> > >>>> the record emitter is a good place for that where we delete
>> >> messages
>> >> > that
>> >> > >>>> are forwarded to the next operator.
>> >> > >>>> Other than that I don't have further comments.
>> >> > >>>> Thanks again for the effort.
>> >> > >>>>
>> >> > >>>> Best Regards
>> >> > >>>> Ahmed Hamdy
>> >> > >>>>
>> >> > >>>>
>> >> > >>>> On Wed, 31 Jul 2024 at 10:34, Saurabh Singh <
>> >> > saurabhsingh9...@gmail.com>
>> >> > >>>> wrote:
>> >> > >>>>
>> >> > >>>>> Hi Ahmed,
>> >> > >>>>>
>> >> > >>>>> Thank you very much for the detailed, valuable review. Please
>> find
>> >> > our
>> >> > >>>>> responses below:
>> >> > >>>>>
>> >> > >>>>>
>> >> > >>>>>    - In the FLIP you mention the split is going to be 1 sqs
>> Queue,
>> >> > >>>>>    does this mean we would support reading from multiple
>> queues?
>> >> > This is also
>> >> > >>>>>    not clear in the implementation of `addSplitsBack` whether
>> we
>> >> are
>> >> > planning
>> >> > >>>>>    to support multiple sqs topics or not.
>> >> > >>>>>
>> >> > >>>>> *Our current implementation assumes that each source reads
>> from a
>> >> > >>>>> single SQS queue. If you need to read from multiple SQS queues,
>> >> you
>> >> > can
>> >> > >>>>> define multiple sources accordingly. We believe this approach
>> is
>> >> > clearer
>> >> > >>>>> and more organized compared to having a single source switch
>> >> between
>> >> > >>>>> multiple queues. This design choice is based on weighing the
>> >> > benefits, but
>> >> > >>>>> we can support multiple queues per source if the need arises.*
>> >> > >>>>>
>> >> > >>>>>    - Regarding Client creation, there has been some effort in
>> the
>> >> > >>>>>    common `aws-util` module like createAwsSyncClient, we should
>> >> > reuse that for
>> >> > >>>>>    `SqsClient` creation.
>> >> > >>>>>
>> >> > >>>>>    *Thank you for bringing this to our attention. Yes, we will
>> >> > >>>>>    utilize the existing createClient methods available in the
>> >> > libraries. Our
>> >> > >>>>>    goal is to avoid any code duplication on our end.*
>> >> > >>>>>
>> >> > >>>>>
>> >> > >>>>>    - On the same point for clients, Is there a reason the FLIP
>> >> > >>>>>    suggests async clients? sync clients have proven more stable
>> >> and
>> >> > the source
>> >> > >>>>>    threading model already guarantees no blocking by sync
>> clients.
>> >> > >>>>>
>> >> > >>>>> *We were not aware of this, and we have been using async
>> clients
>> >> for
>> >> > >>>>> our in-house use cases. However, since we already have sync
>> >> clients
>> >> > in the
>> >> > >>>>> aws-util that ensure no blocking, we are in a good position. We
>> >> will
>> >> > use
>> >> > >>>>> these sync clients during our development and testing efforts,
>> and
>> >> > we will
>> >> > >>>>> share the results and keep the community updated.*
>> >> > >>>>>
>> >> > >>>>>    - On mentioning threading, the FLIP doesn’t mention the
>> fetcher
>> >> > >>>>>    manager. Is it going to be `SingleThreadFetcherManager`?
>> Would
>> >> it
>> >> > be better
>> >> > >>>>>    to make the source reader extend the
>> >> > SingleThreadedMultiplexReaderBase or
>> >> > >>>>>    are we going to implement a more simple version?
>> >> > >>>>>
>> >> > >>>>> *Yes, we are considering implementing
>> >> > >>>>> SingleThreadMultiplexSourceReaderBase for the Reader. We have
>> >> > included the
>> >> > >>>>> implementation snippet in the FLIP for reference.*
>> >> > >>>>>
>> >> > >>>>>    - The FLIP doesn’t mention schema deserialization or the
>> >> > >>>>>    recordEmitter implementation, Are we going to use
>> >> > `deserializationSchema`
>> >> > >>>>>    or some sort of string to element converter? It is also not
>> >> clear
>> >> > form the
>> >> > >>>>>    builder example provided?
>> >> > >>>>>
>> >> > >>>>> *Yes, we plan to use the deserializationSchema and
>> recordEmitter
>> >> > >>>>> implementations. We have included sample code for these in the
>> >> FLIP
>> >> > for
>> >> > >>>>> reference.*
>> >> > >>>>>
>> >> > >>>>>    - Are the values mentioned in getSqsClientProperties
>> >> recommended
>> >> > >>>>>    defaults? If so we should highlight that.
>> >> > >>>>>
>> >> > >>>>> *The defaults are not decided. These are just sample snapshots
>> for
>> >> > >>>>> example.*
>> >> > >>>>>
>> >> > >>>>>    - Most importantly I am a bit skeptical regarding enforcing
>> >> > >>>>>    exactly-once semantics with side effects especially with
>> >> > dependency on
>> >> > >>>>>    checkpointing configuration, could we add flags to disable
>> and
>> >> > disable by
>> >> > >>>>>    default if the checkpointing is not enabled?
>> >> > >>>>>
>> >> > >>>>> *During our initial design phase, we intended to enforce
>> >> exactly-once
>> >> > >>>>> semantics via checkpoints. However, you raise a valid point,
>> and
>> >> we
>> >> > will
>> >> > >>>>> make this a configurable feature for users. They can choose to
>> >> > disable
>> >> > >>>>> exactly-once semantics, accepting some duplicate processing
>> >> > (at-least-once)
>> >> > >>>>> as a trade-off. We have updated the FLIP to include support for
>> >> this
>> >> > >>>>> feature.*
>> >> > >>>>>
>> >> > >>>>>    - I am not 100% convinced we should block FLIP itself on the
>> >> > >>>>>    FLIP-438 implementation but I echo the fact that there
>> might be
>> >> > some
>> >> > >>>>>    reusable code between the 2 submodules we should make use
>> of.
>> >> > >>>>>
>> >> > >>>>>    *Yes we echo the same. We fully understand the concern and
>> are
>> >> > >>>>>    closely examining the SQS Sink implementation. We will
>> ensure
>> >> > there is no
>> >> > >>>>>    duplication of work or submodules. If any issues arise, we
>> will
>> >> > address
>> >> > >>>>>    them promptly.*
>> >> > >>>>>
>> >> > >>>>>
>> >> > >>>>> Thank you for your valuable feedback on the FLIP. Your input is
>> >> > >>>>> helping us refine and improve it significantly.
>> >> > >>>>>
>> >> > >>>>> [Main Proposal doc] -
>> >> > >>>>>
>> >> >
>> >>
>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
>> >> > >>>>>
>> >> > >>>>> Regards
>> >> > >>>>> Saurabh & Abhi
>> >> > >>>>>
>> >> > >>>>>
>> >> > >>>>> On Sun, Jul 28, 2024 at 3:04 AM Ahmed Hamdy <
>> hamdy10...@gmail.com
>> >> >
>> >> > >>>>> wrote:
>> >> > >>>>>
>> >> > >>>>>> Hi Saurabh
>> >> > >>>>>> I think this is going to be a valuable addition which is
>> needed.
>> >> > >>>>>> I have a couple of comments
>> >> > >>>>>> - In the FLIP you mention the split is going to be 1 sqs
>> Queue,
>> >> does
>> >> > >>>>>> this
>> >> > >>>>>> mean we would support reading from multiple queues? The
>> builder
>> >> > >>>>>> example
>> >> > >>>>>> seems to support a single queue
>> >> > >>>>>> SqsSource.<T>builder.setSqsUrl("
>> >> > >>>>>> https://sqs.us-east-1.amazonaws.com/23145433/sqs-test";)
>> >> > >>>>>> - This is also not clear in the implementation of
>> `addSplitsBack`
>> >> > >>>>>> whether
>> >> > >>>>>> we are planning to support multiple sqs topics or not.
>> >> > >>>>>> - Regarding Client creation, there has been some effort in the
>> >> > common
>> >> > >>>>>> `aws-util` module like createAwsSyncClient  , we should reuse
>> >> that
>> >> > for
>> >> > >>>>>> `SqsClient` creation.
>> >> > >>>>>> - On the same point for clients, Is there a reason the FLIP
>> >> suggests
>> >> > >>>>>> async
>> >> > >>>>>> clients? sync clients have proven more stable and the source
>> >> > threading
>> >> > >>>>>> model already guarantees no blocking by sync clients.
>> >> > >>>>>> - On mentioning threading, the FLIP doesn't mention the
>> fetcher
>> >> > >>>>>> manager. Is
>> >> > >>>>>> it going to be `SingleThreadFetcherManager`? Would it be
>> better
>> >> to
>> >> > >>>>>> make the
>> >> > >>>>>> source reader extend the SingleThreadedMultiplexReaderBase or
>> >> are we
>> >> > >>>>>> going
>> >> > >>>>>> to implement a more simple version?
>> >> > >>>>>> - The FLIP doesn't mention schema deserialization or the
>> >> > recordEmitter
>> >> > >>>>>> implementation, Are we going to use `deserializationSchema` or
>> >> some
>> >> > >>>>>> sort of
>> >> > >>>>>> string to element converter? It is also not clear form the
>> >> builder
>> >> > >>>>>> example
>> >> > >>>>>> provided?
>> >> > >>>>>> - Are the values mentioned in getSqsClientProperties
>> recommended
>> >> > >>>>>> defaults?
>> >> > >>>>>> If so we should highlight that
>> >> > >>>>>> - Most importantly I am a bit skeptical regarding enforcing
>> >> > >>>>>> exactly-once
>> >> > >>>>>> semantics with side effects especially with dependency on
>> >> > >>>>>> checkpointing
>> >> > >>>>>> configuration, could we add flags to disable and disable by
>> >> default
>> >> > >>>>>> if the
>> >> > >>>>>> checkpointing is not enabled?
>> >> > >>>>>>
>> >> > >>>>>> I am not 100% convinced we should block FLIP itself on the
>> >> FLIP-438
>> >> > >>>>>> implementation but I echo the fact that there might be some
>> >> reusable
>> >> > >>>>>> code
>> >> > >>>>>> between the 2 submodules we should make use of.
>> >> > >>>>>>
>> >> > >>>>>>  Best Regards
>> >> > >>>>>> Ahmed Hamdy
>> >> > >>>>>>
>> >> > >>>>>>
>> >> > >>>>>> On Fri, 26 Jul 2024 at 17:55, Saurabh Singh <
>> >> > >>>>>> saurabhsingh9...@gmail.com>
>> >> > >>>>>> wrote:
>> >> > >>>>>>
>> >> > >>>>>> > Hi Li Wang,
>> >> > >>>>>> >
>> >> > >>>>>> > Thanks for the review and appreciate your feedback.
>> >> > >>>>>> > I completely understand your concern and agree with it. Our
>> >> goal
>> >> > is
>> >> > >>>>>> to
>> >> > >>>>>> > provide users of connectors with a consistent and coherent
>> >> > >>>>>> ecosystem, free
>> >> > >>>>>> > of any issues.
>> >> > >>>>>> > To ensure that, we are closely monitoring/reviewing the SQS
>> >> Sink
>> >> > >>>>>> > implementation work by Dhingra. Our development work will
>> >> commence
>> >> > >>>>>> once the
>> >> > >>>>>> > AWS Sink is near completion or completed. This approach
>> ensures
>> >> > >>>>>> that we
>> >> > >>>>>> > take in the new learnings, do not duplicate any core modules
>> >> and
>> >> > >>>>>> allow us
>> >> > >>>>>> > to save valuable time.
>> >> > >>>>>> >
>> >> > >>>>>> > In the meantime, we would like to keep the discussion and
>> >> process
>> >> > >>>>>> active on
>> >> > >>>>>> > this FLIP. Gaining valuable community feedback (which is
>> >> helping
>> >> > >>>>>> us) will
>> >> > >>>>>> > help us address any potential gaps in the source connector
>> >> design
>> >> > >>>>>> and
>> >> > >>>>>> > finalize it. Behind the scenes, we are already designing and
>> >> > >>>>>> pre-planning
>> >> > >>>>>> > our development work to adhere to feedback/best practices/
>> >> faster
>> >> > >>>>>> delivery
>> >> > >>>>>> > when we implement this FLIP.
>> >> > >>>>>> > Please share your thoughts on this.
>> >> > >>>>>> >
>> >> > >>>>>> > Regards
>> >> > >>>>>> > Saurabh
>> >> > >>>>>> >
>> >> > >>>>>> > On Fri, Jul 26, 2024 at 5:52 PM Li Wang <
>> >> liwang505...@gmail.com>
>> >> > >>>>>> wrote:
>> >> > >>>>>> >
>> >> > >>>>>> > > Hi Saurabh,
>> >> > >>>>>> > >
>> >> > >>>>>> > > Thanks for the FLIP. Given the ongoing effort to implement
>> >> the
>> >> > >>>>>> SQS sink
>> >> > >>>>>> > > connector (
>> >> > >>>>>> > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> >
>> >>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-438%3A+Amazon+SQS+Sink+Connector
>> >> > >>>>>> > > ),
>> >> > >>>>>> > > it is important to consider how the SQS source connector
>> >> > supports
>> >> > >>>>>> this
>> >> > >>>>>> > > development, ensuring a unified framework for AWS
>> integration
>> >> > >>>>>> that avoids
>> >> > >>>>>> > > capacity overlap and maximizes synchronization. Since the
>> >> > >>>>>> implementation
>> >> > >>>>>> > by
>> >> > >>>>>> > > Dhingra is still WIP, I would suggest taking that into
>> >> account
>> >> > >>>>>> and keep
>> >> > >>>>>> > > this FLIP as an extension of that FLIP which could be
>> taken
>> >> up
>> >> > >>>>>> once the
>> >> > >>>>>> > SQS
>> >> > >>>>>> > > Sink FLIP is fully implemented. If not, this may lead to
>> >> > >>>>>> doublework in
>> >> > >>>>>> > > multiple identity-related modules and structure of the
>> >> overall
>> >> > SQS
>> >> > >>>>>> > > connector codebase. What do you think?
>> >> > >>>>>> > >
>> >> > >>>>>> > > Thanks
>> >> > >>>>>> > >
>> >> > >>>>>> > >
>> >> > >>>>>> > > On Thursday, July 25, 2024, Saurabh Singh <
>> >> > >>>>>> saurabhsingh9...@gmail.com>
>> >> > >>>>>> > > wrote:
>> >> > >>>>>> > >
>> >> > >>>>>> > > > Hi Samrat,
>> >> > >>>>>> > > >
>> >> > >>>>>> > > > Thanks for the review and feedback.
>> >> > >>>>>> > > > We have evaluated all the three points. Please find the
>> >> > answers
>> >> > >>>>>> below:
>> >> > >>>>>> > > >
>> >> > >>>>>> > > > 1. AWS has announced JSON protocol support in SQS [1].
>> Can
>> >> you
>> >> > >>>>>> shed
>> >> > >>>>>> > some
>> >> > >>>>>> > > > light on how different protocols will be supported?
>> >> > >>>>>> > > >  - We will utilize the AWS client library to connect
>> with
>> >> the
>> >> > >>>>>> AWS SQS
>> >> > >>>>>> > > > Service. Versions beyond 2.21.19 now support JSON, so
>> >> simply
>> >> > >>>>>> upgrading
>> >> > >>>>>> > > the
>> >> > >>>>>> > > > client library will suffice for the protocol switch.
>> >> However,
>> >> > >>>>>> from the
>> >> > >>>>>> > > > connector's perspective, we do not anticipate any
>> changes
>> >> in
>> >> > our
>> >> > >>>>>> > > > communication process, as it is handled by the client
>> >> library.
>> >> > >>>>>> [4]
>> >> > >>>>>> > > >
>> >> > >>>>>> > > > 2. AWS SQS has two types of queues [2]. What are the
>> >> > >>>>>> implementation
>> >> > >>>>>> > > detail
>> >> > >>>>>> > > > differences for the source connector?
>> >> > >>>>>> > > > - SQS Connector is indifferent to the customer's choice
>> of
>> >> > >>>>>> Queue type.
>> >> > >>>>>> > If
>> >> > >>>>>> > > > out-of-order messages are a concern, it will be the
>> >> > >>>>>> responsibility of
>> >> > >>>>>> > the
>> >> > >>>>>> > > > application code or main job logic to manage this.
>> >> > >>>>>> > > >
>> >> > >>>>>> > > > 3. Will the SQS source connector implement any kind of
>> >> > >>>>>> callbacks [3] on
>> >> > >>>>>> > > > success to offer any kind of guarantee?
>> >> > >>>>>> > > > - We have proposed deleting SQS messages using the
>> >> > notification
>> >> > >>>>>> > provided
>> >> > >>>>>> > > by
>> >> > >>>>>> > > > the checkpoint framework on checkpoint completion. Thus
>> >> > >>>>>> providing
>> >> > >>>>>> > exactly
>> >> > >>>>>> > > > once guarantee.[5] Additionally, when deleting
>> messages, we
>> >> > will
>> >> > >>>>>> > monitor
>> >> > >>>>>> > > > the API call responses and log any failures, along with
>> >> > >>>>>> providing
>> >> > >>>>>> > > > observability through appropriate metrics.
>> >> > >>>>>> > > >
>> >> > >>>>>> > > > [1]
>> >> > >>>>>> > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> >
>> >>
>> https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
>> >> > >>>>>> > > > [2]
>> >> > >>>>>> > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> >
>> >>
>> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
>> >> > >>>>>> > > > [3]
>> >> > >>>>>> > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> >
>> >>
>> https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
>> >> > >>>>>> > > > [4]
>> >> > >>>>>> > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> >
>> >>
>> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-json-faqs.html#json-protocol-getting-started
>> >> > >>>>>> > > > [5]
>> >> > >>>>>> > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> >
>> >>
>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.jdcikzojx5d9
>> >> > >>>>>> > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > > > [Main Proposal doc] -
>> >> > >>>>>> > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> >
>> >>
>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit#heading=h.ci1rrcgbsvkl
>> >> > >>>>>> > > >
>> >> > >>>>>> > > > Please feel free to reach out if you have more feedback.
>> >> > >>>>>> > > >
>> >> > >>>>>> > > > Regards
>> >> > >>>>>> > > > Saurabh & Abhi
>> >> > >>>>>> > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > > > On Wed, Jul 24, 2024 at 8:52 AM Samrat Deb <
>> >> > >>>>>> decordea...@gmail.com>
>> >> > >>>>>> > > wrote:
>> >> > >>>>>> > > >
>> >> > >>>>>> > > > > Hi Saurabh,
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > > Thank you for sharing the FLIP for the SQS source
>> >> connector.
>> >> > >>>>>> An SQS
>> >> > >>>>>> > > > source
>> >> > >>>>>> > > > > connector will be a great addition to the Flink
>> >> ecosystem,
>> >> > as
>> >> > >>>>>> there
>> >> > >>>>>> > is
>> >> > >>>>>> > > a
>> >> > >>>>>> > > > > growing demand for SQS source/sink integration.
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > > I have a few queries:
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > > 1. AWS has announced JSON protocol support in SQS [1].
>> >> Can
>> >> > >>>>>> you shed
>> >> > >>>>>> > > some
>> >> > >>>>>> > > > > light on how different protocols will be supported?
>> >> > >>>>>> > > > > 2. AWS SQS has two types of queues [2]. What are the
>> >> > >>>>>> implementation
>> >> > >>>>>> > > > detail
>> >> > >>>>>> > > > > differences for the source connector?
>> >> > >>>>>> > > > > 3. Will the SQS source connector implement any kind of
>> >> > >>>>>> callbacks [3]
>> >> > >>>>>> > on
>> >> > >>>>>> > > > > success to offer any kind of guarantee?
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > > [1]
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> >
>> >>
>> https://aws.amazon.com/about-aws/whats-new/2023/11/amazon-sqs-support-json-protocol/
>> >> > >>>>>> > > > > [2]
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> >
>> >>
>> https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-queue-types.html
>> >> > >>>>>> > > > > [3]
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> >
>> >>
>> https://docs.aws.amazon.com/step-functions/latest/dg/callback-task-sample-sqs.html
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > > Bests,
>> >> > >>>>>> > > > > Samrat
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > > On Fri, 19 Jul 2024 at 9:53 PM, Saurabh Singh <
>> >> > >>>>>> > > > saurabhsingh9...@gmail.com>
>> >> > >>>>>> > > > > wrote:
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > > > > Hi Fink Devs,
>> >> > >>>>>> > > > > >
>> >> > >>>>>> > > > > > Our team has been working on migrating various data
>> >> > >>>>>> pipelines to
>> >> > >>>>>> > > Flink
>> >> > >>>>>> > > > to
>> >> > >>>>>> > > > > > leverage the benefits of exactly-once processing,
>> >> > >>>>>> checkpointing,
>> >> > >>>>>> > and
>> >> > >>>>>> > > > > > stateful computing. We have several use cases built
>> >> around
>> >> > >>>>>> the AWS
>> >> > >>>>>> > > SQS
>> >> > >>>>>> > > > > > Service. For this migration, we have developed an
>> SQS
>> >> > Source
>> >> > >>>>>> > > Connector,
>> >> > >>>>>> > > > > > which enables us to run both stateless and stateful
>> >> > >>>>>> Flink-based
>> >> > >>>>>> > jobs.
>> >> > >>>>>> > > > > >
>> >> > >>>>>> > > > > > We believe that this SQS Source Connector would be a
>> >> > >>>>>> valuable
>> >> > >>>>>> > > addition
>> >> > >>>>>> > > > to
>> >> > >>>>>> > > > > > the existing connector set. Therefore, we propose a
>> >> FLIP
>> >> > to
>> >> > >>>>>> include
>> >> > >>>>>> > > it.
>> >> > >>>>>> > > > > >
>> >> > >>>>>> > > > > > For more information, please refer to the FLIP
>> >> document.
>> >> > >>>>>> > > > > >
>> >> > >>>>>> > > > > >
>> >> > >>>>>> > > > > >
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> >
>> >>
>> https://docs.google.com/document/d/1lreo27jNh0LkRs1Mj9B3wj3itrzMa38D4_XGryOIFks/edit?usp=sharing
>> >> > >>>>>> > > > > >
>> >> > >>>>>> > > > > > Thanks
>> >> > >>>>>> > > > > > Saurabh & Abhi
>> >> > >>>>>> > > > > >
>> >> > >>>>>> > > > >
>> >> > >>>>>> > > >
>> >> > >>>>>> > >
>> >> > >>>>>> >
>> >> > >>>>>>
>> >> > >>>>>
>> >> >
>> >>
>> >
>>
>

Reply via email to