Akka Streams - 時間のかかる Flow をタイムアウトにより失敗させる

あるフローの処理時間がかかりすぎた場合に異常と判断してタイムアウトさせるために、 completionTimeout を使ったサンプルコードです。

package jp.pigumer.akka

import akka.actor.{Actor, ActorSystem, Props}
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}
import akka.util.Timeout

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

class SlowActor extends Actor {
  override def receive = {
    case i: Int ⇒
      implicit val executionContext = context.system.dispatcher
      val originalSender = sender
      Future {
        Thread.sleep(i * 500)
        originalSender ! i
      }
  }
}

object CompletionTimeoutSpec extends App {

  implicit val system: ActorSystem = ActorSystem("CompletionTimeoutSpec")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, this.getClass)

  val ref = system.actorOf(Props[SlowActor])
  implicit val timeout: Timeout = 60 seconds

  val source = Source(Stream.from(1))
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val flow = Flow[Int].ask[Int](ref)
    .completionTimeout(3 seconds)
    .log("flow", elem ⇒ s"$elem")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val done =
    source
      .via(flow).toMat(Sink.ignore)(Keep.right)
      .run()

  done.onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "error")
      system.terminate()
  }
}

Akka Streams - KillSwitch で外から FlowShape を制御する

KillSwitch をフローで使用して、viaMat(killSwitch)(Keep.right) で、後で使用できるようにします。 サンプルコードでは、フローの完了を取得するため、toMat(sink)(Keep.both) を使って、KillSwitch と Future[Done] の両方を受取り、 2 秒後に KillSwitch.shutdown() を実行して RunnableGraph を正常終了させます。

エラーで終了させたい場合は、KillSwitch.abort(Throwable) を使うことができます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes, DelayOverflowStrategy, KillSwitches}

import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object KillSwitchSpec extends App {

  implicit val system: ActorSystem = ActorSystem("KillSwitchSpec")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, this.getClass)

  val source = Source(Stream.from(1))
    .delay(1 seconds, DelayOverflowStrategy.backpressure)
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val killSwitch = KillSwitches.single[Int]
  val flow = Flow[Int].map { i ⇒
    i
  }
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val (k, done) =
    source
      .viaMat(killSwitch)(Keep.right)
      .via(flow).toMat(Sink.ignore)(Keep.both)
      .run()

  done.onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "error")
      system.terminate()
  }

  Thread.sleep(2000)
  k.shutdown()
}

Akka Streams - async, groupBy

async

まず async を使わないコードからはじめてみます。

この場合は、単純に 1 から 10 まで順番に処理が実行されます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  val source = Source(1 to 10)
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val flow = Flow[Int].map { i ⇒
    Thread.sleep((10 - i) * 200)
    i }
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val runnable =
    source.via(flow).toMat(Sink.ignore)(Keep.right)

  runnable.run().onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "run")
      system.terminate()
  }
}

次に async をつけて実行してみます。

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  val source = Source(1 to 10).async
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val flow = Flow[Int].map { i ⇒
    Thread.sleep((10 - i) * 200)
    i }.async
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val runnable =
    source.via(flow).toMat(Sink.ignore)(Keep.right)

  runnable.run().onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "run")
      system.terminate()
  }
}

フローにかかる実行時間をずらすために Sleep を追加していますが、この場合でも順番に処理されます。

groupBy と async

groupBy と async を使った SubFlow で試してみます

package jp.pigumer.akka

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}

import scala.concurrent.ExecutionContextExecutor
import scala.util.{Failure, Success}

object HelloWorld extends App {

  implicit val system: ActorSystem = ActorSystem("HelloWorld")

  implicit val materializer: ActorMaterializer = ActorMaterializer()
  implicit val executionContext: ExecutionContextExecutor = system.dispatcher

  val logger: LoggingAdapter = Logging(system, "HelloWorld")

  val source = Source(1 to 10).groupBy(3, _ % 3)
    .log("source")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))
  val flow = Flow[Int].map { i ⇒
    Thread.sleep((10 - i) * 200)
    i }.async
    .log("flow", elem ⇒ s"[$elem]")
    .withAttributes(Attributes.logLevels(onElement = Logging.InfoLevel))

  val runnable =
    source.via(flow).mergeSubstreams.toMat(Sink.ignore)(Keep.right)

  runnable.run().onComplete {
    case Success(_) ⇒
      logger.info("success")
      system.terminate()
    case Failure(cause) ⇒
      logger.error(cause, "run")
      system.terminate()
  }
}

groupBy でフローを分けて async を使うことで、順序は保証されなくなりますが、効率のよい処理を実現することができます。

Akka Streams - Source に Actor, Queue を使うコード

Actor を使う

DeadLetter を Subscribe する Actor を Akka Stream で実装してみます

コードは下のようになります

  val actorRef: ActorRef = 
    Source.actorRef[DeadLetter](100, OverflowStrategy.dropTail)
      .to(Sink.foreach {
        case DeadLetter(message, _, recipient) ⇒
          logger.info(s"$message - $recipient")
      }).run()

to(sink) は、toMat(sink)(Keep.left) と同じなので、Source の Mat が返されます。

返された actorRef は下のように DeadLetter を Subscribe するために使用できます。

val system: ActorSystem

system.eventStream.subscribe(actorRef, classOf[DeadLetter])

Actor を使った Source の場合には、OverflowStrategy として backpressure を使うことができません。

backpressure を使いたい場合は、Queue を使用します。

Queue を使う

Queue を使うコードは下のようになります

  val queue: SourceQueueWithComplete[DeadLetter] =
    Source.queue[DeadLetter](100, OverflowStrategy.backpressure)
      .to(Sink.foreach {
        case DeadLetter(message, _, recipient) ⇒
          logger.info(s"$message - $recipient")
      }).run()

パターンマッチ

Spray Json などを使って JSON を読み込んだあとに、ちょこっとキー名だけ変えたりしたい場合があるのですが、こうしたときパターンマッチ使うととても簡単にかけそうだなと試してみました

val transformed = jsobject.filelds.map {
    case ("id", JsString(id)) ⇒
        "another_id" → JsNumber(id.toInt)
    case ("name", name) ⇒
        "product_name" → name
}

JsObject(transformed)

mobileprovision を Scala(Java) でロードする

iOS アプリケーションの ipa ファイルの mobileprovision を署名のないイメージにするためのコードです

package com.pigumer.ipa.test

import java.io.FileInputStream
import java.nio.charset.StandardCharsets
import java.util.zip.ZipInputStream

import org.bouncycastle.cms.CMSSignedData

object LoadIpaApplication extends App {

  val fileName = "test.ipa"
  val fis = new FileInputStream(fileName)
  val zip = new ZipInputStream(fis)
  Stream.continually(Option(zip.getNextEntry)).takeWhile(_.isDefined).foreach { opt ⇒
    opt.filter(entry ⇒ entry.getName.endsWith(".mobileprovision")).foreach { entry ⇒
      val bytes = Stream.continually(zip.read()).takeWhile(_ != -1).map(_.toByte).toArray
      val xml = new CMSSignedData(bytes).getSignedContent.getContent.asInstanceOf[Array[Byte]]
      println(new String(xml, StandardCharsets.UTF_8))
    }
  }
}

Bouncy Castle のライブラリを使うことで簡単に署名のないイメージで取得することができます。 同じ方法で署名付きの Profile 等も読むことができます。

GitHub

Network設定(1)

CLI でネットワーク設定を操作

  • Rasberry Pi

bridge を作ってみる

$ sudo -s
# ip link add name br0 type bridge
# ifconfig
br0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        ether 82:db:31:03:84:ed  txqueuelen 1000  (イーサネット)
        RX packets 0  bytes 0 (0.0 B)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 2  bytes 762 (762.0 B)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

bridge と network interface をくっつける

# brctl addif br0 eth0
# ifconfig
br0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST>  mtu 1500
        inet 169.254.98.203  netmask 255.255.0.0  broadcast 169.254.255.255
        ether xx:xx:xx:xx:xx:xx  txqueuelen 1000  (イーサネット)
        RX packets 22  bytes 1840 (1.7 KiB)
        RX errors 0  dropped 0  overruns 0  frame 0
        TX packets 21  bytes 4170 (4.0 KiB)
        TX errors 0  dropped 0 overruns 0  carrier 0  collisions 0

dhcpcd で br0 にアドレスを振り直す

# dhcpcd -k
# dhcpcd -n -H br0
# ip addr show

4: br0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc noqueue state UP group default qlen 1000
    link/ether xx:xx:xx:xx:xx:xx brd ff:ff:ff:ff:ff:ff
    inet 192.168.1.161/24 brd 192.168.1.255 scope global br0
       valid_lft forever preferred_lft forever