pjfanning commented on code in PR #1802: URL: https://github.com/apache/pekko/pull/1802#discussion_r2003164841
########## stream/src/main/scala/org/apache/pekko/task/Task.scala: ########## @@ -0,0 +1,176 @@ +package org.apache.pekko.task + +import org.apache.pekko.stream.Materializer +import java.util.concurrent.atomic.AtomicReference +import scala.concurrent.Promise +import scala.concurrent.Future +import scala.util.{Try,Success,Failure} +import java.util.concurrent.CompletableFuture +import org.apache.pekko.stream.KillSwitch +import org.apache.pekko.stream.Graph +import org.apache.pekko.stream.ClosedShape + +/* + * Cancellation is not exposed yet. In order to add it, we need the following: + * - Feedback from KillSwitch on when a graph has actually fully stopped + * - Access to ExecutorService for our dispatchers (not just Executor), so we don't need to rely on Future. + */ +abstract class Task[+T] { + def definition: TaskDef[T] +} + +sealed trait TaskDef[+T] +object TaskDef { + def narrow[T](t: TaskDef[? <: T]): TaskDef[T] = t +} + +case class GraphDef[T](graph: Graph[ClosedShape, (KillSwitch,Future[T])]) extends TaskDef[FiberImpl[T]] { + type Res = T +} + +case class ValueDef[+T](value: () => Try[T]) extends TaskDef[T] + +case class MapDef[T,U](base: TaskDef[T], fn: Try[T] => Try[U]) extends TaskDef[U] { + type Base = T +} + +case class FlatMapDef[T,U](base: TaskDef[T], fn: Try[T] => TaskDef[U]) extends TaskDef[U] { + type Base = T +} + +case class ForkDef[T](task: TaskDef[T]) extends TaskDef[FiberImpl[T]] { + type Res = T +} + +case class JoinDef[T](fiber: FiberImpl[T]) extends TaskDef[T] + +case class CancelDef[T](fiber: FiberImpl[T]) extends TaskDef[Unit] + +class FiberImpl[T] { + sealed trait FiberState + case class Running(onComplete: Seq[Try[T] => Unit] = Seq.empty) extends FiberState + case class Completed(result: Try[T]) extends FiberState + case object Cancelled extends FiberState + + protected def onCancel(): Unit = {} + val state = new AtomicReference[FiberState](Running()) + + def cancel() = { + // We currently invoke onCancel on every cancellation attempt. 
We might need to revisit that. + onCancel() + completeTo(Cancelled, FiberImpl.cancellationFailure) + } + def complete(res: Try[T]) = completeTo(Completed(res), res) + + private def completeTo(newState: FiberState, res: Try[T]): Unit = { + // Complete the state, but only if it's still running + val s = state.getAndUpdate(_ match { + case Running(_) => newState + case other => other + }) + + s match { + case Running(onComplete: Seq[Try[T] => Unit]) => + for (callback <- onComplete) { + callback(res) + } + case _ => + } + } + + def onComplete(f: Try[T] => Unit) = { + // Either it's already completed (don't add a listener), or it is completed (add a listener) + val s = state.getAndUpdate(_ match { + case Running(callbacks) => Running(callbacks :+ f) + case other => other + }) + + // Directly apply the callback if we're already completed + s match { + case Running(_) => + // No action now, we've added the callback + case Completed(res) => + f.apply(res) + case Cancelled => + f.apply(FiberImpl.cancellationFailure) + } + } + + def isCancelled = state.get == Cancelled +} + +object FiberImpl { + val cancellationFailure = Failure(new InterruptedException("Fiber cancelled.")) +} + +class Runtime(mat: Materializer) { + val executor = mat.executionContext + + // TODO move to ScalaDSL + def runToPromise[T](task: Task[T]): Promise[T] = { + val p = Promise[T]() + run(new FiberImpl[T](), task.definition, (res:Try[T]) => p.complete(res)) + p + } + + // TODO move to JavaDSL + def runAsync[T](task: Task[T]): CompletableFuture[T] = { + val f = new CompletableFuture[T]() + run(new FiberImpl[T](), task.definition, (res:Try[T]) => res match { + case Success(t) => f.complete(t) + case Failure(x) => f.completeExceptionally(x) + }) + f + } + + private def run[T](fiber: FiberImpl[_], task: TaskDef[T], onComplete: Try[T] => Unit): Unit = { + if (fiber.isCancelled) onComplete(FiberImpl.cancellationFailure) else task match { + case g@GraphDef(graph) => + val (killswitch, future) = 
mat.materialize(graph) + // This returns a Fiber, which when cancelled should cancel the graph as well. + val childFiber = new FiberImpl[g.Res] { + override def onCancel() = { + killswitch.shutdown() + } + } + future.onComplete { res => + childFiber.complete(res) + }(scala.concurrent.ExecutionContext.global) Review Comment: * we need to avoid hardcoding the global context * even if it comes to setting up a config in the reference.conf that defines an ExecutionContext created specifically for this * might be nicer if it was controlled by an implicit param though -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: notifications-unsubscr...@pekko.apache.org For additional commands, e-mail: notifications-h...@pekko.apache.org