akka streams

Akka Streams - Broadcast と Thread

Broadcast 下のような Broadcast を使ったコードを記述します package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.Logging import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith} import akka.stream.{Ac…

Akka Streams - javax.sound を使って mp3 を再生する

javax.sound を使って、mp3 を再生する package jp.pigumer.cast import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream} import akka.stream.scaladsl.Flow import akka.util.ByteString import javax.sound.sampled.AudioFormat.Enc…

Akka Streams - Source

Source(1 to 10) のように Source と宣言されているものはもちろん Source ですが、 Source ではじまっていて、Sink で終わっていない場合は、Source として扱うことができます。 val source: Source[String, NotUsed] = Source(1 to 10) .via(Flow[Int].fol…

Akka Streams - Iterator を Source として使用する

サンプルコード package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Keep, Sink, Source} import scala.concurrent.ExecutionContext…

Akka Streams - Paginator

一覧表示でよくある Paginator を実装してみます。 このサンプルコードでは、Paginator の対象となるデータは、Stream.from(1) で生成するため、1 からはじまる無限個のデータとなります。 そのため、すべてのページのデータを作成してから、必要なページの…

Akka Streams - エラー時のリスタート

Actor の場合の backoff supervision pattern for actors (Supervision and Monitoring • Akka Documentation) のように Akka Streams には、RestartSource, RestartSink, RestartFlow があります。 このサンプルでは、RestartSource を使って、リトライを 2…

Akka Streams - 時間のかかる Flow をタイムアウトにより失敗させる

あるフローの処理時間がかかりすぎた場合に異常と判断してタイムアウトさせるために、 completionTimeout を使ったサンプルコードです。 package jp.pigumer.akka import akka.actor.{Actor, ActorSystem, Props} import akka.event.{Logging, LoggingAdapte…

Akka Streams - KillSwitch で外から FlowShape を制御する

KillSwitch をフローで使用して、viaMat(killSwitch)(Keep.right) で、後で使用できるようにします。 サンプルコードでは、フローの完了を取得するため、toMat(sink)(Keep.both) を使って、KillSwitch と Future[Done] の両方を受取り、 2 秒後に KillSwitch…

Akka Streams - async, groupBy

async まず async を使わないコードからはじめてみます。 この場合は、単純に 1 から 10 まで順番に処理が実行されます。 package jp.pigumer.akka import akka.actor.ActorSystem import akka.event.{Logging, LoggingAdapter} import akka.stream.scaladsl…

Akka Streams - Source に Actor, Queue を使うコード

Actor を使う DeadLetter を Subscribe する Actor を Akka Stream で実装してみます コードは下のようになります val actorRef: ActorRef = Source.actorRef[DeadLetter](100, OverflowStrategy.dropTail) .to(Sink.foreach { case DeadLetter(message, _, …