Akka Streams - async, groupBy
async
まず async を使わないコードからはじめてみます。
この場合は、単純に 1 から 10 まで順番に処理が実行されます。
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream.{ActorMaterializer, Attributes} import scala.concurrent.ExecutionContextExecutor import scala.util.{Failure, Success} object HelloWorld extends App { implicit val system: ActorSystem = ActorSystem("HelloWorld") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, "HelloWorld") val source = Source(1 to 10) .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val flow = Flow[Int].map { i ⇒ Thread.sleep((10 - i) * 200) i } .log("flow", elem ⇒ s"[$elem]") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val runnable = source.via(flow).toMat(Sink.ignore)(Keep.right) runnable.run().onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "run") system.terminate() } }
次に async をつけて実行してみます。
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream.{ActorMaterializer, Attributes} import scala.concurrent.ExecutionContextExecutor import scala.util.{Failure, Success} object HelloWorld extends App { implicit val system: ActorSystem = ActorSystem("HelloWorld") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, "HelloWorld") val source = Source(1 to 10).async .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val flow = Flow[Int].map { i ⇒ Thread.sleep((10 - i) * 200) i }.async .log("flow", elem ⇒ s"[$elem]") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val runnable = source.via(flow).toMat(Sink.ignore)(Keep.right) runnable.run().onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "run") system.terminate() } }
フローにかかる実行時間をずらすために Sleep を追加していますが、この場合でも順番に処理されます。
groupBy と async
groupBy と async を使った SubFlow で試してみます
package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl.{Flow, Keep, Sink, Source} import akka.stream.{ActorMaterializer, Attributes} import scala.concurrent.ExecutionContextExecutor import scala.util.{Failure, Success} object HelloWorld extends App { implicit val system: ActorSystem = ActorSystem("HelloWorld") implicit val materializer: ActorMaterializer = ActorMaterializer() implicit val executionContext: ExecutionContextExecutor = system.dispatcher val logger: LoggingAdapter = Logging(system, "HelloWorld") val source = Source(1 to 10).groupBy(3, _ % 3) .log("source") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val flow = Flow[Int].map { i ⇒ Thread.sleep((10 - i) * 200) i }.async .log("flow", elem ⇒ s"[$elem]") .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel)) val runnable = source.via(flow).mergeSubstreams.toMat(Sink.ignore)(Keep.right) runnable.run().onComplete { case Success(_) ⇒ logger.info("success") system.terminate() case Failure(cause) ⇒ logger.error(cause, "run") system.terminate() } }
groupBy でフローを分けて async を使うことで、順序は保証されなくなりますが、効率のよい処理を実現することができます。
Akka Streams - Source に Actor, Queue を使うコード
Actor を使う
DeadLetter を Subscribe する Actor を Akka Stream で実装してみます
コードは下のようになります
val actorRef: ActorRef = Source.actorRef[DeadLetter](100, OverflowStrategy.dropTail) .to(Sink.foreach { case DeadLetter(message, _, recipient) ⇒ logger.info(s"$message - $recipient") }).run()
to(sink) は、toMat(sink)(Keep.left) と同じなので、Source の Mat が返されます。
返された actorRef は下のように DeadLetter を Subscribe するために使用できます。
val system: ActorSystem
system.eventStream.subscribe(actorRef, classOf[DeadLetter])
Actor を使った Source の場合には、OverflowStrategy として backpressure を使うことができません。
backpressure を使いたい場合は、Queue を使用します。
Queue を使う
Queue を使うコードは下のようになります
val queue: SourceQueueWithComplete[DeadLetter] = Source.queue[DeadLetter](100, OverflowStrategy.backpressure) .to(Sink.foreach { case DeadLetter(message, _, recipient) ⇒ logger.info(s"$message - $recipient") }).run()
mobileprovision を Scala(Java) でロードする
iOS アプリケーションの ipa ファイルの mobileprovision を署名のないイメージにするためのコードです
package com.pigumer.ipa.test import java.io.FileInputStream import java.nio.charset.StandardCharsets import java.util.zip.ZipInputStream import org.bouncycastle.cms.CMSSignedData object LoadIpaApplication extends App { val fileName = "test.ipa" val fis = new FileInputStream(fileName) val zip = new ZipInputStream(fis) Stream.continually(Option(zip.getNextEntry)).takeWhile(_.isDefined).foreach { opt ⇒ opt.filter(entry ⇒ entry.getName.endsWith(".mobileprovision")).foreach { entry ⇒ val bytes = Stream.continually(zip.read()).takeWhile(_ != -1).map(_.toByte).toArray val xml = new CMSSignedData(bytes).getSignedContent.getContent.asInstanceOf[Array[Byte]] println(new String(xml, StandardCharsets.UTF_8)) } } }
Bouncy Castle のライブラリを使うことで簡単に署名のないイメージで取得することができます。 同じ方法で署名付きの Profile 等も読むことができます。
Network設定(1)
CLI でネットワーク設定を操作
- Rasberry Pi
bridge を作ってみる
$ sudo -s # ip link add name br0 type bridge # ifconfig br0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500 ether 82:db:31:03:84:ed txqueuelen 1000 (イーサネット) RX packets 0 bytes 0 (0.0 B) RX errors 0 dropped 0 overruns 0 frame 0 TX packets 2 bytes 762 (762.0 B) TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
bridge と network interface をくっつける
# brctl addif br0 eth0 # ifconfig br0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500 inet 169.254.98.203 netmask 255.255.0.0 broadcast 169.254.255.255 ether xx:xx:xx:xx:xx:xx txqueuelen 1000 (イーサネット) RX packets 22 bytes 1840 (1.7 KiB) RX errors 0 dropped 0 overruns 0 frame 0 TX packets 21 bytes 4170 (4.0 KiB) TX errors 0 dropped 0 overruns 0 carrier 0 collisions 0
dhcpcd で br0 にアドレスを振り直す
# dhcpcd -k # dhcpcd -n -H br0 # ip addr show 4: br0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default qlen 1000 link/ether xx:xx:xx:xx:xx:xx brd ff:ff:ff:ff:ff:ff inet 192.168.1.161/24 brd 192.168.1.255 scope global br0 valid_lft forever preferred_lft forever
Google Home mini
# apt-get update # apt-get install -y nodejs npm libavahi-compat-libdnssd-dev # npm cache clean # npm install npm n -g # n stable
$ npm init $ npm install google-home-notifier
const googlehome = require('google-home-notifier'); const language = 'ja'; googlehome.device('Google-Home', language); googlehome.notify('てすと', function(res) { console.log(res); });
Scala を使う例
Docker の restart policy
docker run 時に --restart で restart policy を設定できます(デフォルトは no)。
Flag | Description |
---|---|
no | 再起動しない |
on-failure | エラーで停止したときは再起動 |
unless-stopped | 明示的に停止していなければ再起動 |
always | 常に再起動 |