[ https://issues.apache.org/jira/browse/BEAM-13203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17442350#comment-17442350 ]
Kenneth Knowles commented on BEAM-13203:
----------------------------------------

I am setting this back to P1, which is for failing tests, potential data loss, and loss of functionality. P0 is for real-time outages, such as the website or GitHub being down.

> Potential data loss when using SnsIO.writeAsync
> -----------------------------------------------
>
>                 Key: BEAM-13203
>                 URL: https://issues.apache.org/jira/browse/BEAM-13203
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>            Reporter: Moritz Mack
>            Priority: P1
>
> This needs to be investigated. Reading the code suggests we might be losing
> data under certain conditions, e.g. when terminating the pipeline. The async
> processing model here is far too simplistic.
> The bundle never knows about pending writes and never blocks to wait for
> any such operation. In the same way, exceptions are thrown into nowhere. Test
> cases don't capture this because they operate on completed futures only (so
> exceptions in the callbacks get thrown on the thread of processElement).
> {code:java}
> client.publish(publishRequest).whenComplete((response, ex) -> {
>   if (ex == null) {
>     SnsResponse<T> snsResponse = SnsResponse.of(context.element(), response);
>     context.output(snsResponse);
>   } else {
>     LOG.error("Error while publishing request to SNS", ex);
>     throw new SnsWriteException("Error while publishing request to SNS", ex);
>   }
> });
> {code}
> Also, this entirely removes backpressure from the stream. When used with a much
> faster source, memory use keeps growing, because the number of concurrent
> pending async operations is not limited.
> Spotify's scio contains a
> [JavaAsyncDoFn|https://github.com/spotify/scio/blob/main/scio-core/src/main/java/com/spotify/scio/transforms/JavaAsyncDoFn.java]
> that illustrates how it can be done.
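For illustration, here is a framework-free sketch of the two properties the description says are missing: bounded concurrency (backpressure) and a bundle-end barrier that surfaces callback errors on the caller's thread. It is not scio's JavaAsyncDoFn and not a Beam or AWS SDK API. `BoundedAsyncWriter`, `submit`, `flush` and `maxPending` are hypothetical names. The sketch assumes `submit` would be called from `processElement` and `flush` from a `@FinishBundle` method, with `call` wrapping something like `client.publish(...)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/** Hypothetical helper (not Beam, scio or AWS SDK API): bounded async calls plus a bundle-end barrier. */
class BoundedAsyncWriter<InputT, OutputT> {
  private final Semaphore permits;
  private final Function<InputT, CompletableFuture<OutputT>> call;
  // Futures issued since the last flush(); only touched on the caller's thread.
  private final List<CompletableFuture<OutputT>> pending = new ArrayList<>();
  // Responses collected on callback threads, handed back by flush() on the caller's thread.
  private final ConcurrentLinkedQueue<OutputT> results = new ConcurrentLinkedQueue<>();
  // First callback failure, recorded instead of being thrown into nowhere.
  private final AtomicReference<Throwable> failure = new AtomicReference<>();

  BoundedAsyncWriter(int maxPending, Function<InputT, CompletableFuture<OutputT>> call) {
    this.permits = new Semaphore(maxPending);
    this.call = call;
  }

  /** Per element (e.g. processElement): blocks while maxPending calls are already in flight. */
  void submit(InputT input) {
    try {
      permits.acquire(); // backpressure: wait for a free slot
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for a free slot", e);
    }
    CompletableFuture<OutputT> future;
    try {
      future = call.apply(input);
    } catch (RuntimeException e) {
      permits.release(); // the call never started, so give the slot back
      throw e;
    }
    pending.add(future.whenComplete((response, ex) -> {
      try {
        if (ex != null) {
          failure.compareAndSet(null, ex);
        } else if (response != null) { // ConcurrentLinkedQueue rejects nulls
          results.add(response);
        }
      } finally {
        permits.release(); // always free the slot, even if recording fails
      }
    }));
  }

  /** At bundle end (e.g. @FinishBundle): waits for every call, then returns results or fails. */
  List<OutputT> flush() {
    // allOf completes only after all futures finish; per-future errors were already recorded.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0]))
        .exceptionally(ignored -> null)
        .join();
    pending.clear();
    List<OutputT> out = new ArrayList<>(results);
    results.clear();
    Throwable err = failure.getAndSet(null);
    if (err != null) {
      throw new IllegalStateException("Async write failed", err);
    }
    return out;
  }
}
```

Responses are buffered from the callback threads and handed back by `flush()` on the caller's thread, instead of calling `context.output` from the callback. In a real DoFn they would still need to be emitted with a timestamp and window (e.g. via `FinishBundleContext.output(output, timestamp, window)`), which this sketch does not track.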