Fargate がついに東京に
Service
首を長くして待ってましたが、ついに Fargate が東京にやってきました。 ということで、以前米国東部 (バージニア北部) に作っていたものを、 東京(ap-northeast-1)に変えて実行してみました。
GitHub - PigumerGroup/aws-ecs-fargate
Task
polly で mp3 を生成してそれを S3 に保存する Task を Fargate で実行できるようにしてみました。
GitHub - PigumerGroup/sbt-aws-cloudformation-example
両方とも、ちゃんと実行できましたぁ。うれしい。
これからは、どんどん ECS を使っていきたい気持ちです。
スレッド数を CloudWatch のカスタムメトリクスとして Publish する
micrometer を使ってスレッド数を CloudWatch のカスタムメトリクスとして Publish してみました
// https://mvnrepository.com/artifact/io.micrometer/micrometer-registry-cloudwatch libraryDependencies += "io.micrometer" % "micrometer-registry-cloudwatch" % "1.0.5"
package com.pigumer.example; import java.util.Properties; public class MicrometerProperties { public Properties DEFAULT; public MicrometerProperties() { DEFAULT = new Properties(); DEFAULT.setProperty("cloudwatch.enabled", "true"); DEFAULT.setProperty("cloudwatch.namespace", "MICROMETER"); DEFAULT.setProperty("cloudwatch.step", "PT1M"); DEFAULT.setProperty("cloudwatch.batchSize", "1"); } }
package com.pigumer.example; import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync; import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder; import io.micrometer.cloudwatch.CloudWatchConfig; import io.micrometer.cloudwatch.CloudWatchMeterRegistry; import io.micrometer.core.instrument.Clock; import io.micrometer.core.instrument.binder.jvm.JvmThreadMetrics; import java.time.Duration; import java.util.Properties; import java.util.stream.IntStream; public class Main { private static AmazonCloudWatchAsync cloudWatch = AmazonCloudWatchAsyncClientBuilder.standard().withRegion("ap-northeast-1").build(); private static Clock clock = Clock.SYSTEM; private static CloudWatchConfig config = new CloudWatchConfig() { private Properties properties = new MicrometerProperties().DEFAULT; @Override public String get(String key) { return properties.getProperty(key); } }; private static Runnable sleep = () -> { try { Thread.sleep(60000); } catch (Exception e) { e.printStackTrace(); } }; public static void main(String[] args) throws Exception { CloudWatchMeterRegistry registry = new CloudWatchMeterRegistry(config, clock, cloudWatch); new JvmThreadMetrics().bindTo(registry); IntStream.range(1, 20).forEach(n -> new Thread(sleep).run()); Thread.sleep(60000 * 3); } }
スレッド数を CloudWatch のカスタムメトリクスとして Publish する
package jp.pigumer.monitor; import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsync; import com.amazonaws.services.cloudwatch.AmazonCloudWatchAsyncClientBuilder; import com.amazonaws.services.cloudwatch.model.Dimension; import com.amazonaws.services.cloudwatch.model.MetricDatum; import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest; import com.amazonaws.services.cloudwatch.model.StandardUnit; import java.lang.management.ManagementFactory; import java.util.Date; public class CloudWatchReporter implements Runnable { private AmazonCloudWatchAsync cloundwatch = AmazonCloudWatchAsyncClientBuilder.standard().build(); private String id; public CloudWatchReporter(String id) { this.id = id; } @Override public void run() { Double count = (double) ManagementFactory.getThreadMXBean().getThreadCount(); MetricDatum metricData = new MetricDatum() .withDimensions(new Dimension().withName("Id").withValue(id)) .withMetricName("thread_count") .withUnit(StandardUnit.Count) .withValue(count) .withTimestamp(new Date()); PutMetricDataRequest request = new PutMetricDataRequest() .withNamespace("CUSTOM/TEST") .withMetricData(metricData); cloundwatch.putMetricDataAsync(request); } }
この CloudWatchReporter を 1 分間隔で実行して CloudWatch に Publish します。
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); ScheduledFuture handler = scheduler.scheduleAtFixedRate( new CloudWatchReporter("test"), 0, 1, TimeUnit.MINUTES);
TOTP - Time-Based One-Time Password
TOTP について調べるため、AWS の MFA で使うことのできる 仮想 MFA アプリケーションと同様の機能を持つアプリケーションを作成してみました。
TOTP の仕様
- RFC 4226: HOTP: An HMAC-Based One-Time Password Algorithm
- RFC 6238: TOTP: Time-Based One-Time Password Algorithm
使い方
ユーザのホームディレクトリに .onetime というディレクトリを作成し、そのディレクトリに onetime.properties というファイルを作成します。
$ mkdir ~/.onetime $ cat > ~/.onetime/onetime.properties
onetime.properties には、AWS の MFA を有効にしたときに表示されるシークレットキーを設定します。
secret=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
DynamoDB - query
クエリの操作 - Amazon DynamoDB には、 "A single Query will only return a result set that fits within the 1 MB size limit." 「1 つの Query は、1 MB のサイズ制限の範囲内の結果セットだけを返します。」と書かれています。
query を実行したとき返される Result の getItems と、withSelect("COUNT") を使用した場合のように getItems のない件数取得のみの場合とではどんな違いがあるのだろうと思い、DynamoDB ローカルと東京リージョンの DynamoDB とで 同じコードを実行して試してみました。
その結果、withSelect("COUNT") の場合でも、結果セットを取得する場合でも同じ件数で分割されて返されることがわかりました。
試したコードを下に記載します。なお、DynamoDB ローカルと AWS 上の DynamoDB では分割される件数は差異がありました。
package jp.pigumer.dynamodb import java.util.UUID import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item} import com.amazonaws.services.dynamodbv2.model._ import scala.util.Try case class Data(hashKey: String, rangeKey: String, rangeKey2: String) { val item = new Item() .withPrimaryKey("HashKey", hashKey, "RangeKey", rangeKey) .withString("RangeKey2", rangeKey2) } object TestTable { val hashKeyValue = "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC" def deleteTable(dynamoDB: DynamoDB) = { dynamoDB.getTable("TEST1").delete() } def createTable(dynamoDB: DynamoDB) = { val request = new CreateTableRequest() .withTableName("TEST1") .withAttributeDefinitions( new AttributeDefinition() .withAttributeName("HashKey") .withAttributeType("S"), new AttributeDefinition() .withAttributeName("RangeKey") .withAttributeType("S"), new AttributeDefinition() .withAttributeName("RangeKey2") .withAttributeType("S") ) .withKeySchema( new KeySchemaElement() .withAttributeName("HashKey") .withKeyType("HASH"), new KeySchemaElement() .withAttributeName("RangeKey") .withKeyType("RANGE") ) .withProvisionedThroughput( new ProvisionedThroughput() .withReadCapacityUnits(1L) .withWriteCapacityUnits(1L) ) .withGlobalSecondaryIndexes( new GlobalSecondaryIndex() .withIndexName("GlobalIndex") .withKeySchema( new KeySchemaElement() .withAttributeName("HashKey") .withKeyType("HASH"), new KeySchemaElement() .withAttributeName("RangeKey2") .withKeyType("RANGE") ) .withProjection( new Projection().withProjectionType("ALL") ) .withProvisionedThroughput( new ProvisionedThroughput() .withReadCapacityUnits(1L) .withWriteCapacityUnits(1L) ) ) val result: TableDescription = dynamoDB.createTable(request).waitForActive() println(result) } def saveTestData(dynamoDB: DynamoDB) = Try { val table = dynamoDB.getTable("TEST1") for (_ <- 1 to 1000) { table.putItem(Data(hashKeyValue, UUID.randomUUID().toString, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").item) } for (_ <- 1 to 40000) { table.putItem(Data(hashKeyValue, UUID.randomUUID().toString, "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC").item) } for (_ <- 1 to 10000) { table.putItem(Data(hashKeyValue, UUID.randomUUID().toString, "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB").item) } }.get }
package jp.pigumer.dynamodb import java.util.logging.Logger import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.services.dynamodbv2.document.DynamoDB import com.amazonaws.services.dynamodbv2.model.{AttributeValue, QueryRequest, QueryResult} import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder} import org.specs2.mutable.Specification import org.specs2.specification.BeforeAfterAll import scala.collection.JavaConverters._ class DynamoDBSpec extends Specification with BeforeAfterAll { val logger = Logger.getLogger(this.getClass.getName) lazy val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder .standard() .withEndpointConfiguration( new AwsClientBuilder.EndpointConfiguration("http://localhost:8000", "ap-northeast-1")) .build() lazy val dynamoDB = new DynamoDB(client) private def query(request: QueryRequest): Iterator[QueryResult] = new Iterator[QueryResult] { var result: Option[QueryResult] = None override def hasNext: Boolean = result.fold(true)( queryResult ⇒ null != queryResult.getLastEvaluatedKey ) override def next(): QueryResult = result match { case Some(previous) ⇒ result = Some(client.query(request.withExclusiveStartKey(previous.getLastEvaluatedKey()))) result.get case None ⇒ result = Some(client.query(request)) result.get } } "Test" should { "Count" in { logger.info("start") val request = new QueryRequest() .withTableName("TEST1") .withIndexName("GlobalIndex") .withSelect("COUNT") .withKeyConditionExpression("#H = :H and #R = :R") .withExpressionAttributeNames(Map("#H" → "HashKey", "#R" → "RangeKey2").asJava) .withExpressionAttributeValues(Map(":H" → new AttributeValue().withS(TestTable.hashKeyValue), ":R" → new AttributeValue().withS("BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")).asJava) val list = query(request).toList logger.info(s"list size: ${list.length}") val result = list(0) logger.info(s"completed: ${result.getCount}") result.getCount must_== 8457 val result2 = list(1) logger.info(s"completed2: ${result2.getCount}") result2.getCount must_== 1543 } "Page" in { logger.info("start") val queryRequest = new QueryRequest() .withTableName("TEST1") .withIndexName("GlobalIndex") .withKeyConditionExpression("#H = :H and #R = :R") .withExpressionAttributeNames(Map("#H" → "HashKey", "#R" → "RangeKey2").asJava) .withExpressionAttributeValues( Map(":H" → new AttributeValue().withS(TestTable.hashKeyValue), ":R" → new AttributeValue().withS("CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC")).asJava) val list = query(queryRequest).toList val s = list(0).getItems().size logger.info(s"size: $s") s must_== 8457 } } override def beforeAll: Unit = { TestTable.createTable(dynamoDB) TestTable.saveTestData(dynamoDB) } override def afterAll: Unit = { TestTable.deleteTable(dynamoDB) } }
Akka Streams - Broadcast と Thread
Broadcast
下のような Broadcast を使ったコードを記述します
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.Logging import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith} import akka.stream.{ActorMaterializer, Attributes, FlowShape} import akka.testkit.TestKitBase import org.specs2.mutable.Specification import org.specs2.specification.Scope import scala.concurrent.Await import scala.concurrent.duration._ class DeadlockSpec extends Specification with TestKitBase { override implicit lazy val system = ActorSystem("Deadlock") "Test" should { "test1" in new WithFixture { val done = Source.single(1).via( broadcasts ).runWith(Sink.ignore) Await.ready(done, 10 seconds) done.value.get.get success } } trait WithFixture extends Scope { implicit val materializer = ActorMaterializer() val broadcasts = Flow.fromGraph(GraphDSL.create() { implicit b ⇒ import GraphDSL.Implicits._ val bc = b.add(Broadcast[Int](5)) val w = (name: String) ⇒ Flow[Int].map { n ⇒ Thread.sleep(1000) s"$n - ${Thread.currentThread.getName}" } .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val zip = b.add(ZipWith[String, String, String, String, String, (String, String, String, String, String)]((_, _, _, _, _))) bc.out(0) ~> w("wait 0") ~> zip.in0 bc.out(1) ~> w("wait 1") ~> zip.in1 bc.out(2) ~> w("wait 2") ~> zip.in2 bc.out(3) ~> w("wait 3") ~> zip.in3 bc.out(4) ~> w("wait 4") ~> zip.in4 FlowShape(bc.in, zip.out) }) } }
このコードは、Source で与えられた Int 値を 5 つの Flow に Broadcast し、それぞれ 1 秒 Wait した後、スレッド名を返す ようになっています。 このテストコードを実行すると、以下のようなログが出力されます。
[INFO] [05/31/2018 15:27:21.900] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 [INFO] [05/31/2018 15:27:22.905] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 [INFO] [05/31/2018 15:27:23.909] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 [INFO] [05/31/2018 15:27:24.915] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 [INFO] [05/31/2018 15:27:25.919] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 t
ログのフォーマット上、スレッド名が出力されているので、ログメッセージのスレッド名が冗長ですが、Broadcast された Flow は同じ Thread を使って、順番に実行されていることがわかります。
マルチスレッドで Broadcast された Flow を実行する
テストコードを以下のようにします。
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.Logging import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith} import akka.stream.{ActorMaterializer, Attributes, FlowShape} import akka.testkit.TestKitBase import org.specs2.mutable.Specification import org.specs2.specification.Scope import scala.concurrent.Await import scala.concurrent.duration._ class DeadlockSpec extends Specification with TestKitBase { override implicit lazy val system = ActorSystem("Deadlock") "Test" should { "test1" in new WithFixture { val done = Source.single(1).via( broadcasts ).runWith(Sink.ignore) Await.ready(done, 10 seconds) done.value.get.get success } } trait WithFixture extends Scope { implicit val materializer = ActorMaterializer() val broadcasts = Flow.fromGraph(GraphDSL.create() { implicit b ⇒ import GraphDSL.Implicits._ val bc = b.add(Broadcast[Int](5)) val w = (name: String) ⇒ Flow[Int].map { n ⇒ Thread.sleep(1000) s"$n - ${Thread.currentThread.getName}" } .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) .async val zip = b.add(ZipWith[String, String, String, String, String, (String, String, String, String, String)]((_, _, _, _, _))) bc.out(0) ~> w("wait 0") ~> zip.in0 bc.out(1) ~> w("wait 1") ~> zip.in1 bc.out(2) ~> w("wait 2") ~> zip.in2 bc.out(3) ~> w("wait 3") ~> zip.in3 bc.out(4) ~> w("wait 4") ~> zip.in4 FlowShape(bc.in, zip.out) }) } }
1 秒 Wait する Flow に async を付加しています。
val w = (name: String) ⇒ Flow[Int].map { n ⇒ Thread.sleep(1000) s"$n - ${Thread.currentThread.getName}" } .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) .async
ログの出力結果は下のようになります。それぞれに別のスレッドが割り当てられていることがわかります。
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-5] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-5 [INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 [INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-9] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-9 [INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2 [INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-10] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-10 t
default-dispatcher のスレッド数を 1 にする
application.conf
を記述して、default-dispatcher でのスレッド数を 1 に設定してみます。
akka { actor { default-dispatcher { fork-join-executor { parallelism-min = 1 parallelism-factor = 1.0 parallelism-max = 1 } } throughput = 1 } }
スレッドが 1 つしか割り当てられないため、最初の例のように順番に実行されます。
[INFO] [05/31/2018 16:43:32.499] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-2 [INFO] [05/31/2018 16:43:33.503] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-2 [INFO] [05/31/2018 16:43:34.508] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-2 [INFO] [05/31/2018 16:43:35.512] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2 [INFO] [05/31/2018 16:43:36.518] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
Deadlock に注意
async を使えばスレッドプールからスレッドを割り当てて実行されるため、Broadcast された Flow をマルチスレッドで動かしたくなると思います。 ですが、適切な値のスレッドプールを用意しておかないと Deadlock が発生する場合があります。
Pipelining and Parallelism • Akka Documentation
Akka in Action AKKA IN ACTION [ Raymond Roestenburg ]
- ジャンル: 本・雑誌・コミック > 洋書 > COMPUTERS & SCIENCE
- ショップ: 楽天ブックス
- 価格: 12,096円
Akka Streams - javax.sound を使って mp3 を再生する
javax.sound を使って、mp3 を再生する
package jp.pigumer.cast import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream} import akka.stream.scaladsl.Flow import akka.util.ByteString import javax.sound.sampled.AudioFormat.Encoding import javax.sound.sampled._ object Player { val mixerInfo: Array[Mixer.Info] = AudioSystem.getMixerInfo() val convert = (bytes: ByteString) ⇒ { def write(f: OutputStream ⇒ Unit) = { val output = new ByteArrayOutputStream() try { f(output) ByteString(output.toByteArray) } finally { output.close() } } val originalAudioInput = AudioSystem.getAudioInputStream(new ByteArrayInputStream(bytes.toArray)) val originalFormat = originalAudioInput.getFormat() val converted: AudioInputStream = AudioSystem.getAudioInputStream( new AudioFormat( Encoding.PCM_SIGNED, originalFormat.getSampleRate, 16, originalFormat.getChannels, originalFormat.getChannels * 2, originalFormat.getSampleRate, false ), originalAudioInput) write(output ⇒ AudioSystem.write(converted, AudioFileFormat.Type.AU, output)) } val play = (index: Int) ⇒ Flow[ByteString].map { bytes ⇒ val input = AudioSystem.getAudioInputStream(new ByteArrayInputStream(bytes.toArray)) val mixer = mixerInfo(index) val clip = AudioSystem.getClip(mixer) clip.open(input) clip.start() while (!clip.isRunning) { Thread.sleep(500) } while (clip.isRunning) { Thread.sleep(500) } clip.close() input } }
Play
val done = FileIO.fromPath(Paths.get("src/main/resources/hello.mp3")) .map(Player.convert) .via(Player.play(0)).withAttributes( ActorAttributes.dispatcher("akka.stream.blocking-io-dispatcher")) .runWith(Sink.ignore)
GitHub - takesection/arm32v6-scala-cast
Akka in Action AKKA IN ACTION [ Raymond Roestenburg ]
- ジャンル: 本・雑誌・コミック > 洋書 > COMPUTERS & SCIENCE
- ショップ: 楽天ブックス
- 価格: 12,096円