Akka Streams - エラー時のリスタート

Actor の場合の backoff supervision pattern for actors (Supervision and Monitoring • Akka Documentation) のように Akka Streams には、RestartSource, RestartSink, RestartFlow があります。

このサンプルでは、RestartSource を使って、リトライを 2回 行います(結果的にはリトライも失敗してフローそのものがエラーにより終了します)。

package jp.pigumer.akka

import akka.NotUsed
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream._
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object RetrySpec extends App {

  implicit val system: ActorSystem = ActorSystem("RetrySpec")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, this.getClass)

  val source: Source[Int, NotUsed] = RestartSource.withBackoff(
    minBackoff = 1 seconds,
    maxBackoff = 1 seconds,
    randomFactor = 0.2,
    maxRestarts = 2
  ) { () ⇒
    Source.single(3)
      .log("source")
      .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
      .map {
        case i @ 3 ⇒
          logger.info(s"retry: $i")
          throw new RuntimeException()
        case i ⇒
          i
      }
  }

  val done =
    source
      .toMat(Sink.ignore)(Keep.right)
      .run()

  done.onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "error")
      system.terminate()
  }
}