Akka Streams - Source に Actor, Queue を使うコード
Actor を使う
DeadLetter を Subscribe する Actor を Akka Stream で実装してみます
コードは下のようになります
val actorRef: ActorRef = Source.actorRef[DeadLetter](100, OverflowStrategy.dropTail) .to(Sink.foreach { case DeadLetter(message, _, recipient) ⇒ logger.info(s"$message - $recipient") }).run()
to(sink) は、toMat(sink)(Keep.left) と同じなので、Source の Mat が返されます。
返された actorRef は下のように DeadLetter を Subscribe するために使用できます。
val system: ActorSystem
system.eventStream.subscribe(actorRef, classOf[DeadLetter])
Actor を使った Source の場合には、OverflowStrategy として backpressure を使うことができません。
backpressure を使いたい場合は、Queue を使用します。
Queue を使う
Queue を使うコードは下のようになります
val queue: SourceQueueWithComplete[DeadLetter] = Source.queue[DeadLetter](100, OverflowStrategy.backpressure) .to(Sink.foreach { case DeadLetter(message, _, recipient) ⇒ logger.info(s"$message - $recipient") }).run()