Fargate がついに東京に

Service

首を長くして待ってましたが、ついに Fargate が東京にやってきました。 ということで、以前米国東部 (バージニア北部) に作っていたものを、 東京(ap-northeast-1)に変えて実行してみました。

GitHub - PigumerGroup/aws-ecs-fargate

Task

polly で mp3 を生成してそれを S3 に保存する Task を Fargate で実行できるようにしてみました。

GitHub - PigumerGroup/sbt-aws-cloudformation-example

両方とも、ちゃんと実行できましたぁ。うれしい。

これからは、どんどん ECS を使っていきたい気持ちです。

スレッド数を CloudWatch のカスタムメトリクスとして Publish する

micrometer を使ってスレッド数を CloudWatch のカスタムメトリクスとして Publish してみました

// https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-cloudwatch
libraryDependencies += "io.micrometer" % "micrometer-registry-cloudwatch" % "1.0.5"
package com.pigumer.example;

import java.util.Properties;

public class MicrometerProperties {

    public Properties DEFAULT;

    public MicrometerProperties() {
        DEFAULT = new Properties();
        DEFAULT.setProperty("cloudwatch.enabled", "true");
        DEFAULT.setProperty("cloudwatch.namespace", "MICROMETER");
        DEFAULT.setProperty("cloudwatch.step", "PT1M");
        DEFAULT.setProperty("cloudwatch.batchSize", "1");
    }
}
package com.pigumer.example;

import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
import io.micrometer.cloudwatch.CloudWatchConfig;
import io.micrometer.cloudwatch.CloudWatchMeterRegistry;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics;

import java.time.Duration;
import java.util.Properties;
import java.util.stream.IntStream;

public class Main {
    private static AmazonCloudWatchAsync cloudWatch =
         AmazonCloudWatchAsyncClientBuilder.standard().withRegion("ap-northeast-1").build();
    private static Clock clock = Clock.SYSTEM;
    private static CloudWatchConfig config =
            new CloudWatchConfig() {
                private Properties properties =
                        new MicrometerProperties().DEFAULT;
                @Override
                public String get(String key) {
                    return properties.getProperty(key);
                }
            };

    private static Runnable sleep = () -> {
        try {
            Thread.sleep(60000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    };

    public static void main(String[] args) throws Exception {
        CloudWatchMeterRegistry registry =
                new CloudWatchMeterRegistry(config, clock, cloudWatch);
        new JvmThreadMetrics().bindTo(registry);

        IntStream.range(1, 20).forEach(n -> new Thread(sleep).run());

        Thread.sleep(60000 * 3);
    }
}

スレッド数を CloudWatch のカスタムメトリクスとして Publish する

package jp.pigumer.monitor;

import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.StandardUnit;

import java.lang.management.ManagementFactory;
import java.util.Date;

public class CloudWatchReporter implements Runnable {

    private AmazonCloudWatchAsync cloundwatch = AmazonCloudWatchAsyncClientBuilder.standard().build();
    private String id;

    public CloudWatchReporter(String id) {
        this.id = id;
    }

    @Override
    public void run() {
        Double count = (double) ManagementFactory.getThreadMXBean().getThreadCount();
        MetricDatum metricData = new MetricDatum()
                .withDimensions(new Dimension().withName("Id").withValue(id))
                .withMetricName("thread_count")
                .withUnit(StandardUnit.Count)
                .withValue(count)
                .withTimestamp(new Date());
        PutMetricDataRequest request = new PutMetricDataRequest()
                .withNamespace("CUSTOM/TEST")
                .withMetricData(metricData);
        cloundwatch.putMetricDataAsync(request);
    }
}

この CloudWatchReporter を 1 分間隔で実行して CloudWatch に Publish します。

    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    ScheduledFuture handler = scheduler.scheduleAtFixedRate(
            new CloudWatchReporter("test"),
            0,
            1,
            TimeUnit.MINUTES);

f:id:section27:20180612234607p:plain

GitHub - takesection/jmxmonitor

TOTP - Time-Based One-Time Password

TOTP について調べるため、AWS の MFA で使うことのできる 仮想 MFA アプリケーションと同様の機能を持つアプリケーションを作成してみました。

TOTP の仕様

使い方

ユーザのホームディレクトリに .onetime というディレクトリを作成し、そのディレクトリに onetime.properties というファイルを作成します。

$ mkdir ~/.onetime
$ cat > ~/.onetime/onetime.properties

onetime.properties には、AWS の MFA を有効にしたときに表示されるシークレットキーを設定します。

secret=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX

DynamoDB - query

クエリの操作 - Amazon DynamoDB には、 "A single Query will only return a result set that fits within the 1 MB size limit." 「1 つの Query は、1 MB のサイズ制限の範囲内の結果セットだけを返します。」と書かれています。

query を実行したとき返される Result の getItems と、withSelect("COUNT") を使用した場合のように getItems のない件数取得のみの場合とではどんな違いがあるのだろうと思い、DynamoDB ローカルと東京リージョンの DynamoDB とで 同じコードを実行して試してみました。

その結果、withSelect("COUNT") の場合でも、結果セットを取得する場合でも同じ件数で分割されて返されることがわかりました。

試したコードを下に記載します。なお、DynamoDB ローカルと AWS 上の DynamoDB では分割される件数は差異がありました。

package jp.pigumer.dynamodb

import java.util.UUID

import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item}
import com.amazonaws.services.dynamodbv2.model._

import scala.util.Try

case class Data(hashKey: String,
                rangeKey: String,
                rangeKey2: String) {
  val item = new Item()
    .withPrimaryKey("HashKey", hashKey,
      "RangeKey", rangeKey)
    .withString("RangeKey2", rangeKey2)
}

object TestTable {

  val hashKeyValue = "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"

  def deleteTable(dynamoDB: DynamoDB) = {
    dynamoDB.getTable("TEST1").delete()
  }

  def createTable(dynamoDB: DynamoDB) = {
    val request = new CreateTableRequest()
      .withTableName("TEST1")
      .withAttributeDefinitions(
        new AttributeDefinition()
          .withAttributeName("HashKey")
          .withAttributeType("S"),
        new AttributeDefinition()
          .withAttributeName("RangeKey")
          .withAttributeType("S"),
        new AttributeDefinition()
          .withAttributeName("RangeKey2")
          .withAttributeType("S")
      )
      .withKeySchema(
        new KeySchemaElement()
          .withAttributeName("HashKey")
          .withKeyType("HASH"),
        new KeySchemaElement()
          .withAttributeName("RangeKey")
          .withKeyType("RANGE")
      )
      .withProvisionedThroughput(
        new ProvisionedThroughput()
          .withReadCapacityUnits(1L)
          .withWriteCapacityUnits(1L)
      )
      .withGlobalSecondaryIndexes(
        new GlobalSecondaryIndex()
          .withIndexName("GlobalIndex")
          .withKeySchema(
            new KeySchemaElement()
              .withAttributeName("HashKey")
              .withKeyType("HASH"),
            new KeySchemaElement()
              .withAttributeName("RangeKey2")
              .withKeyType("RANGE")
          )
          .withProjection(
            new Projection().withProjectionType("ALL")
          )
          .withProvisionedThroughput(
            new ProvisionedThroughput()
              .withReadCapacityUnits(1L)
              .withWriteCapacityUnits(1L)
          )
      )
    val result: TableDescription = dynamoDB.createTable(request).waitForActive()
    println(result)
  }

  def saveTestData(dynamoDB: DynamoDB) = Try {
    val table = dynamoDB.getTable("TEST1")
    for (_ <- 1 to 1000) {
      table.putItem(Data(hashKeyValue,
        UUID.randomUUID().toString,
        "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").item)
    }
    for (_ <- 1 to 40000) {
      table.putItem(Data(hashKeyValue,
        UUID.randomUUID().toString,
        "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC").item)
    }
    for (_ <- 1 to 10000) {
      table.putItem(Data(hashKeyValue,
        UUID.randomUUID().toString,
        "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB").item)
    }
  }.get

}
package jp.pigumer.dynamodb

import java.util.logging.Logger

import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.dynamodbv2.document.DynamoDB
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, QueryRequest, QueryResult}
import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder}
import org.specs2.mutable.Specification
import org.specs2.specification.BeforeAfterAll

import scala.collection.JavaConverters._

class DynamoDBSpec extends Specification with BeforeAfterAll {

  val logger = Logger.getLogger(this.getClass.getName)

  lazy val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder
    .standard()
    .withEndpointConfiguration(
      new AwsClientBuilder.EndpointConfiguration("http://localhost:8000", "ap-northeast-1"))
    .build()
  lazy val dynamoDB = new DynamoDB(client)

  private def query(request: QueryRequest): Iterator[QueryResult] =
    new Iterator[QueryResult] {
      var result: Option[QueryResult] = None
      override def hasNext: Boolean = result.fold(true)(
        queryResult ⇒ null != queryResult.getLastEvaluatedKey
      )
      override def next(): QueryResult =
      result match {
        case Some(previous) ⇒
          result = Some(client.query(request.withExclusiveStartKey(previous.getLastEvaluatedKey())))
          result.get
        case None ⇒
          result = Some(client.query(request))
          result.get
      }
    }

  "Test" should {
    "Count" in {
      logger.info("start")
      val request = new QueryRequest()
        .withTableName("TEST1")
        .withIndexName("GlobalIndex")
        .withSelect("COUNT")
        .withKeyConditionExpression("#H = :H and #R = :R")
        .withExpressionAttributeNames(Map("#H""HashKey",
          "#R""RangeKey2").asJava)
        .withExpressionAttributeValues(Map(":H"new AttributeValue().withS(TestTable.hashKeyValue),
          ":R"new AttributeValue().withS("BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")).asJava)

      val list = query(request).toList
      logger.info(s"list size: ${list.length}")
      val result = list(0)
      logger.info(s"completed: ${result.getCount}")
      result.getCount must_== 8457

      val result2 = list(1)
      logger.info(s"completed2: ${result2.getCount}")
      result2.getCount must_== 1543
    }

    "Page" in {
      logger.info("start")
      val queryRequest = new QueryRequest()
        .withTableName("TEST1")
        .withIndexName("GlobalIndex")
        .withKeyConditionExpression("#H = :H and #R = :R")
        .withExpressionAttributeNames(Map("#H""HashKey",
        "#R""RangeKey2").asJava)
        .withExpressionAttributeValues( Map(":H"new AttributeValue().withS(TestTable.hashKeyValue),
          ":R"new AttributeValue().withS("CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC")).asJava)
      val list = query(queryRequest).toList
      val s = list(0).getItems().size
      logger.info(s"size: $s")
      s must_== 8457
    }
  }

  override def beforeAll: Unit = {
    TestTable.createTable(dynamoDB)
    TestTable.saveTestData(dynamoDB)
  }

  override def afterAll: Unit = {
    TestTable.deleteTable(dynamoDB)
  }
}

Akka Streams - Broadcast と Thread

Broadcast

下のような Broadcast を使ったコードを記述します

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import akka.stream.{ActorMaterializer, Attributes, FlowShape}
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.Await
import scala.concurrent.duration._

class DeadlockSpec extends Specification with TestKitBase {
  override implicit lazy val system = ActorSystem("Deadlock")

  "Test" should {
    "test1" in new WithFixture {
      val done = Source.single(1).via(
        broadcasts
      ).runWith(Sink.ignore)

      Await.ready(done, 10 seconds)
      done.value.get.get
      success
    }
  }

  trait WithFixture extends Scope {
    implicit val materializer = ActorMaterializer()

    val broadcasts =
      Flow.fromGraph(GraphDSL.create() { implicit b ⇒
        import GraphDSL.Implicits._

        val bc = b.add(Broadcast[Int](5))

        val w = (name: String) ⇒ Flow[Int].map { n ⇒
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

        val zip = b.add(ZipWith[String, String, String, String, String,
          (String, String, String, String, String)]((_, _, _, _, _)))

        bc.out(0) ~> w("wait 0") ~> zip.in0
        bc.out(1) ~> w("wait 1") ~> zip.in1
        bc.out(2) ~> w("wait 2") ~> zip.in2
        bc.out(3) ~> w("wait 3") ~> zip.in3
        bc.out(4) ~> w("wait 4") ~> zip.in4

        FlowShape(bc.in, zip.out)
      })
  }
}

このコードは、Source で与えられた Int 値を 5 つの Flow に Broadcast し、それぞれ 1 秒 Wait した後、スレッド名を返す ようになっています。 このテストコードを実行すると、以下のようなログが出力されます。

[INFO] [05/31/2018 15:27:21.900] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:22.905] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:23.909] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:24.915] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:27:25.919] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
t

ログのフォーマット上、スレッド名が出力されているので、ログメッセージのスレッド名が冗長ですが、Broadcast された Flow は同じ Thread を使って、順番に実行されていることがわかります。

マルチスレッドで Broadcast された Flow を実行する

テストコードを以下のようにします。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.Logging
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}
import akka.stream.{ActorMaterializer, Attributes, FlowShape}
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification
import org.specs2.specification.Scope

import scala.concurrent.Await
import scala.concurrent.duration._

class DeadlockSpec extends Specification with TestKitBase {
  override implicit lazy val system = ActorSystem("Deadlock")

  "Test" should {
    "test1" in new WithFixture {
      val done = Source.single(1).via(
        broadcasts
      ).runWith(Sink.ignore)

      Await.ready(done, 10 seconds)
      done.value.get.get
      success
    }
  }

  trait WithFixture extends Scope {
    implicit val materializer = ActorMaterializer()

    val broadcasts =
      Flow.fromGraph(GraphDSL.create() { implicit b ⇒
        import GraphDSL.Implicits._

        val bc = b.add(Broadcast[Int](5))
        val w = (name: String) ⇒ Flow[Int].map { n ⇒
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
          .async

        val zip = b.add(ZipWith[String, String, String, String, String,
          (String, String, String, String, String)]((_, _, _, _, _)))

        bc.out(0) ~> w("wait 0") ~> zip.in0
        bc.out(1) ~> w("wait 1") ~> zip.in1
        bc.out(2) ~> w("wait 2") ~> zip.in2
        bc.out(3) ~> w("wait 3") ~> zip.in3
        bc.out(4) ~> w("wait 4") ~> zip.in4

        FlowShape(bc.in, zip.out)
      })
  }
}

1 秒 Wait する Flow に async を付加しています。

        val w = (name: String) ⇒ Flow[Int].map { n ⇒
          Thread.sleep(1000)
          s"$n - ${Thread.currentThread.getName}"
        }
          .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
          .async

ログの出力結果は下のようになります。それぞれに別のスレッドが割り当てられていることがわかります。

[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-5] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-5
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-9] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-9
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-10] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-10
t

default-dispatcher のスレッド数を 1 にする

application.conf を記述して、default-dispatcher でのスレッド数を 1 に設定してみます。

akka {
  actor {
    default-dispatcher {
      fork-join-executor {
        parallelism-min = 1
        parallelism-factor = 1.0
        parallelism-max = 1
      }
    }
    throughput = 1
  }
}

スレッドが 1 つしか割り当てられないため、最初の例のように順番に実行されます。

[INFO] [05/31/2018 16:43:32.499] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:33.503] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:34.508] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:35.512] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
[INFO] [05/31/2018 16:43:36.518] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-2

Deadlock に注意

async を使えばスレッドプールからスレッドを割り当てて実行されるため、Broadcast された Flow をマルチスレッドで動かしたくなると思います。 ですが、適切な値のスレッドプールを用意しておかないと Deadlock が発生する場合があります。

Pipelining and Parallelism • Akka Documentation

Akka Streams - javax.sound を使って mp3 を再生する

javax.sound を使って、mp3 を再生する

package jp.pigumer.cast

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}

import akka.stream.scaladsl.Flow
import akka.util.ByteString
import javax.sound.sampled.AudioFormat.Encoding
import javax.sound.sampled._

object Player {

  val mixerInfo: Array[Mixer.Info] = AudioSystem.getMixerInfo()

  val convert = (bytes: ByteString) ⇒ {

    def write(f: OutputStream ⇒ Unit) = {
      val output = new ByteArrayOutputStream()
      try {
        f(output)
        ByteString(output.toByteArray)
      } finally {
        output.close()
      }
    }

    val originalAudioInput = AudioSystem.getAudioInputStream(new ByteArrayInputStream(bytes.toArray))
    val originalFormat = originalAudioInput.getFormat()
    val converted: AudioInputStream = AudioSystem.getAudioInputStream(
      new AudioFormat(
        Encoding.PCM_SIGNED,
        originalFormat.getSampleRate,
        16,
        originalFormat.getChannels,
        originalFormat.getChannels * 2,
        originalFormat.getSampleRate,
        false
      ),
      originalAudioInput)

    write(output ⇒ AudioSystem.write(converted, AudioFileFormat.Type.AU, output))
  }

  val play = (index: Int) ⇒
    Flow[ByteString].map { bytes ⇒
      val input = AudioSystem.getAudioInputStream(new ByteArrayInputStream(bytes.toArray))
      val mixer = mixerInfo(index)
      val clip = AudioSystem.getClip(mixer)
      clip.open(input)
      clip.start()
      while (!clip.isRunning) {
        Thread.sleep(500)
      }
      while (clip.isRunning) {
        Thread.sleep(500)
      }
      clip.close()
      input
    }
}

Play

    val done = FileIO.fromPath(Paths.get("src/main/resources/hello.mp3"))
      .map(Player.convert)
      .via(Player.play(0)).withAttributes(
        ActorAttributes.dispatcher("akka.stream.blocking-io-dispatcher"))
      .runWith(Sink.ignore)

GitHub - takesection/arm32v6-scala-cast