mdedetrich opened a new issue, #1054:
URL: https://github.com/apache/pekko-connectors/issues/1054

   I am experiencing an issue in production where when uploading a file to GCS 
using `GCStorage.resumableUpload` I sometimes get this error
   
   ```
   java.util.concurrent.CompletionException: java.util.NoSuchElementException: 
last of empty stream
        at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
        at 
java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
Source)
        at 
java.base/java.util.concurrent.CompletableFuture$BiRelay.tryFire(Unknown Source)
        at 
java.base/java.util.concurrent.CompletableFuture$CoCompletion.tryFire(Unknown 
Source)
        at 
java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at 
java.base/java.util.concurrent.CompletableFuture.completeExceptionally(Unknown 
Source)
        at 
scala.concurrent.impl.FutureConvertersImpl$CF.apply(FutureConvertersImpl.scala:26)
        at 
scala.concurrent.impl.FutureConvertersImpl$CF.apply(FutureConvertersImpl.scala:23)
        at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:484)
        at 
scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:222)
        at 
scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429)
        at 
scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338)
        at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:285)
        at 
scala.concurrent.impl.Promise$Transformation.handleFailure(Promise.scala:444)
        at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:506)
        at 
scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:222)
        at 
scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:429)
        at 
scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:338)
        at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:285)
        at 
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:278)
        at scala.concurrent.Promise.trySuccess(Promise.scala:99)
        at scala.concurrent.Promise.trySuccess$(Promise.scala:99)
        at 
scala.concurrent.impl.Promise$DefaultPromise.trySuccess(Promise.scala:104)
        at 
org.apache.pekko.stream.impl.TakeLastStage$$anon$1.onUpstreamFinish(Sinks.scala:188)
        at 
org.apache.pekko.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:537)
        at 
org.apache.pekko.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:401)
        at 
org.apache.pekko.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:662)
        at 
org.apache.pekko.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:532)
        at 
org.apache.pekko.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:637)
        at 
org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.org$apache$pekko$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:813)
        at 
org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.org$apache$pekko$stream$impl$fusing$ActorGraphInterpreter$$shortCircuitBatch(ActorGraphInterpreter.scala:800)
        at 
org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:791)
        at org.apache.pekko.actor.Actor.aroundPreStart(Actor.scala:558)
        at org.apache.pekko.actor.Actor.aroundPreStart$(Actor.scala:558)
        at 
org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:729)
        at org.apache.pekko.actor.ActorCell.create(ActorCell.scala:653)
        at org.apache.pekko.actor.ActorCell.invokeAll$1(ActorCell.scala:523)
        at org.apache.pekko.actor.ActorCell.systemInvoke(ActorCell.scala:545)
        at 
org.apache.pekko.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:297)
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:232)
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245)
        at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
        at 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown 
Source)
        at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown 
Source)
   Caused by: java.util.NoSuchElementException: last of empty stream
        at 
org.apache.pekko.stream.scaladsl.Sink$.$anonfun$last$3(Sink.scala:236)
        at scala.Option.getOrElse(Option.scala:201)
        at 
org.apache.pekko.stream.scaladsl.Sink$.$anonfun$last$2(Sink.scala:236)
        at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:467)
        ... 31 common frames omitted
   ```
   
   After diagnosis I found out that this occurs when you happen to be uploading 
an empty `ByteString`, I wrote a simple test to verify this and I can reproduce 
this error
   
   ```scala
   "get a single empty ByteString when downloading a non existing file using 
resumableUpload" in {
     val fileName = testFileName("non-existing-file")
     val res = for {
       _ <- Source.empty[ByteString]
         .withAttributes(finalAttributes)
         .runWith(GCStorageStream.resumableUpload(bucket, fileName, 
ContentTypes.`text/plain(UTF-8)`))
       res <- GCStorageStream
         .download(bucket, fileName)
         .withAttributes(finalAttributes)
         .runWith(Sink.head)
         .flatMap(
           _.map(_.runWith(Sink.fold(ByteString.empty) { _ ++ _ 
})).getOrElse(Future.successful(ByteString.empty)))
     } yield res
     res.futureValue shouldBe ByteString.empty
   }
   ```
   
   Note that there is no reason why uploading an empty `ByteString` shouldn't 
work, especially considering the it works fine using 
[GCStorage.putObject](https://github.com/apache/pekko-connectors/blob/9f2ac5a9b54a655dd3350ae9830bc85c95c91d16/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala#L226-L241)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org
For additional commands, e-mail: notifications-h...@pekko.apache.org

Reply via email to