Akka Streams - Source

Source(1 to 10) のように Source と宣言されているものはもちろん Source ですが、 Source ではじまっていて、Sink で終わっていない場合は、Source として扱うことができます。

val source: Source[String, NotUsed] =
  Source(1 to 10)
    .via(Flow[Int].fold(0)((acc, n) ⇒ acc + n)
    .map(_.toString))
package jp.pigumer.akka

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKitBase
import org.specs2.mutable.Specification

class SourceSpec extends Specification with TestKitBase {

  override implicit lazy val system = ActorSystem("Test")
  implicit val materializer = ActorMaterializer()

  "Source" should {
    "sample1" in {
      val source: Source[String, NotUsed] =
        Source(1 to 10).via(Flow[Int].fold(0)((acc, n) ⇒ acc + n).map(_.toString))
      val result = source.runWith(TestSink.probe[String])
      result.request(1).expectNext must_== "55"
    }
  }
}

flatMapConcat のような Source を返さないといけない場合に、Source として扱えることを知っておくと役立ちます