[ 
https://issues.apache.org/jira/browse/BEAM-13203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442350#comment-17442350
 ] 

Kenneth Knowles commented on BEAM-13203:
----------------------------------------

I am setting this back to P1, which covers failing tests, potential data loss, 
and loss of functionality. P0 is reserved for real-time outages, such as the 
website or GitHub being down.

> Potential data loss when using SnsIO.writeAsync
> -----------------------------------------------
>
>                 Key: BEAM-13203
>                 URL: https://issues.apache.org/jira/browse/BEAM-13203
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>            Reporter: Moritz Mack
>            Priority: P1
>
> This needs to be investigated. Reading the code suggests we might be losing 
> data under certain conditions, e.g. when terminating the pipeline. The async 
> processing model here is far too simplistic.
> The bundle never learns about pending writes and does not block to wait for 
> them. Likewise, exceptions thrown in the callback go nowhere. The test 
> cases don't capture this because they operate on completed futures only (so 
> exceptions in the callbacks are thrown on the thread of processElement).
> {code:java}
> client.publish(publishRequest).whenComplete((response, ex) -> {
>   if (ex == null) {
>     SnsResponse<T> snsResponse = SnsResponse.of(context.element(), response);
>     context.output(snsResponse);
>   } else {
>     LOG.error("Error while publishing request to SNS", ex);
>     throw new SnsWriteException("Error while publishing request to SNS", ex);
>   }
> });
> {code}
> Also, this entirely removes backpressure from the stream. When used with a 
> much faster source, memory use will keep growing because the number of 
> concurrent pending async operations is not limited.
> Spotify's scio contains a 
> [JavaAsyncDoFn|https://github.com/spotify/scio/blob/main/scio-core/src/main/java/com/spotify/scio/transforms/JavaAsyncDoFn.java]
>  that illustrates how it can be done.
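
The point about exceptions being thrown into nowhere can be reproduced with plain java.util.concurrent, without Beam or the AWS SDK. The following is a minimal sketch under stated assumptions: the class LostCallbackException and the method attachThrowingCallback are hypothetical names, and a manually completed CompletableFuture stands in for the result of client.publish(...). It only shows that an exception thrown inside a whenComplete callback is captured by the stage that whenComplete returns, so a caller that discards that stage never sees the failure.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; not Beam or AWS SDK code.
class LostCallbackException {

  // Mirrors the quoted pattern: the callback throws on failure, and the
  // only handle on that exception is the stage returned by whenComplete.
  static CompletableFuture<String> attachThrowingCallback(CompletableFuture<String> pending) {
    return pending.whenComplete((response, ex) -> {
      if (ex != null) {
        throw new IllegalStateException("Error while publishing request", ex);
      }
    });
  }

  public static void main(String[] args) {
    // Stands in for client.publish(...): a write that is still in flight.
    CompletableFuture<String> pending = new CompletableFuture<>();
    CompletableFuture<String> returned = attachThrowingCallback(pending);

    // The write fails later. This call does not throw; the callback's
    // exception is swallowed by the CompletableFuture machinery.
    pending.completeExceptionally(new RuntimeException("simulated SNS failure"));

    // The failure is only observable on the returned stage, which the
    // quoted snippet never keeps.
    System.out.println("failure captured in returned future: "
        + returned.isCompletedExceptionally());
  }
}
```

To surface the error, something has to hold on to the returned future (or record the failure elsewhere) and check it before the bundle is considered done.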

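One way to address both the missing backpressure and the unobserved pending writes is to cap the number of in-flight operations and to wait for all of them at the end of a bundle. The sketch below is an assumption-laden illustration in plain Java, not the scio JavaAsyncDoFn and not a proposed patch: BoundedAsyncWriter, write, flush and maxPending are hypothetical names, and the publish function stands in for the async SNS client call. In an actual DoFn, write would presumably be called from @ProcessElement and flush from @FinishBundle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical helper; not part of Beam, scio, or the AWS SDK.
class BoundedAsyncWriter<InT, OutT> {
  private final Function<InT, CompletableFuture<OutT>> publish;
  private final int maxPending;
  private final Semaphore permits;
  private final List<OutT> results = Collections.synchronizedList(new ArrayList<>());
  private final List<Throwable> failures = Collections.synchronizedList(new ArrayList<>());

  BoundedAsyncWriter(Function<InT, CompletableFuture<OutT>> publish, int maxPending) {
    this.publish = publish;
    this.maxPending = maxPending;
    this.permits = new Semaphore(maxPending);
  }

  // Per-element call. Blocks while maxPending writes are in flight,
  // which is what restores backpressure towards the source.
  void write(InT element) {
    throwIfFailed();
    permits.acquireUninterruptibly();
    CompletableFuture<OutT> future;
    try {
      future = publish.apply(element);
    } catch (RuntimeException e) {
      permits.release();
      throw e;
    }
    future.whenComplete((response, ex) -> {
      try {
        if (ex == null) {
          results.add(response);
        } else {
          failures.add(ex); // recorded here, rethrown on the caller's thread
        }
      } finally {
        permits.release();
      }
    });
  }

  // End-of-bundle call. Waits until no write is pending, rethrows the
  // first recorded failure, and returns the collected responses.
  List<OutT> flush() {
    permits.acquireUninterruptibly(maxPending);
    permits.release(maxPending);
    throwIfFailed();
    synchronized (results) {
      List<OutT> done = new ArrayList<>(results);
      results.clear();
      return done;
    }
  }

  private void throwIfFailed() {
    synchronized (failures) {
      if (!failures.isEmpty()) {
        RuntimeException e = new RuntimeException("Async write failed", failures.get(0));
        failures.clear();
        throw e;
      }
    }
  }

  public static void main(String[] args) {
    BoundedAsyncWriter<String, String> writer = new BoundedAsyncWriter<String, String>(
        msg -> CompletableFuture.supplyAsync(() -> "ack-" + msg), 3);
    for (int i = 0; i < 10; i++) {
      writer.write("message-" + i);
    }
    System.out.println("completed writes: " + writer.flush().size());
  }
}
```

The semaphore does double duty here: acquiring one permit per write bounds memory use, and acquiring all permits in flush is a simple way to wait for every outstanding callback. Failures are collected rather than thrown inside the callback, so they reach the thread that owns the bundle.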


--
This message was sent by Atlassian Jira
(v8.20.1#820001)
