Akka Streams - Broadcast and Threads

Broadcast

Let's write code that uses Broadcast, as shown below.

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import akka.stream.{ActorMaterializer, Attributes, FlowShape}
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.Await
import scala.concurrent.duration._

class DeadlockSpec extends Specification with TestKitBase {
  override implicit lazy val system = ActorSystem("Deadlock")

  "Test" should {
    "test1" in new WithFixture {
      val done = Source.single(1).via(
        broadcasts
      ).runWith(Sink.ignore)

      // Await.result rethrows the stream's failure, if any
      Await.result(done, 10.seconds)
      success
    }
  }

  trait WithFixture extends Scope {
    implicit val materializer = ActorMaterializer()

    val broadcasts =
      Flow.fromGraph(GraphDSL.create() { implicit b ⇒
        import GraphDSL.Implicits._

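        // Fan each incoming element out to five outputs
        val bc = b.add(Broadcast[Int](5))

        // Each branch blocks for 1 second, then returns the element and the current thread name
        val w = (name: String) ⇒ Flow[Int].map { n ⇒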
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

        // Wait for one element from every branch and combine them into a 5-tuple
        val zip = b.add(ZipWith[String, String, String, String, String,
          (String, String, String, String, String)]((_, _, _, _, _)))

        bc.out(0) ~> w("wait 0") ~> zip.in0
        bc.out(1) ~> w("wait 1") ~> zip.in1
        bc.out(2) ~> w("wait 2") ~> zip.in2
        bc.out(3) ~> w("wait 3") ~> zip.in3
        bc.out(4) ~> w("wait 4") ~> zip.in4

        FlowShape(bc.in, zip.out)
      })
  }
}

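This code broadcasts the Int value supplied by the Source to five Flows; each Flow waits for 1 second and then returns the name of the thread it ran on, and ZipWith combines the five results into a tuple. Running this test produces log output like the following.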

[INFO] [05/31/2018 15:27:21.900] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:22.905] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:23.909] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:24.915] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:25.919] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-4

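Because the log format already includes the thread name, repeating it in the message is redundant, but the output shows that the broadcast Flows run one after another on the same thread: every line reports default-dispatcher-4, and the timestamps are about 1 second apart. Without async boundaries, Akka Streams fuses the whole graph into a single actor (operator fusion), so the five branches cannot overlap.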
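The sequential behaviour can be modelled without Akka. The sketch below is a plain-Scala analogy, not Akka's actual graph interpreter: the hypothetical `waitBranch` plays the role of one `wait N` Flow, and the hypothetical `runFused` calls the branches in turn on the calling thread, which is roughly what a fused graph does. The wait is shortened to 200 ms.

```scala
// Plain-Scala analogy (not Akka): models a fused graph as sequential calls on one thread.
object FusedAnalogy {
  // Hypothetical stand-in for one "wait N" Flow: block, then report the thread.
  def waitBranch(n: Int, delayMs: Long): String = {
    Thread.sleep(delayMs)
    s"$n - ${Thread.currentThread.getName}"
  }

  // Calls every branch in turn on the current thread, as a fused graph
  // without async boundaries would; returns the results and the elapsed ms.
  def runFused(n: Int, branches: Int, delayMs: Long): (Seq[String], Long) = {
    val start = System.nanoTime()
    val results = (0 until branches).map(_ => waitBranch(n, delayMs))
    val elapsedMs = (System.nanoTime() - start) / 1000000
    (results, elapsedMs)
  }

  def main(args: Array[String]): Unit = {
    val (results, elapsedMs) = runFused(n = 1, branches = 5, delayMs = 200)
    results.zipWithIndex.foreach { case (r, i) => println(s"[wait $i] $r") }
    // Every line shows the same thread, and the total is about 5 x 200 ms.
    println(s"elapsed: $elapsedMs ms")
  }
}
```

Like the log above, the total time is the sum of the individual waits, because nothing lets the branches run at the same time.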

Running the broadcast Flows on multiple threads

Change the test code as follows.

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import akka.stream.{ActorMaterializer, Attributes, FlowShape}
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.Await
import scala.concurrent.duration._

class DeadlockSpec extends Specification with TestKitBase {
  override implicit lazy val system = ActorSystem("Deadlock")

  "Test" should {
    "test1" in new WithFixture {
      val done = Source.single(1).via(
        broadcasts
      ).runWith(Sink.ignore)

      // Await.result rethrows the stream's failure, if any
      Await.result(done, 10.seconds)
      success
    }
  }

  trait WithFixture extends Scope {
    implicit val materializer = ActorMaterializer()

    val broadcasts =
      Flow.fromGraph(GraphDSL.create() { implicit b ⇒
        import GraphDSL.Implicits._

        val bc = b.add(Broadcast[Int](5))
        val w = (name: String) ⇒ Flow[Int].map { n ⇒
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
          .async // marks an async boundary, so this Flow runs in its own actor

        val zip = b.add(ZipWith[String, String, String, String, String,
          (String, String, String, String, String)]((_, _, _, _, _)))

        bc.out(0) ~> w("wait 0") ~> zip.in0
        bc.out(1) ~> w("wait 1") ~> zip.in1
        bc.out(2) ~> w("wait 2") ~> zip.in2
        bc.out(3) ~> w("wait 3") ~> zip.in3
        bc.out(4) ~> w("wait 4") ~> zip.in4

        FlowShape(bc.in, zip.out)
      })
  }
}

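The only change is that async is appended to the Flow that waits for 1 second. This inserts an asynchronous boundary, so each of the five Flows runs in its own actor, which the dispatcher can schedule on a separate thread.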

        val w = (name: String) ⇒ Flow[Int].map { n ⇒
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
          .async

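The log output is shown below. Each Flow is assigned a different thread, and all five lines carry the same timestamp, so the waits now run in parallel.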

[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-5] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-5
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-9] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-9
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-10] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-10
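A comparable plain-Scala analogy for the async version, assuming a `java.util.concurrent` fixed thread pool as a stand-in for default-dispatcher (Akka's default is a fork-join pool, so the real thread assignment may differ): each branch becomes its own `Future`, and `Future.sequence` followed by a tuple conversion loosely plays the part of `ZipWith`. The names `AsyncAnalogy`, `runAsync`, and `waitBranch` are hypothetical.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Plain-Scala analogy (not Akka): independent Futures on a thread pool
// stand in for Flows separated by .async boundaries.
object AsyncAnalogy {
  // Hypothetical stand-in for one "wait N" Flow.
  def waitBranch(n: Int, delayMs: Long): String = {
    Thread.sleep(delayMs)
    s"$n - ${Thread.currentThread.getName}"
  }

  // Runs five branches as separate Futures on a pool of `threads` threads
  // and combines the results into a 5-tuple, loosely like ZipWith.
  def runAsync(n: Int, threads: Int, delayMs: Long): ((String, String, String, String, String), Long) = {
    val pool = Executors.newFixedThreadPool(threads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val start = System.nanoTime()
      // Each Future is an independent task, comparable to an .async boundary.
      val branches = (0 until 5).map(_ => Future(waitBranch(n, delayMs)))
      val zipped = Future.sequence(branches).map(rs => (rs(0), rs(1), rs(2), rs(3), rs(4)))
      val result = Await.result(zipped, 10.seconds)
      (result, (System.nanoTime() - start) / 1000000)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val (result, elapsedMs) = runAsync(n = 1, threads = 5, delayMs = 200)
    result.productIterator.foreach(println)
    // Five different pool threads, and the total is close to a single 200 ms wait.
    println(s"elapsed: $elapsedMs ms")
  }
}
```

With five threads available, the total time drops from the sum of the waits to roughly the longest single wait, which matches the identical timestamps in the log.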

Setting the number of default-dispatcher threads to 1

Next, write an application.conf that limits default-dispatcher to a single thread.

akka {
  actor {
    default-dispatcher {
      fork-join-executor {
        parallelism-min = 1
        parallelism-factor = 1.0
        parallelism-max = 1
      }
      # throughput is a per-dispatcher setting, so it belongs inside default-dispatcher
      throughput = 1
    }
  }
}

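Because only one thread is available, the Flows run sequentially, as in the first example, even though each one still has its own async boundary.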

[INFO] [05/31/2018 16:43:32.499] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:33.503] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:34.508] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:35.512] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:36.518] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
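The single-thread case can be approximated the same way. In this sketch, `Executors.newSingleThreadExecutor` stands in for a dispatcher with `parallelism-max = 1` (an assumption for illustration; Akka configures its fork-join executor differently), and `runOnOneThread` is a hypothetical helper: all five tasks are submitted at once, as the async boundaries would hand them off, but the one worker drains them one at a time.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

// Plain-Scala analogy (not Akka): a one-thread pool stands in for a
// dispatcher limited to a single thread.
object SingleThreadAnalogy {
  // Submits `branches` blocking tasks at once to a pool with exactly one worker.
  def runOnOneThread(branches: Int, delayMs: Long): (Seq[String], Long) = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      val start = System.nanoTime()
      val futures = (0 until branches).map { i =>
        pool.submit(new Callable[String] {
          def call(): String = {
            Thread.sleep(delayMs)
            s"wait $i - ${Thread.currentThread.getName}"
          }
        })
      }
      // The single worker drains the queue one task at a time.
      val results = futures.map(_.get(10, TimeUnit.SECONDS))
      (results, (System.nanoTime() - start) / 1000000)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val (results, elapsedMs) = runOnOneThread(branches = 5, delayMs = 200)
    results.foreach(println)
    // One thread name throughout, and the total is about 5 x 200 ms.
    println(s"elapsed: $elapsedMs ms")
  }
}
```

Splitting the work into separate tasks only helps when the pool has threads to run them on; with one worker the result looks just like the fused case.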

Beware of deadlock

With async, each Flow runs on a thread taken from the thread pool, so it is tempting to run broadcast Flows on multiple threads. However, if the thread pool is not sized appropriately for the work, a deadlock can occur.
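One classic way an undersized pool deadlocks is thread starvation: a task blocks while waiting for another task that is queued on the same pool and can never get a thread. The sketch below shows that generic pattern with plain `java.util.concurrent`; it is not the Akka scenario from this article, `StarvationDemo` and `runNested` are hypothetical names, and the inner wait uses a timeout so the hang surfaces as a `TimeoutException` instead of blocking forever.

```scala
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit, TimeoutException}

// Generic thread-starvation illustration (not Akka's code).
object StarvationDemo {
  // Returns Some(value) if the outer task could finish, None if it starved.
  def runNested(threads: Int, waitMs: Long): Option[Int] = {
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val outer = pool.submit(new Callable[Int] {
        def call(): Int = {
          // The inner task is queued on the SAME pool as the outer task.
          val inner = pool.submit(new Callable[Int] { def call(): Int = 42 })
          // Blocking wait: holds the outer task's thread until inner finishes or the timeout fires.
          inner.get(waitMs, TimeUnit.MILLISECONDS)
        }
      })
      try Some(outer.get())
      catch {
        // Too few threads: inner never starts, so the outer wait times out.
        case e: ExecutionException if e.getCause.isInstanceOf[TimeoutException] => None
      }
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    println(runNested(threads = 1, waitMs = 500)) // None: the only thread is waiting on itself
    println(runNested(threads = 2, waitMs = 500)) // Some(42): inner gets the second thread
  }
}
```

Without the timeout, the one-thread case would hang indefinitely. In Akka, the usual remedy for blocking stages is to give them a separate dispatcher (for example via `ActorAttributes.dispatcher`) sized for the blocking work, rather than sharing default-dispatcher.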

Reference: Pipelining and Parallelism (Akka Documentation)