Akka Streams - async, groupBy

async

まず async を使わないコードからはじめてみます。

この場合は、単純に 1 から 10 まで順番に処理が実行されます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  val source = Source(1 to 10)
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val flow = Flow[Int].map { i ⇒
    Thread.sleep((10 - i) * 200)
    i }
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val runnable =
    source.via(flow).toMat(Sink.ignore)(Keep.right)

  runnable.run().onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "run")
      system.terminate()
  }
}

次に async をつけて実行してみます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  val source = Source(1 to 10).async
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val flow = Flow[Int].map { i ⇒
    Thread.sleep((10 - i) * 200)
    i }.async
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val runnable =
    source.via(flow).toMat(Sink.ignore)(Keep.right)

  runnable.run().onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "run")
      system.terminate()
  }
}

フローにかかる実行時間をずらすために Sleep を追加していますが、この場合でも順番に処理されます。

groupBy と async

groupBy と async を使った SubFlow で試してみます

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  val source = Source(1 to 10).groupBy(3, _ % 3)
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val flow = Flow[Int].map { i ⇒
    Thread.sleep((10 - i) * 200)
    i }.async
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val runnable =
    source.via(flow).mergeSubstreams.toMat(Sink.ignore)(Keep.right)

  runnable.run().onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "run")
      system.terminate()
  }
}

groupBy でフローを分けて async を使うことで、順序は保証されなくなりますが、効率のよい処理を実現することができます。

Akka Streams - Source に Actor, Queue を使うコード

Actor を使う

DeadLetter を Subscribe する Actor を Akka Stream で実装してみます

コードは下のようになります

  val actorRef: ActorRef = 
    Source.actorRef[DeadLetter](100, OverflowStrategy.dropTail)
      .to(Sink.foreach {
        case DeadLetter(message, _, recipient) ⇒
          logger.info(s"$message - $recipient")
      }).run()

to(sink) は、toMat(sink)(Keep.left) と同じなので、Source の Mat が返されます。

返された actorRef は下のように DeadLetter を Subscribe するために使用できます。

val system: ActorSystem

system.eventStream.subscribe(actorRef, classOf[DeadLetter])

Actor を使った Source の場合には、OverflowStrategy として backpressure を使うことができません。

backpressure を使いたい場合は、Queue を使用します。

Queue を使う

Queue を使うコードは下のようになります

  val queue: SourceQueueWithComplete[DeadLetter] =
    Source.queue[DeadLetter](100, OverflowStrategy.backpressure)
      .to(Sink.foreach {
        case DeadLetter(message, _, recipient) ⇒
          logger.info(s"$message - $recipient")
      }).run()

パターンマッチ

Spray Json などを使って JSON を読み込んだあとに、ちょこっとキー名だけ変えたりしたい場合があるのですが、こうしたときパターンマッチ使うととても簡単にかけそうだなと試してみました

val transformed = jsobject.filelds.map {
    case ("id", JsString(id)) ⇒
        "another_id" → JsNumber(id.toInt)
    case ("name", name) ⇒
        "product_name" → name
}

JsObject(transformed)

mobileprovision を Scala(Java) でロードする

iOS アプリケーションの ipa ファイルの mobileprovision を署名のないイメージにするためのコードです

package com.pigumer.ipa.test

import java.io.FileInputStream
import java.nio.charset.StandardCharsets
import java.util.zip.ZipInputStream

import org.bouncycastle.cms.CMSSignedData

object LoadIpaApplication extends App {

  val fileName = "test.ipa"
  val fis = new FileInputStream(fileName)
  val zip = new ZipInputStream(fis)
  Stream.continually(Option(zip.getNextEntry)).takeWhile(_.isDefined).foreach { opt ⇒
    opt.filter(entry ⇒ entry.getName.endsWith(".mobileprovision")).foreach { entry ⇒
      val bytes = Stream.continually(zip.read()).takeWhile(_ != -1).map(_.toByte).toArray
      val xml = new CMSSignedData(bytes).getSignedContent.getContent.asInstanceOf[Array[Byte]]
      println(new String(xml, StandardCharsets.UTF_8))
    }
  }
}

Bouncy Castle のライブラリを使うことで簡単に署名のないイメージで取得することができます。 同じ方法で署名付きの Profile 等も読むことができます。

GitHub

Network設定(1)

CLI でネットワーク設定を操作

  • Rasberry Pi

bridge を作ってみる

$ sudo -s
# ip link add name br0 type bridge
# ifconfig
br0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        ether 82:db:31:03:84:ed  txqueuelen 1000  (イーサネット)
        RX packets 0  bytes 0 (0.0 B)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 2  bytes 762 (762.0 B)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

bridge と network interface をくっつける

# brctl addif br0 eth0
# ifconfig
br0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 169.254.98.203  netmask 255.255.0.0  broadcast 169.254.255.255
        ether xx:xx:xx:xx:xx:xx  txqueuelen 1000  (イーサネット)
        RX packets 22  bytes 1840 (1.7 KiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 21  bytes 4170 (4.0 KiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

dhcpcd で br0 にアドレスを振り直す

# dhcpcd -k
# dhcpcd -n -H br0
# ip addr show

4: br0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default qlen 1000
    link/ether xx:xx:xx:xx:xx:xx brd ff:ff:ff:ff:ff:ff
    inet 192.168.1.161/24 brd 192.168.1.255 scope global br0
       valid_lft forever preferred_lft forever

Google Home mini

# apt-get update
# apt-get install -y nodejs npm libavahi-compat-libdnssd-dev
# npm cache clean
# npm install npm n -g
# n stable
$ npm init
$ npm install google-home-notifier
const googlehome = require('google-home-notifier');
const language = 'ja';

googlehome.device('Google-Home', language);

googlehome.notify('てすと', function(res) {
  console.log(res);
});

github.com

Scala を使う例

Docker の restart policy

docker run 時に --restart で restart policy を設定できます(デフォルトは no)。

Flag Description
no 再起動しない
on-failure エラーで停止したときは再起動
unless-stopped 明示的に停止していなければ再起動
always 常に再起動

docs.docker.com