Akka Streams - Paginator

一覧表示でよくある Paginator を実装してみます。

このサンプルコードでは、Paginator の対象となるデータは、Stream.from(1) で生成するため、1 からはじまる無限個のデータとなります。 そのため、すべてのページのデータを作成してから、必要なページのデータを抽出するということはできません。 該当ページのデータまでスキップし、該当ページのデータが取れたところで処理を止める必要があります。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.ExecutionContextExecutor
import scala.util.Success

object PaginatorSpec extends App {

  implicit val system: ActorSystem = ActorSystem("PaginatorSpec")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, this.getClass)

  val list = Stream.from(1)

  val page = 2L
  val size = 3

  val result = Source(list)
    .grouped(size)
    .zipWithIndex
    .dropWhile {
      case (_, index) ⇒
        index != (page - 1L)
    }
    .takeWhile {
      case (_, index) ⇒
        index != page
    }
    .map(_._1)
    .toMat(Sink.head)(Keep.right)
    .run

  result.onComplete {
    case Success(values) ⇒
      logger.info(s"complated: $values")
      system.terminate()
    case _ ⇒
      system.terminate()
  }

}

grouped

最初に grouped を使用して指定した数のコレクションに区切ります。下のように、grouped を使うと以下のように区切られます。

// Returns List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10))
(1 to 10).iterator.grouped(5).toList

zipWithIndex

次に、zipWithIndex を使用して、各ページに0からはじまるインデックス(ページ)を付けます。 先の grouped の例に zipWithIndex を組み合わせると以下のような結果を得ることができます。

// Returns List((List(1, 2, 3, 4, 5),0), (List(6, 7, 8, 9, 10),1))
(1 to 10).iterator.grouped(5).zipWithIndex.toList

dropWhile

dropWhile を使用して指定ページまでスキップします。

takeWhile

最後に takeWhile を使用して指定ページのデータを取り出します。