スレッド数を CloudWatch のカスタムメトリクスとして Publish する

package jp.pigumer.monitor;

import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.StandardUnit;

import java.lang.management.ManagementFactory;
import java.util.Date;

public class CloudWatchReporter implements Runnable {

    private AmazonCloudWatchAsync cloundwatch = AmazonCloudWatchAsyncClientBuilder.standard().build();
    private String id;

    public CloudWatchReporter(String id) {
        this.id = id;
    }

    @Override
    public void run() {
        Double count = (double) ManagementFactory.getThreadMXBean().getThreadCount();
        MetricDatum metricData = new MetricDatum()
                .withDimensions(new Dimension().withName("Id").withValue(id))
                .withMetricName("thread_count")
                .withUnit(StandardUnit.Count)
                .withValue(count)
                .withTimestamp(new Date());
        PutMetricDataRequest request = new PutMetricDataRequest()
                .withNamespace("CUSTOM/TEST")
                .withMetricData(metricData);
        cloundwatch.putMetricDataAsync(request);
    }
}

この CloudWatchReporter を 1 分間隔で実行して CloudWatch に Publish します。

    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    ScheduledFuture handler = scheduler.scheduleAtFixedRate(
            new CloudWatchReporter("test"),
            0,
            1,
            TimeUnit.MINUTES);

f:id:section27:20180612234607p:plain

GitHub - takesection/jmxmonitor

TOTP - Time-Based One-Time Password

TOTP について調べるため、AWS の MFA で使うことのできる 仮想 MFA アプリケーションと同様の機能を持つアプリケーションを作成してみました。

TOTP の仕様

使い方

ユーザのホームディレクトリに .onetime というディレクトリを作成し、そのディレクトリに onetime.properties というファイルを作成します。

$ mkdir ~/.onetime
$ cat > ~/.onetime/onetime.properties

onetime.properties には、AWS の MFA を有効にしたときに表示されるシークレットキーを設定します。

secret=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

DynamoDB - query

クエリの操作 - Amazon DynamoDB には、 "A single Query will only return a result set that fits within the 1 MB size limit." 「1 つの Query は、1 MB のサイズ制限の範囲内の結果セットだけを返します。」と書かれています。

query を実行したとき返される Result の getItems と、withSelect("COUNT") を使用した場合のように getItems のない件数取得のみの場合とではどんな違いがあるのだろうと思い、DynamoDB ローカルと東京リージョンの DynamoDB とで 同じコードを実行して試してみました。

その結果、withSelect("COUNT") の場合でも、結果セットを取得する場合でも同じ件数で分割されて返されることがわかりました。

試したコードを下に記載します。なお、DynamoDB ローカルと AWS 上の DynamoDB では分割される件数は差異がありました。

package jp.pigumer.dynamodb

import java.util.UUID

import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.dynamodbv2.model._

import scala.util.Try

case class Data(hashKey: String,
                rangeKey: String,
                rangeKey2: String) {
  val item = new Item()
    .withPrimaryKey("HashKey", hashKey,
      "RangeKey", rangeKey)
    .withString("RangeKey2", rangeKey2)
}

object TestTable {

  val hashKeyValue = "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"

  def deleteTable(dynamoDB: DynamoDB) = {
    dynamoDB.getTable("TEST1").delete()
  }

  def createTable(dynamoDB: DynamoDB) = {
    val request = new CreateTableRequest()
      .withTableName("TEST1")
      .withAttributeDefinitions(
        new AttributeDefinition()
          .withAttributeName("HashKey")
          .withAttributeType("S"),
        new AttributeDefinition()
          .withAttributeName("RangeKey")
          .withAttributeType("S"),
        new AttributeDefinition()
          .withAttributeName("RangeKey2")
          .withAttributeType("S")
      )
      .withKeySchema(
        new KeySchemaElement()
          .withAttributeName("HashKey")
          .withKeyType("HASH"),
        new KeySchemaElement()
          .withAttributeName("RangeKey")
          .withKeyType("RANGE")
      )
      .withProvisionedThroughput(
        new ProvisionedThroughput()
          .withReadCapacityUnits(1L)
          .withWriteCapacityUnits(1L)
      )
      .withGlobalSecondaryIndexes(
        new GlobalSecondaryIndex()
          .withIndexName("GlobalIndex")
          .withKeySchema(
            new KeySchemaElement()
              .withAttributeName("HashKey")
              .withKeyType("HASH"),
            new KeySchemaElement()
              .withAttributeName("RangeKey2")
              .withKeyType("RANGE")
          )
          .withProjection(
            new Projection().withProjectionType("ALL")
          )
          .withProvisionedThroughput(
            new ProvisionedThroughput()
              .withReadCapacityUnits(1L)
              .withWriteCapacityUnits(1L)
          )
      )
    val result: TableDescription = dynamoDB.createTable(request).waitForActive()
    println(result)
  }

  def saveTestData(dynamoDB: DynamoDB) = Try {
    val table = dynamoDB.getTable("TEST1")
    for (_ <- 1 to 1000) {
      table.putItem(Data(hashKeyValue,
        UUID.randomUUID().toString,
        "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").item)
    }
    for (_ <- 1 to 40000) {
      table.putItem(Data(hashKeyValue,
        UUID.randomUUID().toString,
        "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC").item)
    }
    for (_ <- 1 to 10000) {
      table.putItem(Data(hashKeyValue,
        UUID.randomUUID().toString,
        "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB").item)
    }
  }.get

}
package jp.pigumer.dynamodb

import java.util.logging.Logger

import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, QueryRequest, QueryResult}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import org.specs2.mutable.Specification
import org.specs2.specification.BeforeAfterAll

import scala.collection.JavaConverters._

class DynamoDBSpec extends Specification with BeforeAfterAll {

  val logger = Logger.getLogger(this.getClass.getName)

  lazy val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder
    .standard()
    .withEndpointConfiguration(
      new AwsClientBuilder.EndpointConfiguration("http://localhost:8000", "ap-northeast-1"))
    .build()
  lazy val dynamoDB = new DynamoDB(client)

  private def query(request: QueryRequest): Iterator[QueryResult] =
    new Iterator[QueryResult] {
      var result: Option[QueryResult] = None
      override def hasNext: Boolean = result.fold(true)(
        queryResult ⇒ null != queryResult.getLastEvaluatedKey
      )
      override def next(): QueryResult =
      result match {
        case Some(previous) ⇒
          result = Some(client.query(request.withExclusiveStartKey(previous.getLastEvaluatedKey())))
          result.get
        case None ⇒
          result = Some(client.query(request))
          result.get
      }
    }

  "Test" should {
    "Count" in {
      logger.info("start")
      val request = new QueryRequest()
        .withTableName("TEST1")
        .withIndexName("GlobalIndex")
        .withSelect("COUNT")
        .withKeyConditionExpression("#H = :H and #R = :R")
        .withExpressionAttributeNames(Map("#H""HashKey",
          "#R""RangeKey2").asJava)
        .withExpressionAttributeValues(Map(":H"new AttributeValue().withS(TestTable.hashKeyValue),
          ":R"new AttributeValue().withS("BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")).asJava)

      val list = query(request).toList
      logger.info(s"list size: ${list.length}")
      val result = list(0)
      logger.info(s"completed: ${result.getCount}")
      result.getCount must_== 8457

      val result2 = list(1)
      logger.info(s"completed2: ${result2.getCount}")
      result2.getCount must_== 1543
    }

    "Page" in {
      logger.info("start")
      val queryRequest = new QueryRequest()
        .withTableName("TEST1")
        .withIndexName("GlobalIndex")
        .withKeyConditionExpression("#H = :H and #R = :R")
        .withExpressionAttributeNames(Map("#H""HashKey",
        "#R""RangeKey2").asJava)
        .withExpressionAttributeValues( Map(":H"new AttributeValue().withS(TestTable.hashKeyValue),
          ":R"new AttributeValue().withS("CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC")).asJava)
      val list = query(queryRequest).toList
      val s = list(0).getItems().size
      logger.info(s"size: $s")
      s must_== 8457
    }
  }

  override def beforeAll: Unit = {
    TestTable.createTable(dynamoDB)
    TestTable.saveTestData(dynamoDB)
  }

  override def afterAll: Unit = {
    TestTable.deleteTable(dynamoDB)
  }
}

Akka Streams - Broadcast と Thread

Broadcast

下のような Broadcast を使ったコードを記述します

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import akka.stream.{ActorMaterializer, Attributes, FlowShape}
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.Await
import scala.concurrent.duration._

class DeadlockSpec extends Specification with TestKitBase {
  override implicit lazy val system = ActorSystem("Deadlock")

  "Test" should {
    "test1" in new WithFixture {
      val done = Source.single(1).via(
        broadcasts
      ).runWith(Sink.ignore)

      Await.ready(done, 10 seconds)
      done.value.get.get
      success
    }
  }

  trait WithFixture extends Scope {
    implicit val materializer = ActorMaterializer()

    val broadcasts =
      Flow.fromGraph(GraphDSL.create() { implicit b ⇒
        import GraphDSL.Implicits._

        val bc = b.add(Broadcast[Int](5))

        val w = (name: String) ⇒ Flow[Int].map { n ⇒
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

        val zip = b.add(ZipWith[String, String, String, String, String,
          (String, String, String, String, String)]((_, _, _, _, _)))

        bc.out(0) ~> w("wait 0") ~> zip.in0
        bc.out(1) ~> w("wait 1") ~> zip.in1
        bc.out(2) ~> w("wait 2") ~> zip.in2
        bc.out(3) ~> w("wait 3") ~> zip.in3
        bc.out(4) ~> w("wait 4") ~> zip.in4

        FlowShape(bc.in, zip.out)
      })
  }
}

このコードは、Source で与えられた Int 値を 5 つの Flow に Broadcast し、それぞれ 1 秒 Wait した後、スレッド名を返す ようになっています。 このテストコードを実行すると、以下のようなログが出力されます。

[INFO] [05/31/2018 15:27:21.900] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:22.905] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:23.909] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:24.915] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:25.919] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
t

ログのフォーマット上、スレッド名が出力されているので、ログメッセージのスレッド名が冗長ですが、Broadcast された Flow は同じ Thread を使って、順番に実行されていることがわかります。

マルチスレッドで Broadcast された Flow を実行する

テストコードを以下のようにします。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import akka.stream.{ActorMaterializer, Attributes, FlowShape}
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.Await
import scala.concurrent.duration._

class DeadlockSpec extends Specification with TestKitBase {
  override implicit lazy val system = ActorSystem("Deadlock")

  "Test" should {
    "test1" in new WithFixture {
      val done = Source.single(1).via(
        broadcasts
      ).runWith(Sink.ignore)

      Await.ready(done, 10 seconds)
      done.value.get.get
      success
    }
  }

  trait WithFixture extends Scope {
    implicit val materializer = ActorMaterializer()

    val broadcasts =
      Flow.fromGraph(GraphDSL.create() { implicit b ⇒
        import GraphDSL.Implicits._

        val bc = b.add(Broadcast[Int](5))
        val w = (name: String) ⇒ Flow[Int].map { n ⇒
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
          .async

        val zip = b.add(ZipWith[String, String, String, String, String,
          (String, String, String, String, String)]((_, _, _, _, _)))

        bc.out(0) ~> w("wait 0") ~> zip.in0
        bc.out(1) ~> w("wait 1") ~> zip.in1
        bc.out(2) ~> w("wait 2") ~> zip.in2
        bc.out(3) ~> w("wait 3") ~> zip.in3
        bc.out(4) ~> w("wait 4") ~> zip.in4

        FlowShape(bc.in, zip.out)
      })
  }
}

1 秒 Wait する Flow に async を付加しています。

        val w = (name: String) ⇒ Flow[Int].map { n ⇒
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
          .async

ログの出力結果は下のようになります。それぞれに別のスレッドが割り当てられていることがわかります。

[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-5] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-5
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-9] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-9
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-10] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-10
t

default-dispatcher のスレッド数を 1 にする

application.conf を記述して、default-dispatcher でのスレッド数を 1 に設定してみます。

akka {
  actor {
    default-dispatcher {
      fork-join-executor {
        parallelism-min = 1
        parallelism-factor = 1.0
        parallelism-max = 1
      }
    }
    throughput = 1
  }
}

スレッドが 1 つしか割り当てられないため、最初の例のように順番に実行されます。

[INFO] [05/31/2018 16:43:32.499] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:33.503] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:34.508] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:35.512] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:36.518] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-2

Deadlock に注意

async を使えばスレッドプールからスレッドを割り当てて実行されるため、Broadcast された Flow をマルチスレッドで動かしたくなると思います。 ですが、適切な値のスレッドプールを用意しておかないと Deadlock が発生する場合があります。

Pipelining and Parallelism • Akka Documentation

Akka Streams - javax.sound を使って mp3 を再生する

javax.sound を使って、mp3 を再生する

package jp.pigumer.cast

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}

import akka.stream.scaladsl.Flow
import akka.util.ByteString
import javax.sound.sampled.AudioFormat.Encoding
import javax.sound.sampled._

object Player {

  val mixerInfo: Array[Mixer.Info] = AudioSystem.getMixerInfo()

  val convert = (bytes: ByteString) ⇒ {

    def write(f: OutputStream ⇒ Unit) = {
      val output = new ByteArrayOutputStream()
      try {
        f(output)
        ByteString(output.toByteArray)
      } finally {
        output.close()
      }
    }

    val originalAudioInput = AudioSystem.getAudioInputStream(new ByteArrayInputStream(bytes.toArray))
    val originalFormat = originalAudioInput.getFormat()
    val converted: AudioInputStream = AudioSystem.getAudioInputStream(
      new AudioFormat(
        Encoding.PCM_SIGNED,
        originalFormat.getSampleRate,
        16,
        originalFormat.getChannels,
        originalFormat.getChannels * 2,
        originalFormat.getSampleRate,
        false
      ),
      originalAudioInput)

    write(output ⇒ AudioSystem.write(converted, AudioFileFormat.Type.AU, output))
  }

  val play = (index: Int) ⇒
    Flow[ByteString].map { bytes ⇒
      val input = AudioSystem.getAudioInputStream(new ByteArrayInputStream(bytes.toArray))
      val mixer = mixerInfo(index)
      val clip = AudioSystem.getClip(mixer)
      clip.open(input)
      clip.start()
      while (!clip.isRunning) {
        Thread.sleep(500)
      }
      while (clip.isRunning) {
        Thread.sleep(500)
      }
      clip.close()
      input
    }
}

Play

    val done = FileIO.fromPath(Paths.get("src/main/resources/hello.mp3"))
      .map(Player.convert)
      .via(Player.play(0)).withAttributes(
        ActorAttributes.dispatcher("akka.stream.blocking-io-dispatcher"))
      .runWith(Sink.ignore)

GitHub - takesection/arm32v6-scala-cast

Akka Streams - Source

Source(1 to 10) のように Source と宣言されているものはもちろん Source ですが、 Source ではじまっていて、Sink で終わっていない場合は、Source として扱うことができます。

val source: Source[String, NotUsed] =
  Source(1 to 10)
    .via(Flow[Int].fold(0)((acc, n) ⇒ acc + n)
    .map(_.toString))
package jp.pigumer.akka

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification

class SourceSpec extends Specification with TestKitBase {

  override implicit lazy val system = ActorSystem("Test")
  implicit val materializer = ActorMaterializer()

  "Source" should {
    "sample1" in {
      val source: Source[String, NotUsed] =
        Source(1 to 10).via(Flow[Int].fold(0)((acc, n) ⇒ acc + n).map(_.toString))
      val result = source.runWith(TestSink.probe[String])
      result.request(1).expectNext must_== "55"
    }
  }
}

flatMapConcat のような Source を返さないといけない場合に、Source として扱えることを知っておくと役立ちます

Future のテスト

Future は、処理がブロックされず、いずれ答えを返してくれる便利なものですが、 テストコードのように結果を検証したい場合は処理の完了を待つ必要があります。

下のサンプルコードでは、Future を Await.ready を使って、実行が完了するのを待ちます。 Await.ready に指定した時間が経過しても処理が完了しない場合は例外がスローされてテストケースは 失敗します。 実行が完了した場合は、Future.value.get で処理結果を取得します。

package jp.pigumer.akka

import akka.actor.{ActorSystem, Props}
import akka.event.Logging
import akka.util.Timeout
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.duration._
import akka.pattern._

import scala.concurrent.Await

class FutureSpec extends Specification {

  trait WithFixture extends Scope {
    implicit val system = ActorSystem("FutureSpec")
    implicit val logger = Logging(system, classOf[WithFixture])

    val actor = system.actorOf(Props[SlowActor])
  }

  "Future Test" should {
    "Success" in new WithFixture {
      implicit val timeout: Timeout = 1 seconds

      logger.info("start")
      val e = actor ? 1
      Await.ready(e, 60 minutes)
      logger.info("finish")

      e.value.get.fold(
        cause ⇒ {
            logger.error(cause, "failed")
            failure
          },
        _ ⇒ success
      )
    }

    "Timeout" in new WithFixture {
      implicit val timeout: Timeout = 1 seconds

      logger.info("start")
      val e = actor ? 4
      Await.ready(e, 5 seconds)
      logger.info("finish")

      e.value.get.fold(
        cause ⇒ {
          logger.error(cause, "failed")
          success
        },
        _ ⇒ failure
      )
    }
  }
}