Akka Streams - Broadcast と Thread

Broadcast

下のような Broadcast を使ったコードを記述します

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import akka.stream.{ActorMaterializer, Attributes, FlowShape}
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.Await
import scala.concurrent.duration._

class DeadlockSpec extends Specification with TestKitBase {
  override implicit lazy val system = ActorSystem("Deadlock")

  "Test" should {
    "test1" in new WithFixture {
      val done = Source.single(1).via(
        broadcasts
      ).runWith(Sink.ignore)

      Await.ready(done, 10 seconds)
      done.value.get.get
      success
    }
  }

  trait WithFixture extends Scope {
    implicit val materializer = ActorMaterializer()

    val broadcasts =
      Flow.fromGraph(GraphDSL.create() { implicit b ⇒
        import GraphDSL.Implicits._

        val bc = b.add(Broadcast[Int](5))

        val w = (name: String) ⇒ Flow[Int].map { n ⇒
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

        val zip = b.add(ZipWith[String, String, String, String, String,
          (String, String, String, String, String)]((_, _, _, _, _)))

        bc.out(0) ~> w("wait 0") ~> zip.in0
        bc.out(1) ~> w("wait 1") ~> zip.in1
        bc.out(2) ~> w("wait 2") ~> zip.in2
        bc.out(3) ~> w("wait 3") ~> zip.in3
        bc.out(4) ~> w("wait 4") ~> zip.in4

        FlowShape(bc.in, zip.out)
      })
  }
}

このコードは、Source で与えられた Int 値を 5 つの Flow に Broadcast し、それぞれ 1 秒 Wait した後、スレッド名を返す ようになっています。 このテストコードを実行すると、以下のようなログが出力されます。

[INFO] [05/31/2018 15:27:21.900] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:22.905] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:23.909] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:24.915] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:25.919] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
t

ログのフォーマット上、スレッド名が出力されているので、ログメッセージのスレッド名が冗長ですが、Broadcast された Flow は同じ Thread を使って、順番に実行されていることがわかります。

マルチスレッドで Broadcast された Flow を実行する

テストコードを以下のようにします。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import akka.stream.{ActorMaterializer, Attributes, FlowShape}
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.Await
import scala.concurrent.duration._

class DeadlockSpec extends Specification with TestKitBase {
  override implicit lazy val system = ActorSystem("Deadlock")

  "Test" should {
    "test1" in new WithFixture {
      val done = Source.single(1).via(
        broadcasts
      ).runWith(Sink.ignore)

      Await.ready(done, 10 seconds)
      done.value.get.get
      success
    }
  }

  trait WithFixture extends Scope {
    implicit val materializer = ActorMaterializer()

    val broadcasts =
      Flow.fromGraph(GraphDSL.create() { implicit b ⇒
        import GraphDSL.Implicits._

        val bc = b.add(Broadcast[Int](5))
        val w = (name: String) ⇒ Flow[Int].map { n ⇒
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
          .async

        val zip = b.add(ZipWith[String, String, String, String, String,
          (String, String, String, String, String)]((_, _, _, _, _)))

        bc.out(0) ~> w("wait 0") ~> zip.in0
        bc.out(1) ~> w("wait 1") ~> zip.in1
        bc.out(2) ~> w("wait 2") ~> zip.in2
        bc.out(3) ~> w("wait 3") ~> zip.in3
        bc.out(4) ~> w("wait 4") ~> zip.in4

        FlowShape(bc.in, zip.out)
      })
  }
}

1 秒 Wait する Flow に async を付加しています。

        val w = (name: String) ⇒ Flow[Int].map { n ⇒
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
          .async

ログの出力結果は下のようになります。それぞれに別のスレッドが割り当てられていることがわかります。

[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-5] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-5
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-9] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-9
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-10] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-10
t

default-dispatcher のスレッド数を 1 にする

application.conf を記述して、default-dispatcher でのスレッド数を 1 に設定してみます。

akka {
  actor {
    default-dispatcher {
      fork-join-executor {
        parallelism-min = 1
        parallelism-factor = 1.0
        parallelism-max = 1
      }
    }
    throughput = 1
  }
}

スレッドが 1 つしか割り当てられないため、最初の例のように順番に実行されます。

[INFO] [05/31/2018 16:43:32.499] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:33.503] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:34.508] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:35.512] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:36.518] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-2

Deadlock に注意

async を使えばスレッドプールからスレッドを割り当てて実行されるため、Broadcast された Flow をマルチスレッドで動かしたくなると思います。 ですが、適切な値のスレッドプールを用意しておかないと Deadlock が発生する場合があります。

Pipelining and Parallelism • Akka Documentation

Akka Streams - javax.sound を使って mp3 を再生する

javax.sound を使って、mp3 を再生する

package jp.pigumer.cast

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}

import akka.stream.scaladsl.Flow
import akka.util.ByteString
import javax.sound.sampled.AudioFormat.Encoding
import javax.sound.sampled._

object Player {

  val mixerInfo: Array[Mixer.Info] = AudioSystem.getMixerInfo()

  val convert = (bytes: ByteString) ⇒ {

    def write(f: OutputStream ⇒ Unit) = {
      val output = new ByteArrayOutputStream()
      try {
        f(output)
        ByteString(output.toByteArray)
      } finally {
        output.close()
      }
    }

    val originalAudioInput = AudioSystem.getAudioInputStream(new ByteArrayInputStream(bytes.toArray))
    val originalFormat = originalAudioInput.getFormat()
    val converted: AudioInputStream = AudioSystem.getAudioInputStream(
      new AudioFormat(
        Encoding.PCM_SIGNED,
        originalFormat.getSampleRate,
        16,
        originalFormat.getChannels,
        originalFormat.getChannels * 2,
        originalFormat.getSampleRate,
        false
      ),
      originalAudioInput)

    write(output ⇒ AudioSystem.write(converted, AudioFileFormat.Type.AU, output))
  }

  val play = (index: Int) ⇒
    Flow[ByteString].map { bytes ⇒
      val input = AudioSystem.getAudioInputStream(new ByteArrayInputStream(bytes.toArray))
      val mixer = mixerInfo(index)
      val clip = AudioSystem.getClip(mixer)
      clip.open(input)
      clip.start()
      while (!clip.isRunning) {
        Thread.sleep(500)
      }
      while (clip.isRunning) {
        Thread.sleep(500)
      }
      clip.close()
      input
    }
}

Play

    val done = FileIO.fromPath(Paths.get("src/main/resources/hello.mp3"))
      .map(Player.convert)
      .via(Player.play(0)).withAttributes(
        ActorAttributes.dispatcher("akka.stream.blocking-io-dispatcher"))
      .runWith(Sink.ignore)

GitHub - takesection/arm32v6-scala-cast

Akka Streams - Source

Source(1 to 10) のように Source と宣言されているものはもちろん Source ですが、 Source ではじまっていて、Sink で終わっていない場合は、Source として扱うことができます。

val source: Source[String, NotUsed] =
  Source(1 to 10)
    .via(Flow[Int].fold(0)((acc, n) ⇒ acc + n)
    .map(_.toString))
package jp.pigumer.akka

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification

class SourceSpec extends Specification with TestKitBase {

  override implicit lazy val system = ActorSystem("Test")
  implicit val materializer = ActorMaterializer()

  "Source" should {
    "sample1" in {
      val source: Source[String, NotUsed] =
        Source(1 to 10).via(Flow[Int].fold(0)((acc, n) ⇒ acc + n).map(_.toString))
      val result = source.runWith(TestSink.probe[String])
      result.request(1).expectNext must_== "55"
    }
  }
}

flatMapConcat のような Source を返さないといけない場合に、Source として扱えることを知っておくと役立ちます

Future のテスト

Future は、処理がブロックされず、いずれ答えを返してくれる便利なものですが、 テストコードのように結果を検証したい場合は処理の完了を待つ必要があります。

下のサンプルコードでは、Future を Await.ready を使って、実行が完了するのを待ちます。 Await.ready に指定した時間が経過しても処理が完了しない場合は例外がスローされてテストケースは 失敗します。 実行が完了した場合は、Future.value.get で処理結果を取得します。

package jp.pigumer.akka

import akka.actor.{ActorSystem, Props}
import akka.event.Logging
import akka.util.Timeout
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.duration._
import akka.pattern._

import scala.concurrent.Await

class FutureSpec extends Specification {

  trait WithFixture extends Scope {
    implicit val system = ActorSystem("FutureSpec")
    implicit val logger = Logging(system, classOf[WithFixture])

    val actor = system.actorOf(Props[SlowActor])
  }

  "Future Test" should {
    "Success" in new WithFixture {
      implicit val timeout: Timeout = 1 seconds

      logger.info("start")
      val e = actor ? 1
      Await.ready(e, 60 minutes)
      logger.info("finish")

      e.value.get.fold(
        cause ⇒ {
            logger.error(cause, "failed")
            failure
          },
        _ ⇒ success
      )
    }

    "Timeout" in new WithFixture {
      implicit val timeout: Timeout = 1 seconds

      logger.info("start")
      val e = actor ? 4
      Await.ready(e, 5 seconds)
      logger.info("finish")

      e.value.get.fold(
        cause ⇒ {
          logger.error(cause, "failed")
          success
        },
        _ ⇒ failure
      )
    }
  }
}

Raspberry Pi の画面を回転

PDF や電子ブックのようなドキュメントを読みたいときは特にですが、画面を回転させて縦にしたくなります。

Raspberry Pi (Raspbian) で 90 度回転させたい場合は、/boot/config.txt にdisplay_hdmi_rotate=1 のように記述してリブートすれば反映されます。

詳しいことは、下のリンクに記述されています。

www.raspberrypi.org

f:id:section27:20180522231010p:plain

Akka Streams - Iterator を Source として使用する

サンプルコード

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.ExecutionContextExecutor
import scala.util.Success

object IteratorSpec extends App {

  implicit val system: ActorSystem = ActorSystem("IteratorSpec")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, this.getClass)

  val iterator: Iterator[Int] = new Iterator[Int] {

    var n = 0

    override def hasNext: Boolean = true
    override def next(): Int = {
      n = n + 1
      n
    }
  }

  val result = Source.fromIterator(() ⇒ iterator).takeWhile(_ != 5).toMat(Sink.seq)(Keep.right).run

  result.onComplete {
    case Success(r) ⇒
      logger.info(s"$r")
      system.terminate()
    case _ ⇒
      system.terminate()
  }
}

Akka Streams - Paginator

一覧表示でよくある Paginator を実装してみます。

このサンプルコードでは、Paginator の対象となるデータは、Stream.from(1) で生成するため、1 からはじまる無限個のデータとなります。 そのため、すべてのページのデータを作成してから、必要なページのデータを抽出するということはできません。 該当ページのデータまでスキップし、該当ページのデータが取れたところで処理を止める必要があります。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}

import scala.concurrent.ExecutionContextExecutor
import scala.util.Success

object PaginatorSpec extends App {

  implicit val system: ActorSystem = ActorSystem("PaginatorSpec")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, this.getClass)

  val list = Stream.from(1)

  val page = 2L
  val size = 3

  val result = Source(list)
    .grouped(size)
    .zipWithIndex
    .dropWhile {
      case (_, index) ⇒
        index != (page - 1L)
    }
    .takeWhile {
      case (_, index) ⇒
        index != page
    }
    .map(_._1)
    .toMat(Sink.head)(Keep.right)
    .run

  result.onComplete {
    case Success(values) ⇒
      logger.info(s"complated: $values")
      system.terminate()
    case _ ⇒
      system.terminate()
  }

}

grouped

最初に grouped を使用して指定した数のコレクションに区切ります。下のように、grouped を使うと以下のように区切られます。

// Returns List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10))
(1 to 10).iterator.grouped(5).toList

zipWithIndex

次に、zipWithIndex を使用して、各ページに0からはじまるインデックス(ページ)を付けます。 先の grouped の例に zipWithIndex を組み合わせると以下のような結果を得ることができます。

// Returns List((List(1, 2, 3, 4, 5),0), (List(6, 7, 8, 9, 10),1))
(1 to 10).iterator.grouped(5).zipWithIndex.toList

dropWhile

dropWhile を使用して指定ページまでスキップします。

takeWhile

最後に takeWhile を使用して指定ページのデータを取り出します。