Akka Streams - Source に Actor, Queue を使うコード

Actor を使う

DeadLetter を Subscribe する Actor を Akka Stream で実装してみます

コードは下のようになります

  val actorRef: ActorRef = 
    Source.actorRef[DeadLetter](100, OverflowStrategy.dropTail)
      .to(Sink.foreach {
        case DeadLetter(message, _, recipient) ⇒
          logger.info(s"$message - $recipient")
      }).run()

to(sink) は、toMat(sink)(Keep.left) と同じなので、Source の Mat が返されます。

返された actorRef は下のように DeadLetter を Subscribe するために使用できます。

val system: ActorSystem

system.eventStream.subscribe(actorRef, classOf[DeadLetter])

Actor を使った Source の場合には、OverflowStrategy として backpressure を使うことができません。

backpressure を使いたい場合は、Queue を使用します。

Queue を使う

Queue を使うコードは下のようになります

  val queue: SourceQueueWithComplete[DeadLetter] =
    Source.queue[DeadLetter](100, OverflowStrategy.backpressure)
      .to(Sink.foreach {
        case DeadLetter(message, _, recipient) ⇒
          logger.info(s"$message - $recipient")
      }).run()