Broadcast
Consider the following code, which uses Broadcast.
package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import akka.stream.{ActorMaterializer, Attributes, FlowShape}
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.Await
import scala.concurrent.duration._

class DeadlockSpec extends Specification with TestKitBase {

  override implicit lazy val system = ActorSystem("Deadlock")

  "Test" should {
    "test1" in new WithFixture {
      val done = Source.single(1).via(broadcasts).runWith(Sink.ignore)
      Await.ready(done, 10.seconds)
      // Rethrows the stream failure, if any, so that the test fails.
      done.value.get.get
      success
    }
  }

  trait WithFixture extends Scope {
    implicit val materializer = ActorMaterializer()

    val broadcasts = Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // Fan the single input element out to five branches.
      val bc = b.add(Broadcast[Int](5))

      // Each branch blocks for one second, then emits the name of the thread it ran on.
      val w = (name: String) =>
        Flow[Int].map { n =>
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name)
          .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

      // Collect the five results into a single tuple.
      val zip = b.add(ZipWith[String, String, String, String, String,
        (String, String, String, String, String)]((_, _, _, _, _)))

      bc.out(0) ~> w("wait 0") ~> zip.in0
      bc.out(1) ~> w("wait 1") ~> zip.in1
      bc.out(2) ~> w("wait 2") ~> zip.in2
      bc.out(3) ~> w("wait 3") ~> zip.in3
      bc.out(4) ~> w("wait 4") ~> zip.in4

      FlowShape(bc.in, zip.out)
    })
  }
}
This code broadcasts the Int value supplied by the Source to five Flows. Each Flow waits one second and then returns the name of the thread it ran on. Running the test produces the following log output.
[INFO] [05/31/2018 15:27:21.900] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:22.905] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:23.909] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:24.915] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:25.919] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
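The log format already prints the thread name, so the thread name inside the log message is redundant. Even so, the output shows that the broadcast Flows run one after another on the same thread: every entry comes from default-dispatcher-4, and each is logged about one second after the previous one.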
Running the broadcast Flows on multiple threads
Change the test code as follows.
package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import akka.stream.{ActorMaterializer, Attributes, FlowShape}
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.Await
import scala.concurrent.duration._

class DeadlockSpec extends Specification with TestKitBase {

  override implicit lazy val system = ActorSystem("Deadlock")

  "Test" should {
    "test1" in new WithFixture {
      val done = Source.single(1).via(broadcasts).runWith(Sink.ignore)
      Await.ready(done, 10.seconds)
      // Rethrows the stream failure, if any, so that the test fails.
      done.value.get.get
      success
    }
  }

  trait WithFixture extends Scope {
    implicit val materializer = ActorMaterializer()

    val broadcasts = Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // Fan the single input element out to five branches.
      val bc = b.add(Broadcast[Int](5))

      // Same one-second blocking Flow as before, now marked with an async boundary.
      val w = (name: String) =>
        Flow[Int].map { n =>
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name)
          .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
          .async

      // Collect the five results into a single tuple.
      val zip = b.add(ZipWith[String, String, String, String, String,
        (String, String, String, String, String)]((_, _, _, _, _)))

      bc.out(0) ~> w("wait 0") ~> zip.in0
      bc.out(1) ~> w("wait 1") ~> zip.in1
      bc.out(2) ~> w("wait 2") ~> zip.in2
      bc.out(3) ~> w("wait 3") ~> zip.in3
      bc.out(4) ~> w("wait 4") ~> zip.in4

      FlowShape(bc.in, zip.out)
    })
  }
}
The only change is that async is appended to the Flow that waits one second.
val w = (name: String) =>
  Flow[Int].map { n =>
    Thread.sleep(1000)
    s"$n - ${Thread.currentThread.getName}"
  }
    .log(name)
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
    .async
The log output is shown below. Each Flow is assigned a different thread, and all five entries carry the same timestamp.
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-5] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-5
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-9] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-9
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-10] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-10
Setting the default-dispatcher thread count to 1
Next, write an application.conf that limits the default-dispatcher to a single thread.
akka {
  actor {
    default-dispatcher {
      fork-join-executor {
        parallelism-min = 1
        parallelism-factor = 1.0
        parallelism-max = 1
      }
      # throughput is a dispatcher-level setting, so it belongs inside default-dispatcher
      throughput = 1
    }
  }
}
Because only one thread is available, the Flows run one after another, as in the first example.
[INFO] [05/31/2018 16:43:32.499] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:33.503] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:34.508] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:35.512] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:36.518] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
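For experiments like this one, it can be convenient to keep the thread limit next to the test instead of in a separate file. The following is a minimal sketch, assuming the Typesafe Config library that ships with Akka. The object name SingleThreadSystem and the method create are hypothetical names chosen for illustration; they are not part of the original test.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical helper: builds an ActorSystem whose default dispatcher has one thread.
object SingleThreadSystem {

  // The same fork-join limits as in application.conf, inlined as a HOCON string.
  private val overrides = ConfigFactory.parseString(
    """
      |akka.actor.default-dispatcher.fork-join-executor {
      |  parallelism-min = 1
      |  parallelism-factor = 1.0
      |  parallelism-max = 1
      |}
    """.stripMargin)

  // The inlined values take precedence; everything else falls back to the normal configuration.
  def create(): ActorSystem =
    ActorSystem("Deadlock", overrides.withFallback(ConfigFactory.load()))
}
```

In the spec, the system could then be declared as `override implicit lazy val system = SingleThreadSystem.create()`, which should make it easy to compare different thread counts by editing a single string.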
Beware of deadlock
Because async makes each Flow run on a thread taken from the thread pool, it is tempting to run broadcast Flows on multiple threads. However, unless the thread pool is sized appropriately, a deadlock can occur.
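One common way to keep blocking stages from competing with the default dispatcher is to give them a thread pool of their own. The following is a minimal sketch of that idea, not the approach taken in this article. The dispatcher name blocking-dispatcher, the pool size of 5, and the object name DedicatedDispatcherSketch are assumptions made for illustration. It relies on ActorAttributes.dispatcher, which in Akka 2.5 is documented to add an async boundary as well.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorAttributes, ActorMaterializer}
import com.typesafe.config.ConfigFactory

import scala.concurrent.Await
import scala.concurrent.duration._

object DedicatedDispatcherSketch {

  // Hypothetical dispatcher: a fixed pool with one thread per broadcast branch.
  private val config = ConfigFactory.parseString(
    """
      |blocking-dispatcher {
      |  type = Dispatcher
      |  executor = "thread-pool-executor"
      |  thread-pool-executor {
      |    fixed-pool-size = 5
      |  }
      |  throughput = 1
      |}
    """.stripMargin).withFallback(ConfigFactory.load())

  // Runs one element through a blocking Flow and returns the string it emits.
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("Deadlock", config)
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // The blocking map stage is asked to run on the dedicated dispatcher.
      val w = Flow[Int].map { n =>
        Thread.sleep(1000)
        s"$n - ${Thread.currentThread.getName}"
      }.withAttributes(ActorAttributes.dispatcher("blocking-dispatcher"))

      Await.result(Source.single(1).via(w).runWith(Sink.head), 10.seconds)
    } finally {
      Await.ready(system.terminate(), 10.seconds)
    }
  }
}
```

Whether a pool of this size is appropriate depends on how many blocking stages can be active at once, so the value should be treated as a starting point to measure against rather than a recommendation.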
Pipelining and Parallelism • Akka Documentation