Akka Streams - KillSwitch で外から FlowShape を制御する

KillSwitch をフローで使用して、viaMat(killSwitch)(Keep.right) で、後で使用できるようにします。 サンプルコードでは、フローの完了を取得するため、toMat(sink)(Keep.both) を使って、KillSwitch と Future[Done] の両方を受取り、 2 秒後に KillSwitch.shutdown() を実行して RunnableGraph を正常終了させます。

エラーで終了させたい場合は、KillSwitch.abort(Throwable) を使うことができます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes, DelayOverflowStrategy, KillSwitches}

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object KillSwitchSpec extends App {

  implicit val system: ActorSystem = ActorSystem("KillSwitchSpec")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, this.getClass)

  val source = Source(Stream.from(1))
    .delay(1 seconds, DelayOverflowStrategy.backpressure)
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val killSwitch = KillSwitches.single[Int]
  val flow = Flow[Int].map { i ⇒
    i
  }
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val (k, done) =
    source
      .viaMat(killSwitch)(Keep.right)
      .via(flow).toMat(Sink.ignore)(Keep.both)
      .run()

  done.onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "error")
      system.terminate()
  }

  Thread.sleep(2000)
  k.shutdown()
}