Akka Streams - Paginator
一覧表示でよくある Paginator を実装してみます。
このサンプルコードでは、Paginator の対象となるデータは、Stream.from(1) で生成するため、1 からはじまる無限個のデータとなります。 そのため、すべてのページのデータを作成してから、必要なページのデータを抽出するということはできません。 該当ページのデータまでスキップし、該当ページのデータが取れたところで処理を止める必要があります。
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Keep, Sink, Source} import scala.concurrent.ExecutionContextExecutor import scala.util.Success object PaginatorSpec extends App { implicit val system: ActorSystem = ActorSystem("PaginatorSpec") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, this.getClass) val list = Stream.from(1) val page = 2L val size = 3 val result = Source(list) .grouped(size) .zipWithIndex .dropWhile { case (_, index) ⇒ index != (page - 1L) } .takeWhile { case (_, index) ⇒ index != page } .map(_._1) .toMat(Sink.head)(Keep.right) .run result.onComplete { case Success(values) ⇒ logger.info(s"complated: $values") system.terminate() case _ ⇒ system.terminate() } }
grouped
最初に grouped を使用して指定した数のコレクションに区切ります。下のように、grouped を使うと以下のように区切られます。
// Returns List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)) (1 to 10).iterator.grouped(5).toList
zipWithIndex
次に、zipWithIndex を使用して、各ページに0からはじまるインデックス(ページ)を付けます。 先の grouped の例に zipWithIndex を組み合わせると以下のような結果を得ることができます。
// Returns List((List(1, 2, 3, 4, 5),0), (List(6, 7, 8, 9, 10),1)) (1 to 10).iterator.grouped(5).zipWithIndex.toList
dropWhile
dropWhile を使用して指定ページまでスキップします。
takeWhile
最後に takeWhile を使用して指定ページのデータを取り出します。
遅延評価 val
Scala の lazy val についてコップ本(第2版) では、遅延評価 val は、2 回以上評価されることはない ... 初めて評価された後にその結果値が格納され、その後で同じ val が使われるときに結果値が再利用される。とあり、たしかに次のような lazy val を記述した場合は、何度 x を使っても、最初の1回でログが出力され 2 回目以降ログは出力されません。
lazy val x: Int = { logger.info("x = 3") 3 }
ですが、以下のように書いた場合には、(1) 最初の 1 回目の値に固定されるのか、(2) 結果値の再利用はされず毎回評価されるのか、(3) あるいは与えられた値と結果値を保持するのか、どういう動きになるのだろうと試してみました。
lazy val x: Int = (i: Int) => { logger.info(s"$i") i }
このようにすると、(2) 結果値の再利用はされず毎回評価される、つまり、def を使ったのと同じく x を使う度にログが出力されました。
def x(i: Int) = { logger.info(s"$i") i }
Akka Streams - エラー時のリスタート
Actor の場合の backoff supervision pattern for actors (Supervision and Monitoring • Akka Documentation) のように Akka Streams には、RestartSource, RestartSink, RestartFlow があります。
このサンプルでは、RestartSource を使って、リトライを 2回 行います(結果的にはリトライも失敗してフローそのものがエラーにより終了します)。
package jp.pigumer.akka import akka.NotUsed import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream._ import akka.stream.scaladsl.{Keep, RestartSource, Sink, Source} import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ import scala.util.{Failure, Success} object RetrySpec extends App { implicit val system: ActorSystem = ActorSystem("RetrySpec") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, this.getClass) val source: Source[Int, NotUsed] = RestartSource.withBackoff( minBackoff = 1 seconds, maxBackoff = 1 seconds, randomFactor = 0.2, maxRestarts = 2 ) { () ⇒ Source.single(3) .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) .map { case i @ 3 ⇒ logger.info(s"retry: $i") throw new RuntimeException() case i ⇒ i } } val done = source .toMat(Sink.ignore)(Keep.right) .run() done.onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "error") system.terminate() } }
Akka Streams - 時間のかかる Flow をタイムアウトにより失敗させる
あるフローの処理時間がかかりすぎた場合に異常と判断してタイムアウトさせるために、 completionTimeout を使ったサンプルコードです。
package jp.pigumer.akka import akka.actor.{Actor, ActorSystem, Props} import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream.{ActorMaterializer, Attributes} import akka.util.Timeout import scala.concurrent.duration._ import scala.concurrent.{ExecutionContextExecutor, Future} import scala.util.{Failure, Success} class SlowActor extends Actor { override def receive = { case i: Int ⇒ implicit val executionContext = context.system.dispatcher val originalSender = sender Future { Thread.sleep(i * 500) originalSender ! i } } } object CompletionTimeoutSpec extends App { implicit val system: ActorSystem = ActorSystem("CompletionTimeoutSpec") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, this.getClass) val ref = system.actorOf(Props[SlowActor]) implicit val timeout: Timeout = 60 seconds val source = Source(Stream.from(1)) .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val flow = Flow[Int].ask[Int](ref) .completionTimeout(3 seconds) .log("flow", elem ⇒ s"$elem") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val done = source .via(flow).toMat(Sink.ignore)(Keep.right) .run() done.onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "error") system.terminate() } }
Akka Streams - KillSwitch で外から FlowShape を制御する
KillSwitch をフローで使用して、viaMat(killSwitch)(Keep.right) で、後で使用できるようにします。 サンプルコードでは、フローの完了を取得するため、toMat(sink)(Keep.both) を使って、KillSwitch と Future[Done] の両方を受取り、 2 秒後に KillSwitch.shutdown() を実行して RunnableGraph を正常終了させます。
エラーで終了させたい場合は、KillSwitch.abort(Throwable) を使うことができます。
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream.{ActorMaterializer, Attributes, DelayOverflowStrategy, KillSwitches} import scala.concurrent.ExecutionContextExecutor import scala.concurrent.duration._ import scala.util.{Failure, Success} object KillSwitchSpec extends App { implicit val system: ActorSystem = ActorSystem("KillSwitchSpec") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, this.getClass) val source = Source(Stream.from(1)) .delay(1 seconds, DelayOverflowStrategy.backpressure) .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val killSwitch = KillSwitches.single[Int] val flow = Flow[Int].map { i ⇒ i } .log("flow", elem ⇒ s"[$elem]") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val (k, done) = source .viaMat(killSwitch)(Keep.right) .via(flow).toMat(Sink.ignore)(Keep.both) .run() done.onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "error") system.terminate() } Thread.sleep(2000) k.shutdown() }
Akka Streams - async, groupBy
async
まず async を使わないコードからはじめてみます。
この場合は、単純に 1 から 10 まで順番に処理が実行されます。
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream.{ActorMaterializer, Attributes} import scala.concurrent.ExecutionContextExecutor import scala.util.{Failure, Success} object HelloWorld extends App { implicit val system: ActorSystem = ActorSystem("HelloWorld") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, "HelloWorld") val source = Source(1 to 10) .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val flow = Flow[Int].map { i ⇒ Thread.sleep((10 - i) * 200) i } .log("flow", elem ⇒ s"[$elem]") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val runnable = source.via(flow).toMat(Sink.ignore)(Keep.right) runnable.run().onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "run") system.terminate() } }
次に async をつけて実行してみます。
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream.{ActorMaterializer, Attributes} import scala.concurrent.ExecutionContextExecutor import scala.util.{Failure, Success} object HelloWorld extends App { implicit val system: ActorSystem = ActorSystem("HelloWorld") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, "HelloWorld") val source = Source(1 to 10).async .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val flow = Flow[Int].map { i ⇒ Thread.sleep((10 - i) * 200) i }.async .log("flow", elem ⇒ s"[$elem]") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val runnable = source.via(flow).toMat(Sink.ignore)(Keep.right) runnable.run().onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "run") system.terminate() } }
フローにかかる実行時間をずらすために Sleep を追加していますが、この場合でも順番に処理されます。
groupBy と async
groupBy と async を使った SubFlow で試してみます
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream.{ActorMaterializer, Attributes} import scala.concurrent.ExecutionContextExecutor import scala.util.{Failure, Success} object HelloWorld extends App { implicit val system: ActorSystem = ActorSystem("HelloWorld") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, "HelloWorld") val source = Source(1 to 10).groupBy(3, _ % 3) .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val flow = Flow[Int].map { i ⇒ Thread.sleep((10 - i) * 200) i }.async .log("flow", elem ⇒ s"[$elem]") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val runnable = source.via(flow).mergeSubstreams.toMat(Sink.ignore)(Keep.right) runnable.run().onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "run") system.terminate() } }
groupBy でフローを分けて async を使うことで、順序は保証されなくなりますが、効率のよい処理を実現することができます。
Akka Streams - Source に Actor, Queue を使うコード
Actor を使う
DeadLetter を Subscribe する Actor を Akka Stream で実装してみます
コードは下のようになります
val actorRef: ActorRef = Source.actorRef[DeadLetter](100, OverflowStrategy.dropTail) .to(Sink.foreach { case DeadLetter(message, _, recipient) ⇒ logger.info(s"$message - $recipient") }).run()
to(sink) は、toMat(sink)(Keep.left) と同じなので、Source の Mat が返されます。
返された actorRef は下のように DeadLetter を Subscribe するために使用できます。
val system: ActorSystem
system.eventStream.subscribe(actorRef, classOf[DeadLetter])
Actor を使った Source の場合には、OverflowStrategy として backpressure を使うことができません。
backpressure を使いたい場合は、Queue を使用します。
Queue を使う
Queue を使うコードは下のようになります
val queue: SourceQueueWithComplete[DeadLetter] = Source.queue[DeadLetter](100, OverflowStrategy.backpressure) .to(Sink.foreach { case DeadLetter(message, _, recipient) ⇒ logger.info(s"$message - $recipient") }).run()