Akka Streams - async, groupBy

async

まず async を使わないコードからはじめてみます。

この場合は、単純に 1 から 10 まで順番に処理が実行されます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

/**
 * Baseline example: a stream with no `.async` boundary.
 *
 * Emits 1 to 10, passes each element through a map stage that sleeps longer
 * for smaller values, logs every element at INFO level, discards the results
 * with `Sink.ignore`, and terminates the actor system once the stream has
 * completed (successfully or with a failure).
 */
object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  // Shared by both `log` stages so that every element is logged at INFO level.
  private val infoOnElement: Attributes =
    Attributes.logLevels(onElement = Logging.InfoLevel)

  val source = Source(1 to 10)
    .log("source")
    .withAttributes(infoOnElement)

  // Blocking sleep of (10 - n) * 200 ms: 1 waits 1800 ms, 10 does not wait at all.
  // The skewed per-element latency makes any reordering of the output visible.
  val flow = Flow[Int]
    .map { n =>
      Thread.sleep(200L * (10 - n))
      n
    }
    .log("flow", n => s"[$n]")
    .withAttributes(infoOnElement)

  // Keep.right keeps the sink's materialized value: the Future observed below.
  val runnable =
    source.via(flow).toMat(Sink.ignore)(Keep.right)

  // Report the outcome first, then shut the actor system down in either case.
  runnable.run().onComplete { outcome =>
    outcome match {
      case Success(_)     => logger.info("success")
      case Failure(cause) => logger.error(cause, "run")
    }
    system.terminate()
  }
}

次に async をつけて実行してみます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

/**
 * Same pipeline as the baseline, but with an `.async` boundary placed right
 * after the Source and another right after the sleeping map stage.
 *
 * Emits 1 to 10, logs every element at INFO level, discards the results with
 * `Sink.ignore`, and terminates the actor system once the stream has completed
 * (successfully or with a failure).
 */
object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  // Shared by both `log` stages so that every element is logged at INFO level.
  private val infoOnElement: Attributes =
    Attributes.logLevels(onElement = Logging.InfoLevel)

  // `.async` is applied to the bare Source, before the `log` stage.
  val source = Source(1 to 10)
    .async
    .log("source")
    .withAttributes(infoOnElement)

  // Blocking sleep of (10 - n) * 200 ms: 1 waits 1800 ms, 10 does not wait at all.
  // `.async` is applied to the sleeping map stage, before the `log` stage.
  val flow = Flow[Int]
    .map { n =>
      Thread.sleep(200L * (10 - n))
      n
    }
    .async
    .log("flow", n => s"[$n]")
    .withAttributes(infoOnElement)

  // Keep.right keeps the sink's materialized value: the Future observed below.
  val runnable =
    source.via(flow).toMat(Sink.ignore)(Keep.right)

  // Report the outcome first, then shut the actor system down in either case.
  runnable.run().onComplete { outcome =>
    outcome match {
      case Success(_)     => logger.info("success")
      case Failure(cause) => logger.error(cause, "run")
    }
    system.terminate()
  }
}

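要素ごとにフローの処理時間が異なるように Thread.sleep を入れています（1 が最も長く 1800 ミリ秒、10 は待ち時間なし）。それでも、この場合は 1 から 10 まで順番に処理されます。async は非同期境界を挟んで各ステージを別々に動かすだけです。1 本のストリームの中では要素の順序は保たれたままです。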

groupBy と async

groupBy で要素を 3 つのサブストリーム（SubFlow）に分け、各サブストリームのフローに async を付けて試してみます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

/**
 * groupBy + async example.
 *
 * Splits 1 to 10 into at most three substreams keyed by `n % 3`. In each
 * substream it runs the sleeping map stage followed by an `.async` boundary,
 * then merges the substreams back into one stream. Every element is logged at
 * INFO level, results are discarded with `Sink.ignore`, and the actor system
 * is terminated once the stream has completed (successfully or with a failure).
 */
object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  // Shared by both `log` stages so that every element is logged at INFO level.
  private val infoOnElement: Attributes =
    Attributes.logLevels(onElement = Logging.InfoLevel)

  // Three keys (0, 1, 2) fit exactly within the maxSubstreams limit of 3.
  val source = Source(1 to 10)
    .groupBy(3, _ % 3)
    .log("source")
    .withAttributes(infoOnElement)

  // Blocking sleep of (10 - n) * 200 ms: 1 waits 1800 ms, 10 does not wait at all.
  // `.async` is applied to the sleeping map stage, before the `log` stage.
  val flow = Flow[Int]
    .map { n =>
      Thread.sleep(200L * (10 - n))
      n
    }
    .async
    .log("flow", n => s"[$n]")
    .withAttributes(infoOnElement)

  // The flow is attached to every substream; the substreams are then merged
  // and Keep.right keeps the sink's materialized value (the Future below).
  val runnable =
    source
      .via(flow)
      .mergeSubstreams
      .toMat(Sink.ignore)(Keep.right)

  // Report the outcome first, then shut the actor system down in either case.
  runnable.run().onComplete { outcome =>
    outcome match {
      case Success(_)     => logger.info("success")
      case Failure(cause) => logger.error(cause, "run")
    }
    system.terminate()
  }
}

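groupBy でストリームを分割し、各サブストリームのフローに async を付けると、サブストリームどうしが並行して処理されます。そのため mergeSubstreams で合流した後の要素の順序は保証されなくなります（各サブストリーム内の順序は保たれます）。その代わり、待ち時間の長い要素に他のサブストリームの要素が待たされないので、効率のよい処理を実現できます。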