Akka Streams - Paginator

一覧表示でよくある Paginator を実装してみます。

このサンプルコードでは、Paginator の対象となるデータは、Stream.from(1) で生成するため、1 からはじまる無限個のデータとなります。 そのため、すべてのページのデータを作成してから、必要なページのデータを抽出するということはできません。 該当ページのデータまでスキップし、該当ページのデータが取れたところで処理を止める必要があります。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.ExecutionContextExecutor
import scala.util.Success

object PaginatorSpec extends App {

  implicit val system: ActorSystem = ActorSystem("PaginatorSpec")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, this.getClass)

  val list = Stream.from(1)

  val page = 2L
  val size = 3

  val result = Source(list)
    .grouped(size)
    .zipWithIndex
    .dropWhile {
      case (_, index) ⇒
        index != (page - 1L)
    }
    .takeWhile {
      case (_, index) ⇒
        index != page
    }
    .map(_._1)
    .toMat(Sink.head)(Keep.right)
    .run

  result.onComplete {
    case Success(values) ⇒
      logger.info(s"complated: $values")
      system.terminate()
    case _ ⇒
      system.terminate()
  }

}

grouped

最初に grouped を使用して指定した数のコレクションに区切ります。下のように、grouped を使うと以下のように区切られます。

// Returns List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10))
(1 to 10).iterator.grouped(5).toList

zipWithIndex

次に、zipWithIndex を使用して、各ページに0からはじまるインデックス(ページ)を付けます。 先の grouped の例に zipWithIndex を組み合わせると以下のような結果を得ることができます。

// Returns List((List(1, 2, 3, 4, 5),0), (List(6, 7, 8, 9, 10),1))
(1 to 10).iterator.grouped(5).zipWithIndex.toList

dropWhile

dropWhile を使用して指定ページまでスキップします。

takeWhile

最後に takeWhile を使用して指定ページのデータを取り出します。

遅延評価 val

Scala の lazy val についてコップ本(第2版) では、遅延評価 val は、2 回以上評価されることはない ... 初めて評価された後にその結果値が格納され、その後で同じ val が使われるときに結果値が再利用される。とあり、たしかに次のような lazy val を記述した場合は、何度 x を使っても、最初の1回でログが出力され 2 回目以降ログは出力されません。

lazy val x: Int = {
  logger.info("x = 3")
  3
}

ですが、以下のように書いた場合には、(1) 最初の 1 回目の値に固定されるのか、(2) 結果値の再利用はされず毎回評価されるのか、(3) あるいは与えられた値と結果値を保持するのか、どういう動きになるのだろうと試してみました。

lazy val x: Int = (i: Int) => {
  logger.info(s"$i")
  i
}

このようにすると、(2) 結果値の再利用はされず毎回評価される、つまり、def を使ったのと同じく x を使う度にログが出力されました。

def x(i: Int) = {
  logger.info(s"$i")
  i
}

Akka Streams - エラー時のリスタート

Actor の場合の backoff supervision pattern for actors (Supervision and Monitoring • Akka Documentation) のように Akka Streams には、RestartSource, RestartSink, RestartFlow があります。

このサンプルでは、RestartSource を使って、リトライを 2回 行います(結果的にはリトライも失敗してフローそのものがエラーにより終了します)。

package jp.pigumer.akka

import akka.NotUsed
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream._
import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source}

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object RetrySpec extends App {

  implicit val system: ActorSystem = ActorSystem("RetrySpec")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, this.getClass)

  val source: Source[Int, NotUsed] = RestartSource.withBackoff(
    minBackoff = 1 seconds,
    maxBackoff = 1 seconds,
    randomFactor = 0.2,
    maxRestarts = 2
  ) { () ⇒
    Source.single(3)
      .log("source")
      .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
      .map {
        case i @ 3 ⇒
          logger.info(s"retry: $i")
          throw new RuntimeException()
        case i ⇒
          i
      }
  }

  val done =
    source
      .toMat(Sink.ignore)(Keep.right)
      .run()

  done.onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "error")
      system.terminate()
  }
}

Akka Streams - 時間のかかる Flow をタイムアウトにより失敗させる

あるフローの処理時間がかかりすぎた場合に異常と判断してタイムアウトさせるために、 completionTimeout を使ったサンプルコードです。

package jp.pigumer.akka

import akka.actor.{Actor, ActorSystem, Props}
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}
import akka.util.Timeout

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

class SlowActor extends Actor {
  override def receive = {
    case i: Int ⇒
      implicit val executionContext = context.system.dispatcher
      val originalSender = sender
      Future {
        Thread.sleep(i * 500)
        originalSender ! i
      }
  }
}

object CompletionTimeoutSpec extends App {

  implicit val system: ActorSystem = ActorSystem("CompletionTimeoutSpec")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, this.getClass)

  val ref = system.actorOf(Props[SlowActor])
  implicit val timeout: Timeout = 60 seconds

  val source = Source(Stream.from(1))
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val flow = Flow[Int].ask[Int](ref)
    .completionTimeout(3 seconds)
    .log("flow", elem ⇒ s"$elem")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val done =
    source
      .via(flow).toMat(Sink.ignore)(Keep.right)
      .run()

  done.onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "error")
      system.terminate()
  }
}

Akka Streams - KillSwitch で外から FlowShape を制御する

KillSwitch をフローで使用して、viaMat(killSwitch)(Keep.right) で、後で使用できるようにします。 サンプルコードでは、フローの完了を取得するため、toMat(sink)(Keep.both) を使って、KillSwitch と Future[Done] の両方を受取り、 2 秒後に KillSwitch.shutdown() を実行して RunnableGraph を正常終了させます。

エラーで終了させたい場合は、KillSwitch.abort(Throwable) を使うことができます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes, DelayOverflowStrategy, KillSwitches}

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object KillSwitchSpec extends App {

  implicit val system: ActorSystem = ActorSystem("KillSwitchSpec")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, this.getClass)

  val source = Source(Stream.from(1))
    .delay(1 seconds, DelayOverflowStrategy.backpressure)
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val killSwitch = KillSwitches.single[Int]
  val flow = Flow[Int].map { i ⇒
    i
  }
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val (k, done) =
    source
      .viaMat(killSwitch)(Keep.right)
      .via(flow).toMat(Sink.ignore)(Keep.both)
      .run()

  done.onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "error")
      system.terminate()
  }

  Thread.sleep(2000)
  k.shutdown()
}

Akka Streams - async, groupBy

async

まず async を使わないコードからはじめてみます。

この場合は、単純に 1 から 10 まで順番に処理が実行されます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  val source = Source(1 to 10)
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val flow = Flow[Int].map { i ⇒
    Thread.sleep((10 - i) * 200)
    i }
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val runnable =
    source.via(flow).toMat(Sink.ignore)(Keep.right)

  runnable.run().onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "run")
      system.terminate()
  }
}

次に async をつけて実行してみます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  val source = Source(1 to 10).async
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val flow = Flow[Int].map { i ⇒
    Thread.sleep((10 - i) * 200)
    i }.async
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val runnable =
    source.via(flow).toMat(Sink.ignore)(Keep.right)

  runnable.run().onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "run")
      system.terminate()
  }
}

フローにかかる実行時間をずらすために Sleep を追加していますが、この場合でも順番に処理されます。

groupBy と async

groupBy と async を使った SubFlow で試してみます

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  val source = Source(1 to 10).groupBy(3, _ % 3)
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val flow = Flow[Int].map { i ⇒
    Thread.sleep((10 - i) * 200)
    i }.async
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val runnable =
    source.via(flow).mergeSubstreams.toMat(Sink.ignore)(Keep.right)

  runnable.run().onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "run")
      system.terminate()
  }
}

groupBy でフローを分けて async を使うことで、順序は保証されなくなりますが、効率のよい処理を実現することができます。

Akka Streams - Source に Actor, Queue を使うコード

Actor を使う

DeadLetter を Subscribe する Actor を Akka Stream で実装してみます

コードは下のようになります

  val actorRef: ActorRef = 
    Source.actorRef[DeadLetter](100, OverflowStrategy.dropTail)
      .to(Sink.foreach {
        case DeadLetter(message, _, recipient) ⇒
          logger.info(s"$message - $recipient")
      }).run()

to(sink) は、toMat(sink)(Keep.left) と同じなので、Source の Mat が返されます。

返された actorRef は下のように DeadLetter を Subscribe するために使用できます。

val system: ActorSystem

system.eventStream.subscribe(actorRef, classOf[DeadLetter])

Actor を使った Source の場合には、OverflowStrategy として backpressure を使うことができません。

backpressure を使いたい場合は、Queue を使用します。

Queue を使う

Queue を使うコードは下のようになります

  val queue: SourceQueueWithComplete[DeadLetter] =
    Source.queue[DeadLetter](100, OverflowStrategy.backpressure)
      .to(Sink.foreach {
        case DeadLetter(message, _, recipient) ⇒
          logger.info(s"$message - $recipient")
      }).run()