TOTP - Time-Based One-Time Password
TOTP について調べるため、AWS の MFA で使うことのできる 仮想 MFA アプリケーションと同様の機能を持つアプリケーションを作成してみました。
TOTP の仕様
- RFC 4226: HOTP: An HMAC-Based One-Time Password Algorithm
- RFC 6238: TOTP: Time-Based One-Time Password Algorithm
使い方
ユーザのホームディレクトリに .onetime というディレクトリを作成し、そのディレクトリに onetime.properties というファイルを作成します。
$ mkdir ~/.onetime $ cat > ~/.onetime/onetime.properties
onetime.properties には、AWS の MFA を有効にしたときに表示されるシークレットキーを設定します。
secret=XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
DynamoDB - query
クエリの操作 - Amazon DynamoDB には、 "A single Query will only return a result set that fits within the 1 MB size limit." 「1 つの Query は、1 MB のサイズ制限の範囲内の結果セットだけを返します。」と書かれています。
query を実行したとき返される Result の getItems と、withSelect("COUNT") を使用した場合のように getItems のない件数取得のみの場合とではどんな違いがあるのだろうと思い、DynamoDB ローカルと東京リージョンの DynamoDB とで 同じコードを実行して試してみました。
その結果、withSelect("COUNT") の場合でも、結果セットを取得する場合でも同じ件数で分割されて返されることがわかりました。
試したコードを下に記載します。なお、DynamoDB ローカルと AWS 上の DynamoDB では分割される件数は差異がありました。
package jp.pigumer.dynamodb import java.util.UUID import com.amazonaws.services.dynamodbv2.document.{DynamoDB, Item} import com.amazonaws.services.dynamodbv2.model._ import scala.util.Try case class Data(hashKey: String, rangeKey: String, rangeKey2: String) { val item = new Item() .withPrimaryKey("HashKey", hashKey, "RangeKey", rangeKey) .withString("RangeKey2", rangeKey2) } object TestTable { val hashKeyValue = "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC" def deleteTable(dynamoDB: DynamoDB) = { dynamoDB.getTable("TEST1").delete() } def createTable(dynamoDB: DynamoDB) = { val request = new CreateTableRequest() .withTableName("TEST1") .withAttributeDefinitions( new AttributeDefinition() .withAttributeName("HashKey") .withAttributeType("S"), new AttributeDefinition() .withAttributeName("RangeKey") .withAttributeType("S"), new AttributeDefinition() .withAttributeName("RangeKey2") .withAttributeType("S") ) .withKeySchema( new KeySchemaElement() .withAttributeName("HashKey") .withKeyType("HASH"), new KeySchemaElement() .withAttributeName("RangeKey") .withKeyType("RANGE") ) .withProvisionedThroughput( new ProvisionedThroughput() .withReadCapacityUnits(1L) .withWriteCapacityUnits(1L) ) .withGlobalSecondaryIndexes( new GlobalSecondaryIndex() .withIndexName("GlobalIndex") .withKeySchema( new KeySchemaElement() .withAttributeName("HashKey") .withKeyType("HASH"), new KeySchemaElement() .withAttributeName("RangeKey2") .withKeyType("RANGE") ) .withProjection( new Projection().withProjectionType("ALL") ) .withProvisionedThroughput( new ProvisionedThroughput() .withReadCapacityUnits(1L) .withWriteCapacityUnits(1L) ) ) val result: TableDescription = dynamoDB.createTable(request).waitForActive() println(result) } def saveTestData(dynamoDB: DynamoDB) = Try { val table = dynamoDB.getTable("TEST1") for (_ <- 1 to 1000) { table.putItem(Data(hashKeyValue, UUID.randomUUID().toString, "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA").item) } for (_ <- 1 to 40000) { table.putItem(Data(hashKeyValue, UUID.randomUUID().toString, "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC").item) } for (_ <- 1 to 10000) { table.putItem(Data(hashKeyValue, UUID.randomUUID().toString, "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB").item) } }.get }
package jp.pigumer.dynamodb import java.util.logging.Logger import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.services.dynamodbv2.document.DynamoDB import com.amazonaws.services.dynamodbv2.model.{AttributeValue, QueryRequest, QueryResult} import com.amazonaws.services.dynamodbv2.{AmazonDynamoDB, AmazonDynamoDBClientBuilder} import org.specs2.mutable.Specification import org.specs2.specification.BeforeAfterAll import scala.collection.JavaConverters._ class DynamoDBSpec extends Specification with BeforeAfterAll { val logger = Logger.getLogger(this.getClass.getName) lazy val client: AmazonDynamoDB = AmazonDynamoDBClientBuilder .standard() .withEndpointConfiguration( new AwsClientBuilder.EndpointConfiguration("http://localhost:8000", "ap-northeast-1")) .build() lazy val dynamoDB = new DynamoDB(client) private def query(request: QueryRequest): Iterator[QueryResult] = new Iterator[QueryResult] { var result: Option[QueryResult] = None override def hasNext: Boolean = result.fold(true)( queryResult ⇒ null != queryResult.getLastEvaluatedKey ) override def next(): QueryResult = result match { case Some(previous) ⇒ result = Some(client.query(request.withExclusiveStartKey(previous.getLastEvaluatedKey()))) result.get case None ⇒ result = Some(client.query(request)) result.get } } "Test" should { "Count" in { logger.info("start") val request = new QueryRequest() .withTableName("TEST1") .withIndexName("GlobalIndex") .withSelect("COUNT") .withKeyConditionExpression("#H = :H and #R = :R") .withExpressionAttributeNames(Map("#H" → "HashKey", "#R" → "RangeKey2").asJava) .withExpressionAttributeValues(Map(":H" → new AttributeValue().withS(TestTable.hashKeyValue), ":R" → new AttributeValue().withS("BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")).asJava) val list = query(request).toList logger.info(s"list size: ${list.length}") val result = list(0) logger.info(s"completed: ${result.getCount}") result.getCount must_== 8457 val result2 = list(1) logger.info(s"completed2: ${result2.getCount}") result2.getCount must_== 1543 } "Page" in { logger.info("start") val queryRequest = new QueryRequest() .withTableName("TEST1") .withIndexName("GlobalIndex") .withKeyConditionExpression("#H = :H and #R = :R") .withExpressionAttributeNames(Map("#H" → "HashKey", "#R" → "RangeKey2").asJava) .withExpressionAttributeValues( Map(":H" → new AttributeValue().withS(TestTable.hashKeyValue), ":R" → new AttributeValue().withS("CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC")).asJava) val list = query(queryRequest).toList val s = list(0).getItems().size logger.info(s"size: $s") s must_== 8457 } } override def beforeAll: Unit = { TestTable.createTable(dynamoDB) TestTable.saveTestData(dynamoDB) } override def afterAll: Unit = { TestTable.deleteTable(dynamoDB) } }
Akka Streams - Broadcast と Thread
Broadcast
下のような Broadcast を使ったコードを記述します
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.Logging import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith} import akka.stream.{ActorMaterializer, Attributes, FlowShape} import akka.testkit.TestKitBase import org.specs2.mutable.Specification import org.specs2.specification.Scope import scala.concurrent.Await import scala.concurrent.duration._ class DeadlockSpec extends Specification with TestKitBase { override implicit lazy val system = ActorSystem("Deadlock") "Test" should { "test1" in new WithFixture { val done = Source.single(1).via( broadcasts ).runWith(Sink.ignore) Await.ready(done, 10 seconds) done.value.get.get success } } trait WithFixture extends Scope { implicit val materializer = ActorMaterializer() val broadcasts = Flow.fromGraph(GraphDSL.create() { implicit b ⇒ import GraphDSL.Implicits._ val bc = b.add(Broadcast[Int](5)) val w = (name: String) ⇒ Flow[Int].map { n ⇒ Thread.sleep(1000) s"$n - ${Thread.currentThread.getName}" } .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val zip = b.add(ZipWith[String, String, String, String, String, (String, String, String, String, String)]((_, _, _, _, _))) bc.out(0) ~> w("wait 0") ~> zip.in0 bc.out(1) ~> w("wait 1") ~> zip.in1 bc.out(2) ~> w("wait 2") ~> zip.in2 bc.out(3) ~> w("wait 3") ~> zip.in3 bc.out(4) ~> w("wait 4") ~> zip.in4 FlowShape(bc.in, zip.out) }) } }
このコードは、Source で与えられた Int 値を 5 つの Flow に Broadcast し、それぞれ 1 秒 Wait した後、スレッド名を返す ようになっています。 このテストコードを実行すると、以下のようなログが出力されます。
[INFO] [05/31/2018 15:27:21.900] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 [INFO] [05/31/2018 15:27:22.905] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 [INFO] [05/31/2018 15:27:23.909] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 [INFO] [05/31/2018 15:27:24.915] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 [INFO] [05/31/2018 15:27:25.919] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 t
ログのフォーマット上、スレッド名が出力されているので、ログメッセージのスレッド名が冗長ですが、Broadcast された Flow は同じ Thread を使って、順番に実行されていることがわかります。
マルチスレッドで Broadcast された Flow を実行する
テストコードを以下のようにします。
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.Logging import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith} import akka.stream.{ActorMaterializer, Attributes, FlowShape} import akka.testkit.TestKitBase import org.specs2.mutable.Specification import org.specs2.specification.Scope import scala.concurrent.Await import scala.concurrent.duration._ class DeadlockSpec extends Specification with TestKitBase { override implicit lazy val system = ActorSystem("Deadlock") "Test" should { "test1" in new WithFixture { val done = Source.single(1).via( broadcasts ).runWith(Sink.ignore) Await.ready(done, 10 seconds) done.value.get.get success } } trait WithFixture extends Scope { implicit val materializer = ActorMaterializer() val broadcasts = Flow.fromGraph(GraphDSL.create() { implicit b ⇒ import GraphDSL.Implicits._ val bc = b.add(Broadcast[Int](5)) val w = (name: String) ⇒ Flow[Int].map { n ⇒ Thread.sleep(1000) s"$n - ${Thread.currentThread.getName}" } .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) .async val zip = b.add(ZipWith[String, String, String, String, String, (String, String, String, String, String)]((_, _, _, _, _))) bc.out(0) ~> w("wait 0") ~> zip.in0 bc.out(1) ~> w("wait 1") ~> zip.in1 bc.out(2) ~> w("wait 2") ~> zip.in2 bc.out(3) ~> w("wait 3") ~> zip.in3 bc.out(4) ~> w("wait 4") ~> zip.in4 FlowShape(bc.in, zip.out) }) } }
1 秒 Wait する Flow に async を付加しています。
val w = (name: String) ⇒ Flow[Int].map { n ⇒ Thread.sleep(1000) s"$n - ${Thread.currentThread.getName}" } .log(name).withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) .async
ログの出力結果は下のようになります。それぞれに別のスレッドが割り当てられていることがわかります。
[INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-5] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-5 [INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-4] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-4 [INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-9] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-9 [INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2 [INFO] [05/31/2018 15:36:51.821] [Deadlock-akka.actor.default-dispatcher-10] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-10 t
default-dispatcher のスレッド数を 1 にする
application.conf
を記述して、default-dispatcher でのスレッド数を 1 に設定してみます。
akka { actor { default-dispatcher { fork-join-executor { parallelism-min = 1 parallelism-factor = 1.0 parallelism-max = 1 } } throughput = 1 } }
スレッドが 1 つしか割り当てられないため、最初の例のように順番に実行されます。
[INFO] [05/31/2018 16:43:32.499] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 0] Element: 1 - Deadlock-akka.actor.default-dispatcher-2 [INFO] [05/31/2018 16:43:33.503] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 1] Element: 1 - Deadlock-akka.actor.default-dispatcher-2 [INFO] [05/31/2018 16:43:34.508] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 2] Element: 1 - Deadlock-akka.actor.default-dispatcher-2 [INFO] [05/31/2018 16:43:35.512] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 3] Element: 1 - Deadlock-akka.actor.default-dispatcher-2 [INFO] [05/31/2018 16:43:36.518] [Deadlock-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://Deadlock/system/StreamSupervisor-0)] [wait 4] Element: 1 - Deadlock-akka.actor.default-dispatcher-2
Deadlock に注意
async を使えばスレッドプールからスレッドを割り当てて実行されるため、Broadcast された Flow をマルチスレッドで動かしたくなると思います。 ですが、適切な値のスレッドプールを用意しておかないと Deadlock が発生する場合があります。
Pipelining and Parallelism • Akka Documentation
Akka in Action AKKA IN ACTION [ Raymond Roestenburg ]
- ジャンル: 本・雑誌・コミック > 洋書 > COMPUTERS & SCIENCE
- ショップ: 楽天ブックス
- 価格: 12,096円
Akka Streams - javax.sound を使って mp3 を再生する
javax.sound を使って、mp3 を再生する
package jp.pigumer.cast import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream} import akka.stream.scaladsl.Flow import akka.util.ByteString import javax.sound.sampled.AudioFormat.Encoding import javax.sound.sampled._ object Player { val mixerInfo: Array[Mixer.Info] = AudioSystem.getMixerInfo() val convert = (bytes: ByteString) ⇒ { def write(f: OutputStream ⇒ Unit) = { val output = new ByteArrayOutputStream() try { f(output) ByteString(output.toByteArray) } finally { output.close() } } val originalAudioInput = AudioSystem.getAudioInputStream(new ByteArrayInputStream(bytes.toArray)) val originalFormat = originalAudioInput.getFormat() val converted: AudioInputStream = AudioSystem.getAudioInputStream( new AudioFormat( Encoding.PCM_SIGNED, originalFormat.getSampleRate, 16, originalFormat.getChannels, originalFormat.getChannels * 2, originalFormat.getSampleRate, false ), originalAudioInput) write(output ⇒ AudioSystem.write(converted, AudioFileFormat.Type.AU, output)) } val play = (index: Int) ⇒ Flow[ByteString].map { bytes ⇒ val input = AudioSystem.getAudioInputStream(new ByteArrayInputStream(bytes.toArray)) val mixer = mixerInfo(index) val clip = AudioSystem.getClip(mixer) clip.open(input) clip.start() while (!clip.isRunning) { Thread.sleep(500) } while (clip.isRunning) { Thread.sleep(500) } clip.close() input } }
Play
val done = FileIO.fromPath(Paths.get("src/main/resources/hello.mp3")) .map(Player.convert) .via(Player.play(0)).withAttributes( ActorAttributes.dispatcher("akka.stream.blocking-io-dispatcher")) .runWith(Sink.ignore)
GitHub - takesection/arm32v6-scala-cast
Akka in Action AKKA IN ACTION [ Raymond Roestenburg ]
- ジャンル: 本・雑誌・コミック > 洋書 > COMPUTERS & SCIENCE
- ショップ: 楽天ブックス
- 価格: 12,096円
Akka Streams - Source
Source(1 to 10)
のように Source と宣言されているものはもちろん Source ですが、
Source ではじまっていて、Sink で終わっていない場合は、Source として扱うことができます。
val source: Source[String, NotUsed] = Source(1 to 10) .via(Flow[Int].fold(0)((acc, n) ⇒ acc + n) .map(_.toString))
package jp.pigumer.akka import akka.NotUsed import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Flow, Source} import akka.stream.testkit.scaladsl.TestSink import akka.testkit.TestKitBase import org.specs2.mutable.Specification class SourceSpec extends Specification with TestKitBase { override implicit lazy val system = ActorSystem("Test") implicit val materializer = ActorMaterializer() "Source" should { "sample1" in { val source: Source[String, NotUsed] = Source(1 to 10).via(Flow[Int].fold(0)((acc, n) ⇒ acc + n).map(_.toString)) val result = source.runWith(TestSink.probe[String]) result.request(1).expectNext must_== "55" } } }
flatMapConcat のような Source を返さないといけない場合に、Source として扱えることを知っておくと役立ちます
Future のテスト
Future は、処理がブロックされず、いずれ答えを返してくれる便利なものですが、 テストコードのように結果を検証したい場合は処理の完了を待つ必要があります。
下のサンプルコードでは、Future を Await.ready を使って、実行が完了するのを待ちます。 Await.ready に指定した時間が経過しても処理が完了しない場合は例外がスローされてテストケースは 失敗します。 実行が完了した場合は、Future.value.get で処理結果を取得します。
package jp.pigumer.akka import akka.actor.{ActorSystem, Props} import akka.event.Logging import akka.util.Timeout import org.specs2.mutable.Specification import org.specs2.specification.Scope import scala.concurrent.duration._ import akka.pattern._ import scala.concurrent.Await class FutureSpec extends Specification { trait WithFixture extends Scope { implicit val system = ActorSystem("FutureSpec") implicit val logger = Logging(system, classOf[WithFixture]) val actor = system.actorOf(Props[SlowActor]) } "Future Test" should { "Success" in new WithFixture { implicit val timeout: Timeout = 1 seconds logger.info("start") val e = actor ? 1 Await.ready(e, 60 minutes) logger.info("finish") e.value.get.fold( cause ⇒ { logger.error(cause, "failed") failure }, _ ⇒ success ) } "Timeout" in new WithFixture { implicit val timeout: Timeout = 1 seconds logger.info("start") val e = actor ? 4 Await.ready(e, 5 seconds) logger.info("finish") e.value.get.fold( cause ⇒ { logger.error(cause, "failed") success }, _ ⇒ failure ) } } }
Raspberry Pi の画面を回転
PDF や電子ブックのようなドキュメントを読みたいときは特にですが、画面を回転させて縦にしたくなります。
Raspberry Pi (Raspbian) で 90 度回転させたい場合は、/boot/config.txt にdisplay_hdmi_rotate=1
のように記述してリブートすれば反映されます。
詳しいことは、下のリンクに記述されています。