Akka Streams - async, groupBy
async
まず async を使わないコードからはじめてみます。
この場合は、単純に 1 から 10 まで順番に処理が実行されます。
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream.{ActorMaterializer, Attributes} import scala.concurrent.ExecutionContextExecutor import scala.util.{Failure, Success} object HelloWorld extends App { implicit val system: ActorSystem = ActorSystem("HelloWorld") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, "HelloWorld") val source = Source(1 to 10) .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val flow = Flow[Int].map { i ⇒ Thread.sleep((10 - i) * 200) i } .log("flow", elem ⇒ s"[$elem]") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val runnable = source.via(flow).toMat(Sink.ignore)(Keep.right) runnable.run().onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "run") system.terminate() } }
次に async をつけて実行してみます。
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream.{ActorMaterializer, Attributes} import scala.concurrent.ExecutionContextExecutor import scala.util.{Failure, Success} object HelloWorld extends App { implicit val system: ActorSystem = ActorSystem("HelloWorld") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, "HelloWorld") val source = Source(1 to 10).async .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val flow = Flow[Int].map { i ⇒ Thread.sleep((10 - i) * 200) i }.async .log("flow", elem ⇒ s"[$elem]") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val runnable = source.via(flow).toMat(Sink.ignore)(Keep.right) runnable.run().onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "run") system.terminate() } }
フローにかかる実行時間をずらすために Sleep を追加していますが、この場合でも順番に処理されます。
groupBy と async
groupBy と async を使った SubFlow で試してみます
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream.{ActorMaterializer, Attributes} import scala.concurrent.ExecutionContextExecutor import scala.util.{Failure, Success} object HelloWorld extends App { implicit val system: ActorSystem = ActorSystem("HelloWorld") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, "HelloWorld") val source = Source(1 to 10).groupBy(3, _ % 3) .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val flow = Flow[Int].map { i ⇒ Thread.sleep((10 - i) * 200) i }.async .log("flow", elem ⇒ s"[$elem]") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val runnable = source.via(flow).mergeSubstreams.toMat(Sink.ignore)(Keep.right) runnable.run().onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "run") system.terminate() } }
groupBy でフローを分けて async を使うことで、順序は保証されなくなりますが、効率のよい処理を実現することができます。