mdedetrich opened a new issue, #1054: URL: https://github.com/apache/pekko-connectors/issues/1054
I am experiencing an issue in production where when uploading a file to GCS using `GCStorage.resumableUpload` I sometimes get this error ``` java.util.concurrent.CompletionException: java.util.NoSuchElementException: last of empty stream at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(Unknown Source) at java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source) at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown Source) at scala.concurrent.impl.FutureConvertersImpl$CF.apply(FutureConvertersImpl.scala:26) at scala.concurrent.impl.FutureConvertersImpl$CF.apply(FutureConvertersImpl.scala:23) at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:484) at scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:222) at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429) at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:285) at scala.concurrent.impl.Promise$Transformation.handleFailure(Promise.scala:444) at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:506) at scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:222) at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429) at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:285) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:278) at scala.concurrent.Promise.trySuccess(Promise.scala:99) at scala.concurrent.Promise.trySuccess$(Promise.scala:99) at scala.concurrent.impl.Promise$DefaultPromise.trySuccess(Promise.scala:104) at org.apache.pekko.stream.impl.TakeLastStage$$anon$1.onUpstreamFinish(Sinks.scala:188) at org.apache.pekko.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:537) at org.apache.pekko.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:401) at org.apache.pekko.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:662) at org.apache.pekko.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:532) at org.apache.pekko.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:637) at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.org$apache$pekko$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:813) at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.org$apache$pekko$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:800) at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:791) at org.apache.pekko.actor.Actor.aroundPreStart(Actor.scala:558) at org.apache.pekko.actor.Actor.aroundPreStart$(Actor.scala:558) at org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:729) at org.apache.pekko.actor.ActorCell.create(ActorCell.scala:653) at org.apache.pekko.actor.ActorCell.invokeAll$1(ActorCell.scala:523) at org.apache.pekko.actor.ActorCell.systemInvoke(ActorCell.scala:545) at org.apache.pekko.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:297) at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:232) at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245) at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source) at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source) at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source) Caused by: java.util.NoSuchElementException: last of empty stream at org.apache.pekko.stream.scaladsl.Sink$.$anonfun$last$3(Sink.scala:236) at scala.Option.getOrElse(Option.scala:201) at org.apache.pekko.stream.scaladsl.Sink$.$anonfun$last$2(Sink.scala:236) at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467) ... 31 common frames omitted ``` After diagnosis I found out that this occurs when you happen to be uploading an empty `ByteString`, I wrote a simple test to verify this and I can reproduce this error ```scala "get a single empty ByteString when downloading a non existing file using resumableUpload" in { val fileName = testFileName("non-existing-file") val res = for { _ <- Source.empty[ByteString] .withAttributes(finalAttributes) .runWith(GCStorageStream.resumableUpload(bucket, fileName, ContentTypes.`text/plain(UTF-8)`)) res <- GCStorageStream .download(bucket, fileName) .withAttributes(finalAttributes) .runWith(Sink.head) .flatMap( _.map(_.runWith(Sink.fold(ByteString.empty) { _ ++ _ })).getOrElse(Future.successful(ByteString.empty))) } yield res res.futureValue shouldBe ByteString.empty } ``` Note that there is no reason why uploading an empty `ByteString` shouldn't work, especially considering the it works fine using [GCStorage.putObject](https://github.com/apache/pekko-connectors/blob/9f2ac5a9b54a655dd3350ae9830bc85c95c91d16/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala#L226-L241) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For additional commands, e-mail: notifications-h...@pekko.apache.org